[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 156/324: mq-impl/stream: Implement on top of ports.
From: |
gnunet |
Subject: |
[gnunet-scheme] 156/324: mq-impl/stream: Implement on top of ports. |
Date: |
Tue, 21 Sep 2021 13:23:16 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit 1980948c623556395e735d9efa178a2107795cf1
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sat Jul 17 16:52:19 2021 +0200
mq-impl/stream: Implement on top of ports.
* gnu/gnunet/mq-impl/stream.scm
(write-envelope!, handle-input!, handle-output!): New procedures.
* tests/mq-stream.scm
(no-sender, no-handlers, no-error-handler, check-slice-equal)
(simple-handler, blocking-output-port, blocking-output-port): New
procedures.
("messages + eof are injected in-order")
("overly small message is detected (--> stop)")
("premature eof is detected (--> stop)")
("envelopes are written (no blocking)")
("repeatable conditions can be used (blocking)"): New tests.
---
README.org | 2 +
gnu/gnunet/mq-impl/stream.scm | 132 ++++++++++++++++++++++++
tests/mq-stream.scm | 234 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 368 insertions(+)
diff --git a/README.org b/README.org
index 757b965..21bb600 100644
--- a/README.org
+++ b/README.org
@@ -86,6 +86,8 @@
capabilities; the interposition can be used to adjust
the ambient authority appropriately.
+ gnu/gnunet/mq.scm: the message queue itself!
+ + gnu/gnunet/mq-impl/stream.scm: generic implementation on top of
+ Guile's port abstraction.
+ TODO actual queues? Maybe we don't need them?
+ TODO filling the queues
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
new file mode 100644
index 0000000..dcc5706
--- /dev/null
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -0,0 +1,132 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL3.0-or-later
+
+;; C source: src/util/client.c (not completely ported).
+;; The Scheme implementation is rather different from the C implementation
+;; though.
+;;
+;; This module allows communication between GNUnet services using stream
+;; sockets.
+
+(define-library (gnu gnunet mq-impl stream)
+ (export write-envelope! handle-input! handle-output!)
+ (import (only (gnu gnunet mq)
+ make-one-by-one-sender inject-message! inject-error!)
+ (only (gnu gnunet utils bv-slice)
+ slice-bv slice-offset slice-length
+ slice-readable? bv-slice/read-write
+ slice/read-only)
+ (only (gnu gnunet mq envelope)
+ attempt-irrevocable-sent!)
+ (only (gnu gnunet utils tokeniser)
+ make-tokeniser add-from-port!)
+ (only (gnu gnunet utils hat-let)
+ let^)
+ (only (rnrs base)
+ begin define let values quote
+ assert)
+ (only (guile)
+ error)
+ (only (rnrs io ports)
+ put-bytevector)
+ (srfi srfi-26))
+ (begin
+ (define (write-envelope! output envelope)
+ "Write the envelope @var{envelope} to the output port @var{output},
+unless it is cancelled. @var{envelope} may not be already sent. This
+can block and raise I/O errors, depending on the port @var{output} and
+(in Guile) the current write waiter. As such, the caller might need to
+parameterise the current write waiter and install exception handlers."
+ (attempt-irrevocable-sent!
+ envelope
+ ((go message priority)
+ (assert (slice-readable? message))
+ (put-bytevector output (slice-bv message)
+ (slice-offset message) (slice-length message))
+ (values))
+ ((cancelled) (values))
+ ((already-sent) (error "tried to send an envelope twice"))))
+
+ ;; TODO: maybe note that this procedure blocks?
+ (define (handle-input! mq input)
+ "Keep reading message from the input port @var{input}.
+
+Feed each read message in-order to @var{mq} with @code{inject-message!}.
+This procedure might inject errors by its own as usual (e.g. when
+no appropriate message handler exists).
+
+If a message with an overly small message size it its header
+is encountered, inject the error @code{input:overly-small type size}
+into @var{mq}, where @var{type} is the message type as an integer
+(or @code{#f} if it could not be determined) and @var{size} is the
+message size in the header.
+
+When the first [1] end-of-file has been reached, inject the error
+@code{input:regular-end-of-file} into @var{mq}. If the end-of-file
+happened while inside a (partial) message, inject
+@code{input:premature-end-of-file} instead. In case of an I/O error,
+TODO.
+
+In these exceptional cases, the call to this procedure also returns
+after injecting the error. TODO closing message queues."
+ (let^ ((! tok (make-tokeniser))
+ (! (handle/message bv offset length)
+ (inject-message!
+ mq
+ ;; TODO: this allocates memory
+ (slice/read-only
+ (bv-slice/read-write bv offset length))))
+ (! (return/overly-small type size)
+ (inject-error! mq 'input:overly-small type size)
+ (values))
+ (! (return/premature-eof)
+ (inject-error! mq 'input:premature-end-of-file)
+ (values))
+ (! (return/done-eof)
+ (inject-error! mq 'input:regular-end-of-file)
+ (values)))
+ (add-from-port! tok input handle/message return/overly-small
+ return/done-eof return/premature-eof)))
+
+ (define (handle-output! mq output wait!)
+ "Keep sending message envelopes over the output port @var{output}.
+
+The messages to send are taken in-order from the message queue @var{mq}.
+In case of an I/O error, ???. When the message queue is (temporarily)
+empty, the thunk @var{wait!} is called. It should return when messages
+have been added to the queue.
+
+When using guile-fibers, @var{wait!} can be implemented with
+@code{await-trigger!} and by calling @code{trigger-condition!}
+from the ‘message sender’ of @var{mq}.
+
+TODO: closing, destroying @var{mq}, @var{output}."
+ (define (one-by-one-proc ev)
+ (write-envelope! output ev))
+ (define send-round
+ (cute (make-one-by-one-sender one-by-one-proc)
+ mq))
+ (let loop ()
+ ;; Doing 'wait!' or 'send-round' the other way around
+ ;; should be acceptable as well.
+ (send-round)
+ (wait!)
+ (loop)))
+
+ ;; TODO connecting to TCP ports, Unix domain sockets ...?
+ ))
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
new file mode 100644
index 0000000..fa70a1d
--- /dev/null
+++ b/tests/mq-stream.scm
@@ -0,0 +1,234 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL3.0-or-later
+
+(use-modules (gnu gnunet mq-impl stream)
+ (gnu gnunet mq)
+ (gnu gnunet mq handler)
+ (gnu gnunet utils hat-let)
+ (gnu gnunet utils bv-slice)
+ (gnu gnunet concurrency repeated-condition)
+ (fibers conditions)
+ (fibers operations)
+ (fibers)
+ (rnrs bytevectors)
+ ((rnrs io ports) #:select (open-bytevector-input-port))
+ ((rnrs base) #:select (assert))
+ (srfi srfi-26)
+ (srfi srfi-43)
+ (rnrs io ports)
+ (ice-9 binary-ports)
+ (ice-9 suspendable-ports)
+ (ice-9 control))
+
+(define (no-sender . _)
+ (error "no sender!"))
+
+(define no-handlers (message-handlers))
+
+(define (no-error-handler . _)
+ (error "no error handler!"))
+
+(test-begin "mq-stream")
+
+(define (check-slice-equal slice bv)
+ (let^ ((!! (assert (= (slice-length slice)
+ (bytevector-length bv))))
+ (! slice-copy (make-bytevector (slice-length slice)))
+ (! copy (bv-slice/read-write slice-copy))
+ (<-- () (slice-copy! slice copy))
+ (!! (bytevector=? slice-copy bv)))
+ (values)))
+
+;; Without interposition, and the verifier always
+;; returns #t.
+(define (simple-handler type handle)
+ (make-message-handler
+ type
+ (lambda (thunk) (thunk))
+ (const #t)
+ handle))
+
+(test-assert "messages + eof are injected in-order"
+ (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
+ 0 5 0 2 1 ; Message type 2, size 6
+ 0 6 0 3 2 1)) ; Message type 3, size 7
+ (! input (open-bytevector-input-port input/bv))
+ (! received 0)
+ (! (make-handler type expected-received expected-bv)
+ (simple-handler
+ type
+ (lambda (slice)
+ (assert (equal? received expected-received))
+ (check-slice-equal slice expected-bv)
+ (set! received (+ 1 received)))))
+ (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
+ (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
+ (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
+ (! handlers
+ (message-handlers handler/1 handler/2 handler/3))
+ (! (error-handler . arguments)
+ (assert (equal? received 3))
+ (assert (equal? arguments '(input:regular-end-of-file)))
+ (set! received 'end-of-file))
+ (! mq (make-message-queue handlers error-handler no-sender))
+ (<-- () (handle-input! mq input)))
+ ;; TODO: should the port be closed?
+ (assert (equal? received 'end-of-file))))
+
+(test-assert "overly small message is detected (--> stop)"
+ (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
+ 0 3 9 ; Overly small message, size 3, type != 0
+ 0 4 0 1)) ; Message type 1, size 4
+ ;; The first message is well-formatted and should therefore
+ ;; be injected. The second one isn't, so an appropriate error should
+ ;; injected. Then the message stream is broken, so the third
+ ;; message shouldn't be injected.
+ (! input (open-bytevector-input-port input/bv))
+ (! received 0)
+ (! handler/0
+ (simple-handler 0
+ (lambda (slice)
+ (assert (equal? received 0))
+ (check-slice-equal slice #vu8(0 4 0 0))
+ (set! received 1))))
+ (! handlers
+ (message-handlers handler/0))
+ (! (error-handler . arguments)
+ (assert (equal? received 1))
+ ;; Whether this malformed even has a message type is dubious,
+ ;; but if it has one, it will be (* 256 9).
+ (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
+ (set! received 'overly-small))
+ (! mq (make-message-queue handlers error-handler no-sender))
+ (<-- () (handle-input! mq input)))
+ (assert (equal? received 'overly-small))))
+
+(test-assert "premature eof is detected (--> stop)"
+ (let^ ((! input/bv #vu8(0 8 7 6 5 4))
+ (! input (open-bytevector-input-port input/bv))
+ (! received #f)
+ (! (error-handler . arguments)
+ (assert (eq? received #f))
+ (assert (equal? arguments '(input:premature-end-of-file)))
+ (set! received #t))
+ (! mq (make-message-queue no-handlers error-handler no-sender))
+ (<-- () (handle-input! mq input)))
+ (assert (equal? received #t))))
+
+(test-equal "envelopes are written (no blocking)"
+ ;; Three messages
+ #vu8(0 4 0 1
+ 0 4 0 2
+ 0 4 0 3)
+ (let^ ((! messages #(#vu8(0 4 0 1)
+ #vu8(0 4 0 2)
+ #vu8(0 4 0 3)))
+ (<-- (port get-bytevector) (open-bytevector-output-port))
+ (! mq (make-message-queue no-handlers no-error-handler
+ (lambda (_) (values))))
+ (! (insert-message index message)
+ (send-message! mq (slice/read-only (bv-slice/read-write message))))
+ (<-- ()
+ (begin
+ (vector-for-each insert-message messages)
+ (values)))
+ (<-- ()
+ ;; The implementation detail that 'send-round'
+ ;; is called before 'wait!' is assumed here.
+ (let/ec ec
+ (handle-output! mq port ec)
+ (error "unreachable"))))
+ (get-bytevector)))
+
+(define (blocking-output-port port . block-positions)
+ (define (close)
+ (close-port port))
+ (define (write! bv index length)
+ (define p (port-position port))
+ (if (or (null? block-positions)
+ (< (+ p length) (car block-positions)))
+ (begin (put-bytevector port bv index length) length)
+ (let ((short (- (car block-positions) p)))
+ (put-bytevector port bv index short)
+ ((current-write-waiter) port/blocking)
+ (set! block-positions (cdr block-positions))
+ short)))
+ (define port/blocking
+ (make-custom-binary-output-port "" write! #f #f close))
+ (setvbuf port/blocking 'none)
+ port/blocking)
+
+;; The ‘blocking’ is to make this test case more interesting.
+;; It does not currently have any effect, but it is expected
+;; that the implementation of handle-output! will be changed
+;; to react to blocking, for implementing message queue
+;; shutdown.
+
+(test-equal "repeatable conditions can be used (blocking)"
+ '(#vu8(0 4 0 1 0 4 0 2) . 4) ; 4: number of times writing blocks
+ (let^ ((! rcvar (make-repeated-condition))
+ (! stop? (make-condition))
+ (! stopped? (make-condition))
+ (! (interrupt! mq)
+ (trigger-condition! rcvar))
+ (! escape/output (make-parameter #f))
+ (<-- (out/internal get-bytevector)
+ (open-bytevector-output-port))
+ ;; block writing a few times
+ (! out (blocking-output-port out/internal 0 1 3 7))
+ (! (wait!)
+ (perform-operation
+ (apply choice-operation
+ (prepare-await-trigger! rcvar)
+ (if (>= 8 (port-position out/internal))
+ (list (wrap-operation
+ (wait-operation stop?)
+ (lambda () ((escape/output)))))
+ '()))))
+ (! mq (make-message-queue no-handlers no-error-handler interrupt!))
+ (! n/blocked 0)
+ (! message/1 #vu8(0 4 0 1))
+ (! message/2 #vu8(0 4 0 2)))
+ (run-fibers
+ (lambda ()
+ (spawn-fiber
+ (lambda ()
+ (let/ec ec
+ (parameterize ((escape/output ec)
+ (current-write-waiter
+ (lambda (port)
+ (cond ((eq? port out)
+ (set! n/blocked (+ n/blocked 1)))
+ ((file-port? port)
+ ;; XXX ‘Attempt to suspend fiber within
+ ;; continuaton barrier’
+ #;((@@ (fibers) wait-for-writable)
port)
+ (select '() (list port) '()))))))
+ (handle-output! mq out wait!)))
+ (signal-condition! stopped?)))
+ (send-message! mq (bv-slice/read-write message/1))
+ (sleep 0.001)
+ (send-message! mq (bv-slice/read-write message/2))
+ (sleep 0.001)
+ (signal-condition! stop?)
+ (wait stopped?)
+ (cons (get-bytevector) n/blocked))
+ #:parallelism 1
+ #:hz 0)))
+
+(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 128/324: netstruct: Raise &unwritable, not an &unreadable, in set%!., (continued)
- [gnunet-scheme] 128/324: netstruct: Raise &unwritable, not an &unreadable, in set%!., gnunet, 2021/09/21
- [gnunet-scheme] 131/324: tests: mq: Work-around guile-fibers bug., gnunet, 2021/09/21
- [gnunet-scheme] 143/324: tests: Extract conservative-gc? in a library., gnunet, 2021/09/21
- [gnunet-scheme] 150/324: Merge branch 'master' into proper-mq, gnunet, 2021/09/21
- [gnunet-scheme] 148/324: utils: tokeniser: Split message streams into individual messages., gnunet, 2021/09/21
- [gnunet-scheme] 149/324: mq: Delete unused and obsolete message-io module., gnunet, 2021/09/21
- [gnunet-scheme] 155/324: enum: Fix compilation error on Guile 3.0.7., gnunet, 2021/09/21
- [gnunet-scheme] 160/324: enum: symbol->value: Return #f if the symbol doesn't exist., gnunet, 2021/09/21
- [gnunet-scheme] 159/324: Makefile.am: Correct file name of test., gnunet, 2021/09/21
- [gnunet-scheme] 154/324: mq: Implement 'inject-error!'., gnunet, 2021/09/21
- [gnunet-scheme] 156/324: mq-impl/stream: Implement on top of ports.,
gnunet <=
- [gnunet-scheme] 162/324: Merge branch 'proper-mq', gnunet, 2021/09/21
- [gnunet-scheme] 161/324: guix: Patch 'guile' to fix some bugs., gnunet, 2021/09/21
- [gnunet-scheme] 152/324: utils: tokeniser: Implement 'add-from-port!'., gnunet, 2021/09/21
- [gnunet-scheme] 157/324: README.org: Note Guile 3.0.7 cannot be used for compilation., gnunet, 2021/09/21
- [gnunet-scheme] 151/324: utils: tokeniser: Some extra documentation., gnunet, 2021/09/21
- [gnunet-scheme] 153/324: concurrency: repeated-conditions: New module., gnunet, 2021/09/21
- [gnunet-scheme] 158/324: mq-impl/stream: Add modules and tests to 'Makefile.am'., gnunet, 2021/09/21
- [gnunet-scheme] 167/324: README: Graduate 'Message queues' to :test:good:, gnunet, 2021/09/21
- [gnunet-scheme] 146/324: mq: envelope: Allow testing whether an envelope is cancelled., gnunet, 2021/09/21
- [gnunet-scheme] 173/324: netstruct/procedural: Support IEEE doubles., gnunet, 2021/09/21