[gnunet-scheme] 142/324: mq: Implement envelope cancellation callback.
From: gnunet
Subject: [gnunet-scheme] 142/324: mq: Implement envelope cancellation callback.
Date: Tue, 21 Sep 2021 13:23:02 +0200
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit 6103fb386915a6ef4523cce90a2d917c0d74aa93
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Jun 17 22:09:38 2021 +0200
mq: Implement envelope cancellation callback.
* gnu/gnunet/mq.scm
(<message-queue>)[messages/box]: Replace by ...
(<messages+garbage/box>)[messages+garbage/box]: ... this field.
(make-message-queue): Initialise new field appropriately.
(make-one-by-one-sender): Use new field. Decrement the garbage
counter when a cancelled envelope is encountered. Ignore cancelled
envelopes.
(message-queue-length): Use new field.
(%message-queue-garbagitude): New procedure, for the test suite.
(send-message!)[cancel!]: Unstub. If the message queue still
exists, consider removing cancelled envelopes from the queue.
(send-message!): Use new field.
(queue-filter): New procedure.
(increment-garbage&maybe-cleanup): Delete cancelled envelopes
from the queue, but not too often, for efficiency.
---
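Note for readers: the counter-plus-threshold idea described in the log
above can be tried in isolation. What follows is a minimal sketch, not
the code of this commit. It assumes Guile, uses a plain list in place of
the pfds queue, and uses a one-element vector in place of an envelope.
The names make-item, item-cancelled?, mark-cancelled!, make-box and
note-cancellation! are hypothetical and do not exist in gnu/gnunet/mq.scm.

```scheme
;; Sketch only: a plain list stands in for the pfds queue, and a
;; one-element vector stands in for an envelope's "cancelled" flag.
(use-modules (ice-9 atomic)
             (srfi srfi-1))

(define (make-item) (vector #f))                   ; hypothetical envelope
(define (item-cancelled? item) (vector-ref item 0))
(define (mark-cancelled! item) (vector-set! item 0 #t))

;; The box holds a pair (ITEMS . GARBAGE) that is always replaced as a
;; whole, so both parts change together.
(define (make-box items)
  (make-atomic-box (cons items 0)))

(define (note-cancellation! box)
  "Increment the garbage estimate of BOX.  If more than 3/4 of the items
are then estimated to be garbage, drop the cancelled items instead and
reset the estimate to zero."
  (let loop ((old (atomic-box-ref box)))
    (let* ((items (car old))
           (garbage (+ 1 (cdr old)))
           (new (if (> (* 4 garbage) (* 3 (length items)))
                    (cons (remove item-cancelled? items) 0)
                    (cons items garbage))))
      ;; atomic-box-compare-and-swap! returns the previous value; the
      ;; swap succeeded only if that value is OLD itself.
      (unless (eq? old (atomic-box-compare-and-swap! box old new))
        (loop (atomic-box-ref box))))))
```

With four items in the box, cancelling three of them only raises the
estimate, because 4*3 is not greater than 3*4. Cancelling the fourth
crosses the threshold and the cancelled items are filtered out in the
same compare-and-swap that resets the estimate.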
gnu/gnunet/mq.scm | 154 +++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 118 insertions(+), 36 deletions(-)
diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
index 89d5f6e..af2a9fd 100644
--- a/gnu/gnunet/mq.scm
+++ b/gnu/gnunet/mq.scm
@@ -26,7 +26,7 @@
;; Messages are made of bytes. In particular,
;; messages must be prefixed by a /:message-header.
;;
-;; TODO cancelling, injecting messages, message handlers ...
+;; TODO injecting messages, message handlers ...
;; These are not implemented yet or untested, or need
;; more documentation!
(define-library (gnu gnunet mq)
@@ -34,6 +34,7 @@
make-one-by-one-sender
inject-message! send-message!
message-queue-length
+ %message-queue-garbagitude
try-send-again!
&missing-header-error make-missing-header-error
@@ -59,13 +60,16 @@
(only (gnu gnunet netstruct syntactic)
sizeof read%)
(only (guile) define* exact-integer?)
+ (only (ice-9 weak-vector)
+ weak-vector weak-vector-ref)
(only (ice-9 atomic)
make-atomic-box atomic-box-ref
atomic-box-compare-and-swap!)
(only (rnrs base)
lambda assert let begin define
procedure? eq? >= = <= < if quote
- values and let* not)
+ values and let* not cons car cdr
+ cond + - > *)
(only (rnrs control)
when unless)
(only (rnrs conditions)
@@ -74,19 +78,26 @@
(only (rnrs exceptions)
raise raise-continuable)
(only (rnrs records syntactic) define-record-type)
+ (only (srfi srfi-1) filter)
(only (srfi srfi-8) receive)
(only (srfi srfi-39) make-parameter)
(prefix (only (pfds queues)
make-queue dequeue enqueue queue-length
- queue-empty?)
+ queue-empty? queue->list list->queue)
#{pfds:}#))
(begin
(define-record-type (<message-queue> make-message-queue message-queue?)
(fields (immutable handlers message-queue-handlers)
(immutable error-handler message-queue-error-handler)
- ;; Atomic box of a queue of messages to send
- ;; (as @code{<envelope>} objects).
- (immutable messages/box message-queue-messages/box)
+ ;; Atomic box of a queue of messages to send (as @code{<envelope>}
+ ;; objects), together with an over-estimate of how many items in
+ ;; the queue are already cancelled, used as a heuristic for when
+ ;; optimising the message queue is required.
+ ;;
+ ;; It can occasionally be an under-estimate due to marking
+ ;; envelopes as cancelled and updating the estimate not being
+ ;; an atomic operation.
+ (immutable messages+garbage/box
message-queue-messages+garbage/box)
;; A procedure for actually sending the messages.
;; It accepts a single argument, the message queue.
;;
@@ -115,7 +126,7 @@ Messages are sent with @var{sender}. It can be created with
#;(assert (message-handlers? handlers))
#;(assert (message-handler? error-handler))
(%make handlers error-handler
- (make-atomic-box (pfds:make-queue))
+ (make-atomic-box (cons (pfds:make-queue) 0))
sender)))))
(define (make-one-by-one-sender proc)
@@ -133,35 +144,42 @@ messages must be sent in-order (TODO really received in-order?)."
(lambda (mq)
(assert (message-queue? mq))
(%%bind-atomic-boxen
- ((queue (message-queue-messages/box mq) swap!))
+ ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
;; First extract an envelope ...
- (let spin ((old queue))
+ (let spin ((old queue+garbage))
+ (define old-queue (car old))
+ (define old-garbage (cdr old))
+ (assert (<= 0 old-garbage))
;; ... unless there isn't anything to remove anymore.
;; This check cannot be moved outside the (let spin ...),
;; as message senders may be called at any time
;; (even if there are no messages!). Also, in case of
;; concurrency, the queue may become empty after a spin
;; iteration.
- (unless (pfds:queue-empty? old)
- (receive (envelope new) (pfds:dequeue old)
- (when (eq? old (swap! old new))
- ;; We extracted a message. Now do something
- ;; with it!
- ;;
- ;; Make sure @var{proc} does not return
- ;; any values, as we may want to assign
- ;; meaning to return values later.
- (receive ()
- ;; Process the message.
- (proc envelope)
- 'nothing))
- ;; Process the remaining messages
- ;; / Someone modified the message queue
- ;; before us; retry.
- ;;
+ (unless (pfds:queue-empty? old-queue)
+ (receive (envelope new-queue) (pfds:dequeue old-queue)
+ (cond ((envelope-peek-cancelled? envelope)
+ ;; There is no need to pass already cancelled
+ ;; envelopes to @var{proc} (although passing them
+ ;; anyway should be harmless), so remove them
+ ;; from the queue. Also try to keep the estimate
+ ;; accurate.
+ (swap! old (cons new-queue (- old-garbage 1))))
+ ;; We extracted a (not-yet-cancelled) envelope.
+ ;; Now do something with it!
+ ((eq? old (swap! old (cons new-queue old-garbage)))
+ ;; Make sure @var{proc} does not return
+ ;; any values, as we may want to assign
+ ;; meaning to return values later.
+ (receive ()
+ ;; Process the message.
+ (proc envelope)
+ (values))))
+ ;; Process remaining messages (or retry in case there was
+ ;; a race and we lost it).
;; TODO: if someone else modified the message queue,
;; does that mean we don't have to anymore?
- (spin queue)))))))
+ (spin queue+garbage)))))))
(define (inject-message! mq message)
"Call the message handler that was registered
@@ -215,7 +233,12 @@ of message queues."
(define (message-queue-length mq)
"How many messages are currently in the message queue @var{mq}?"
(pfds:queue-length
- (atomic-box-ref (message-queue-messages/box mq))))
+ (car (atomic-box-ref (message-queue-messages+garbage/box mq)))))
+
+ (define (%message-queue-garbagitude mq)
+ "Return the estimated number of cancelled envelopes. This procedure
+is not part of the API and is only intended for the test suite."
+ (cdr (atomic-box-ref (message-queue-messages+garbage/box mq))))
;; TODO: should this be a subtype of the not-yet-existing
;; &malformed-message?
@@ -261,13 +284,19 @@ but in case of an exception (for example, an out-of-memory exception during
the handling of a @code{&overly-full-queue-warning}), it is possible
the envelope isn't returned even though it has been enqueued and it might
perhaps be sent."
+ (define mq/weak
+ (let ((v (weak-vector mq)))
+ (lambda () (weak-vector-ref v 0))))
(define (cancel!)
- (assert (and #f "cancel! not yet implemented")))
+ (let ((mq (mq/weak)))
+ (if mq
+ (increment-garbage&maybe-cleanup mq)
+ (values))))
(assert (and (slice? message)
(exact-integer? priority)
(<= 0 priority) (< priority 512)))
(%%bind-atomic-boxen
- ((queue (message-queue-messages/box mq) swap-queue!))
+ ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
;; Add the message to the queue. Also remember the
;; length of the new queue; we'll need it later.
(let* ((envelope (make-envelope cancel!
@@ -275,11 +304,13 @@ perhaps be sent."
#:priority priority
#:notify-sent! notify-sent!))
(queue-length
- (let spin ((old queue))
- (let ((new (pfds:enqueue old envelope)))
- (if (eq? old (swap-queue! old new))
- (pfds:queue-length new)
- (spin queue))))))
+ (let spin ((old queue+garbage))
+ (let* ((old-queue (car old))
+ (old-garbage (cdr old))
+ (new-queue (pfds:enqueue old-queue envelope)))
+ (if (eq? old (swap! old (cons new-queue old-garbage)))
+ (pfds:queue-length new-queue)
+ (spin queue+garbage))))))
;; The C implementation emits a warning if the queue has
;; many entries, as this may indicate a bug (in the scheduler,
;; in the queue implementation, ...). This seems a good idea.
@@ -297,4 +328,55 @@ perhaps be sent."
(define (try-send-again! mq)
"Try to send messages in the queue @var{mq} that were not yet sent.
This is expected to be called from the message queue implementation."
- ((message-queue-sender mq) mq))))
+ ((message-queue-sender mq) mq))
+
+ (define (queue-filter ? queue)
+ "Construct a queue, based on @var{queue}, restricted to elements
+satisfying the predicate @var{?}."
+ (pfds:list->queue (filter ? (pfds:queue->list queue))))
+
+ (define (increment-garbage&maybe-cleanup mq)
+ "Increment the garbage counter of @var{mq} and perhaps
+take out the trash (i.e., cancelled envelopes still in the queue),
+and if the trash is taken out, reset the garbage counter to zero,
+as an atomic operation."
+ (%%bind-atomic-boxen
+ ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
+ (let loop ((old queue+garbage))
+ (let* ((old-queue (car old))
+ (old-queue-length (pfds:queue-length old-queue))
+ (old-garbage (cdr old))
+ (incremented-garbage (+ 1 old-garbage)))
+ (assert (<= 0 old-garbage))
+ ;; If the messages in the queue are largely
+ ;; garbage, throw the garbage out. The procedure
+ ;; chooses to throw the garbage out if the (estimated)
+ ;; ratio of garbage to the queue length is more than
+ ;; 3/4.
+ ;;
+ ;; There are no deep theoretical reasons for choosing
+ ;; the ratio 3/4=0.75, only that it is between 1/2 and
+ ;; 1. Choosing a ratio seemed less arbitrary than, say,
+ ;; only collecting garbage if the garbage exceeds some
+ ;; fixed amount.
+ (if (> (* 4 incremented-garbage) (* 3 old-queue-length))
+ ;; It is time to collect garbage!
+ ;; Construct a new queue with all garbage removed.
+ (let ((filtered (queue-filter
+ (lambda (i)
+ (not (envelope-peek-cancelled? i)))
+ old-queue)))
+ ;; Try to write this new queue,
+ ;; resetting the garbage counter.
+ (if (eq? old (swap! old (cons filtered 0)))
+ ;; All garbage has been thrown out! Done!
+ (values)
+ ;; We lost the race, try again!
+ (loop queue+garbage)))
+ ;; Not yet time for garbage collection,
+ ;; just increment the garbage counter
+ (if (eq? old (swap! old (cons old-queue incremented-garbage)))
+ ;; The garbage counter has been incremented! Done!
+ (values)
+ ;; We lost the race, try again!
+ (loop queue+garbage)))))))))
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.