[Top][All Lists]

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Anouncement stis-engine 0.1

From: Stefan Israelsson Tampe
Subject: Anouncement stis-engine 0.1
Date: Tue, 31 Aug 2021 02:08:54 +0200

Hi guilers!

I am now happy to announce the first version 0.1 stis-engine. With this
tool you can manage data streams with quite good throughput.

Consider a case with streaming objects over the network and allow to have a
prefix of the
stream where  you control what kind of serializer are used and if streamed
zipping is done.
Below is the server and client implementation in
module/fibers/stis-parser/q.scm. 1 400MB big
bytevector transports from the client to the server and back with about one
second. I did the transport over a zmq  thread communication link. We reach
quite high throughput as we only do logic in the header of the stream so we
can copy the buffers themselves when we can instead
of copying the bytes.

The sizes of the buffers is quite large as to make sure that the fiber
overhead is not too large
The main constraint is from copying bytes, just the serializer and
deserialiser used to copy an object would be 10x the speed. But this is for
simple data structures like bytevectors. Already a list of numbers instead
of bytevector the serialisation and deserialisation starts to dominate as
bytestream operations is essentially memmove memcpy and those are insanely

Some other examples are audio and image streams. you can glue stream
operations together
in guile and have close to C speed for large amount of data.

Here are the code:
;;We can make an abstractions as such:
(define-fpipe-construct #:scm (zmq->atom socket)
    (zmq->fpipe socket)                      ; read from the network to a
byte stream
    (let ((it)                                           ; pick up the
first byte (it1 ...) means the bytes prefix
            (opt?  (= 1 (logand it 1)))
            (text? (= 2 (logand it 2))))

        (fpipe-skip 1)                              ; skip the prefix

        (when opt?
            (uncompress-from-fpipe-to-fpipe))   ; id the stream was
compressed decompress

       (if text?                                                  ; if text
the transport is in cleartext e.g. scheme

(define-fpipe-construct #:scm (atom->zmq socket)
     (let ( it                                                     ;; This
picks a scheme object (header . payload)
           (header (car it))
           (opt?   (memq 'compress header))
           (text?  (memq 'text     header))
           (tag     (logior (if opt? 1 0) (if text? 2 0))))

       (if text?

       (when opt?
           (compress-from-fpipe-to-fpipe #:level 3))

       (fpipe-prepend tag)                       ; prepend the stream with
the tag ...

       (fpipe->zmq socket)))

;;Now we can make a server and a client out like so,

(define* (make-client address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REQ))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; here we construct the scheme pipeline using the abstraction
  (define-values (ch1 ch2)
     (atom->zmq socket)
     (zmq->atom socket)))

  (define action (fpipe-schemer ch1 ch2))
  (lambda* (message #:key (compress? #f))
    (cdr (action (cons (if compress? '(compress) '()) message)))))

;; the server:
(define* (run-server server-lambda address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REP))
  (define (lam x)
    (call-with-values server-lambda
      (lambda* (message #:key (compress? #f))
         (cons (if compress? '(compress) '()) message))))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; the pipeline using the abstractions
  (define-values (ch1 ch2)
          (zmq->atom socket)
          (fpipe-map server-lambda)
          (atom->zmq socket)))

  (define schemer (fpipe-schemer ch1 ch2))

   (lambda ()
       (let lp ()
           (schemer %fpipe-eof%)
      #:parallel? #f))

reply via email to

[Prev in Thread] Current Thread [Next in Thread]