guile-user
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

stis engine


From: Stefan Israelsson Tampe
Subject: stis engine
Date: Wed, 25 Aug 2021 00:25:53 +0200

I have now made a fiber enables stream library called fpipe in my
stis-engine repository see

https://gitlab.com/tampe/stis-engine/-/tree/master/

The idea is to focus on a high performance byte streaming library on top of
wingo's fibers library and make heavy use of bytevector buffers. We will
also gp between a stream of scheme
values and these byte streams to seamlessly be able to integrate a good
overview of the data pipeline.

The following code uses a c-based serializer and deserializer of scheme
data structures
and allow for optional streamed compression and decompression and transport
it over ZeroMQ
networking which allow for thread/process/computer movement of data. The
end result is a way to create servers and clients.

It is instructive to show the code for the client and server pipelines is
constructed tp show off the fpipes library. This is not the final design
but moste components are done

Here is the client

(define* (make-client address ip-server? #key (context #f))
  ;; First we setup the zmq networking
  (define ctx    (if context context (make-zmq-context)))
  (define socket (zmq-socket context ZMQ_REQ))

  (if ip-server?
      (zmq-bind      socket address)
      (zmq-connect socket address))

  ;; we will define to fiber channels, channel in = ch1 and channel out =
ch2
  (define-values (ch1 ch2)

     ;; fpipe-construct is the general pipelining macro
    (fpipe-construct

      ;; this is a scheme condition that will match check a message bounded
to it
     (cond
      (#:scm it)

       ;; format of the matcher is (predicate . translatot) where if
predicate is true we will
       ;; push the message to the branching pipline this assumes a message
is the form
       ;; ((list-of-features) . message)
      (((memq 'compress (car it)) #:tr (cdr it))

       ;;  the c-based stremed serializer that integrates nicely with
fibers and streams
       ;;  the message transport is the form scm->bytesteam

       (mk-c-atom->fpipe)

       ;; the zlib compressor node will tarnsport as bytestream->bytestream
       compress-from-fpipe-to-fpipe

       ;; a bytestream->bytestream that will prepend a message with 1 to
indicate that the stream
       ;; has been compressed
       #:prepend #u8(1))

      ;;  if we do not have the compress feature then we will simply
generate the stream and
      ;; prepend a one e.g. not doing any compression
      (else
       (mk-c-atom->fpipe)
       #:prepend #u8(0)))

     ;; transport the message byetstream over the zmq socket this will
retrun in a scheme
      ;; stream where eof will survive as all control messages are and will
initiate the next
      ;; reading from the socket (when the request message has been fully
sent.
     (fpipe->zmq socket)

      ;; so here we get the return message
     (zmq->fpipe socket)

     ;; This is a bytestream cond and has no it part,
     (cond
       ;; We try to match the beginning of the bytestream message and if it
starts with 1
       ;; then we know that the reply message has been compressed
      ((#:match u8(1) #:skip 1)
       decompress-from-pipe-to-pipe)

      ;; else no compression.
      ((else #:skip 1)
       ))

     ;; the final step is to take the bytestream and make a scheme object
and put that
     ;; to the scheme stream and the pipe is finished
     (mk-fpipe->c-atom)))

   ;; fpipe-scheme takes a piplend from scm to scm and creates a function
of it.
   ;; each time the function is called with a scheme object we will send it
ot the server
   ;; from the return message create a scheme object that is returned from
the funciton
  (define action (fpipe-schemer ch1 ch2))

  ;; A little nicer interface and we are finished
  (lambda* (message #:key (compress? #f))
     (action (cons (if compress? '(compress) '()) message))))


;; SERVER
(define* (make-server server-lambda address ip-server? #key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))


reply via email to

[Prev in Thread] Current Thread [Next in Thread]