guile-user
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: stis engine


From: Stefan Israelsson Tampe
Subject: Re: stis engine
Date: Wed, 25 Aug 2021 00:31:25 +0200

;; Oups I managed to send the message by accident without finishing it.
;; The server part is similar and I will drop the actual pipeline and
highlight the differenece
;; which is that we will get a message stream it to an scheme object call
server-.lambda below
;; with it and finally from that lambdas return value reply to the client
we will create a loop where
;; the server waits for questions

(define* (make-server server-lambda address ip-server? #key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))

Finally the idea is to use it as


(define context    (make-zmq-context))
(define address    "") ;; ZeroMQ address
(define ip-server? #t) ;; if we will use bind or connect

(make-server (lambda (x) (cons '() x)) context address ip-server?)

(define client (make-client context address ip-server?))
  ;; send a mesage that is not compressed
  > (client "abc")
  "abc"

  > (length (client (iota 1000000) #:compress? #t))
  1000000

in a run fibers context

On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe <
stefan.itampe@gmail.com> wrote:

> I have now made a fiber enables stream library called fpipe in my
> stis-engine repository see
>
> https://gitlab.com/tampe/stis-engine/-/tree/master/
>
> The idea is to focus on a high performance byte streaming library on top
> of wingo's fibers library and make heavy use of bytevector buffers. We will
> also gp between a stream of scheme
> values and these byte streams to seamlessly be able to integrate a good
> overview of the data pipeline.
>
> The following code uses a c-based serializer and deserializer of scheme
> data structures
> and allow for optional streamed compression and decompression and
> transport it over ZeroMQ
> networking which allow for thread/process/computer movement of data. The
> end result is a way to create servers and clients.
>
> It is instructive to show the code for the client and server pipelines is
> constructed tp show off the fpipes library. This is not the final design
> but moste components are done
>
> Here is the client
>
> (define* (make-client address ip-server? #key (context #f))
>   ;; First we setup the zmq networking
>   (define ctx    (if context context (make-zmq-context)))
>   (define socket (zmq-socket context ZMQ_REQ))
>
>   (if ip-server?
>       (zmq-bind      socket address)
>       (zmq-connect socket address))
>
>   ;; we will define to fiber channels, channel in = ch1 and channel out =
> ch2
>   (define-values (ch1 ch2)
>
>      ;; fpipe-construct is the general pipelining macro
>     (fpipe-construct
>
>       ;; this is a scheme condition that will match check a message
> bounded to it
>      (cond
>       (#:scm it)
>
>        ;; format of the matcher is (predicate . translatot) where if
> predicate is true we will
>        ;; push the message to the branching pipline this assumes a message
> is the form
>        ;; ((list-of-features) . message)
>       (((memq 'compress (car it)) #:tr (cdr it))
>
>        ;;  the c-based stremed serializer that integrates nicely with
> fibers and streams
>        ;;  the message transport is the form scm->bytesteam
>
>        (mk-c-atom->fpipe)
>
>        ;; the zlib compressor node will tarnsport as bytestream->bytestream
>        compress-from-fpipe-to-fpipe
>
>        ;; a bytestream->bytestream that will prepend a message with 1 to
> indicate that the stream
>        ;; has been compressed
>        #:prepend #u8(1))
>
>       ;;  if we do not have the compress feature then we will simply
> generate the stream and
>       ;; prepend a one e.g. not doing any compression
>       (else
>        (mk-c-atom->fpipe)
>        #:prepend #u8(0)))
>
>      ;; transport the message byetstream over the zmq socket this will
> retrun in a scheme
>       ;; stream where eof will survive as all control messages are and
> will initiate the next
>       ;; reading from the socket (when the request message has been fully
> sent.
>      (fpipe->zmq socket)
>
>       ;; so here we get the return message
>      (zmq->fpipe socket)
>
>      ;; This is a bytestream cond and has no it part,
>      (cond
>        ;; We try to match the beginning of the bytestream message and if
> it starts with 1
>        ;; then we know that the reply message has been compressed
>       ((#:match u8(1) #:skip 1)
>        decompress-from-pipe-to-pipe)
>
>       ;; else no compression.
>       ((else #:skip 1)
>        ))
>
>      ;; the final step is to take the bytestream and make a scheme object
> and put that
>      ;; to the scheme stream and the pipe is finished
>      (mk-fpipe->c-atom)))
>
>    ;; fpipe-scheme takes a piplend from scm to scm and creates a function
> of it.
>    ;; each time the function is called with a scheme object we will send
> it ot the server
>    ;; from the return message create a scheme object that is returned from
> the funciton
>   (define action (fpipe-schemer ch1 ch2))
>
>   ;; A little nicer interface and we are finished
>   (lambda* (message #:key (compress? #f))
>      (action (cons (if compress? '(compress) '()) message))))
>
>
> ;; SERVER
> (define* (make-server server-lambda address ip-server? #key (context #f))
>
>   (define schemer (fpipe-schemer ch1 ch2))
>
>   (spawn-fiber
>    (lambda ()
>      (let lp ()
>        (schemer %fpipe-eof%)
>        (lp)))))
>
>


reply via email to

[Prev in Thread] Current Thread [Next in Thread]