[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Ludovic Courtès |
Date: |
Wed, 13 Sep 2023 13:05:39 -0400 (EDT) |
branch: wip-actors
commit 0ddd43fd3098f85396fd9ff506c08d1499c6a139
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Wed Sep 13 12:48:12 2023 +0200
base: Implement ‘update-jobset’.
* src/cuirass/base.scm (jobset-monitor): Thread SPEC through the loop.
Introduce ‘perform-update’. Implement ‘update-spec’ message handling.
(jobset-registry): Handle ‘update’ messages.
(update-jobset): New procedure.
---
src/cuirass/base.scm | 119 +++++++++++++++++++++++++++++++--------------------
1 file changed, 73 insertions(+), 46 deletions(-)
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 932af4d..97a7ddf 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -77,8 +77,11 @@
spawn-channel-update-service
spawn-jobset-evaluator
spawn-jobset-registry
+
lookup-jobset
register-jobset
+ update-jobset
+
evaluation-log-file
latest-checkouts
@@ -810,56 +813,68 @@ concurrently."
(define* (jobset-monitor channel spec
#:key (polling-period 60)
update-service evaluator)
- (define period
- (if (> (specification-period spec) 0)
- (specification-period spec)
- polling-period))
-
(define name (specification-name spec))
- (define channels (specification-channels spec))
(lambda ()
(log-info "starting monitor for spec '~a'" name)
- (let loop ((last-updates '()))
- (unless (null? last-updates) ;first time?
- (match (get-message* channel polling-period 'timeout)
- ('timeout
- (log-info "polling jobset '~a' after ~as timeout expiry"
- name polling-period))
- ('trigger
- (log-info "triggered update of jobset '~a'" name))
- (message
- (log-warning "jobset '~a' got bogus message: ~s"
- name message))))
-
- (let* ((timestamp (time-second (current-time time-utc)))
- (recent? (lambda (time)
- (>= time (- timestamp %jobset-trigger-rate-window)))))
- (define (rate lst)
- ;; Return the (approximate) trigger rate (triggers per second).
- (/ (count recent? lst) %jobset-trigger-rate-window 1.))
-
- ;; Mitigate the risk of a DoS attack by rejecting frequent requests.
- (if (> (rate last-updates) %jobset-trigger-maximum-rate)
- (begin
- (log-warning "trigger rate for jobset '~a' exceeded; skipping"
- name)
- (loop last-updates))
- (begin
- (match (let ((reply (make-channel)))
- (log-info "fetching channels for spec '~a'" name)
- (put-message update-service
- `(fetch ,channels ,reply))
- (get-message reply))
- (#f
- (log-warning "failed to fetch channels for '~a'" name))
- (instances
- (log-info "fetched channels for '~a':~{ ~a~}"
- name (map channel-name channels))
- (put-message evaluator
- `(evaluate ,spec ,instances ,timestamp))))
-
- (loop (cons timestamp (take-while recent? last-updates)))))))))
+ (let loop ((spec spec)
+ (last-updates '()))
+ (define period
+ (if (> (specification-period spec) 0)
+ (specification-period spec)
+ polling-period))
+
+ (define channels
+ (specification-channels spec))
+
+ (define (perform-update)
+ (let* ((timestamp (time-second (current-time time-utc)))
+ (recent? (lambda (time)
+ (>= time (- timestamp
%jobset-trigger-rate-window)))))
+ (define (rate lst)
+ ;; Return the (approximate) trigger rate (triggers per second).
+ (/ (count recent? lst) %jobset-trigger-rate-window 1.))
+
+ ;; Mitigate the risk of a DoS attack by rejecting frequent requests.
+ (if (> (rate last-updates) %jobset-trigger-maximum-rate)
+ (begin
+ (log-warning "trigger rate for jobset '~a' exceeded; skipping"
+ name)
+ (loop spec last-updates))
+ (begin
+ (match (let ((reply (make-channel)))
+ (log-info "fetching channels for spec '~a'" name)
+ (put-message update-service
+ `(fetch ,channels ,reply))
+ (get-message reply))
+ (#f
+ (log-warning "failed to fetch channels for '~a'" name))
+ (instances
+ (log-info "fetched channels for '~a':~{ ~a~}"
+ name (map channel-name channels))
+ (put-message evaluator
+ `(evaluate ,spec ,instances ,timestamp))))
+
+ (loop spec
+ (cons timestamp (take-while recent? last-updates)))))))
+
+ (if (null? last-updates) ;first time?
+ (perform-update)
+ (match (get-message* channel polling-period 'timeout)
+ ('timeout
+ (log-info "polling jobset '~a' after ~as timeout expiry"
+ name polling-period)
+ (perform-update))
+ ('trigger
+ (log-info "triggered update of jobset '~a'" name)
+ (perform-update))
+ (`(update-spec ,spec)
+ (log-info "updating spec of jobset '~a'" name)
+ (loop spec last-updates))
+ (message
+ (log-warning "jobset '~a' got bogus message: ~s"
+ name message)
+ (loop spec last-updates)))))))
(define* (spawn-jobset-monitor spec
#:key (polling-period 60)
@@ -895,6 +910,14 @@ POLLING-PERIOD seconds."
(#f #f)
((_ . actor) actor)))
(loop registry))
+ (`(update ,spec)
+ (let ((name (string->symbol (specification-name spec))))
+ (match (vhash-assq name registry)
+ (#f
+ (log-error "cannot update non-existent spec '~s'" name))
+ ((_ . monitor)
+ (put-message monitor `(update-spec ,spec)))))
+ (loop registry))
(`(register ,spec)
(match (vhash-assq (specification-name spec) registry)
(#f
@@ -934,3 +957,7 @@ monitoring actor for each 'register' message it receives."
"Register a new jobset of SPEC. REGISTRY is the channel returned by
'spawn-jobset-registry'."
(put-message registry `(register ,spec)))
+
+(define* (update-jobset registry spec)
+ "Update SPEC, so far known under FORMER-NAME, in REGISTRY."
+ (put-message registry `(update ,spec)))
- [no subject], (continued)
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject],
Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2023/09/13