[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Ludovic Courtès |
Date: |
Sat, 2 Sep 2023 16:51:38 -0400 (EDT) |
branch: wip-actors
commit edb8571e601c0cbca39e1ce51ce4446094987d2f
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sat Sep 2 21:30:00 2023 +0200
http: Add /jobset/SPEC/hook/push route to trigger a jobset.
* src/cuirass/base.scm (%jobset-trigger-rate-window)
(%jobset-trigger-maximum-rate): New variables.
(jobset-monitor): Replace 'sleep' call with 'get-message*'. Define
'recent?' and 'rate'; use it to check whether the trigger rate is
exceeded.
* src/cuirass/http.scm (url-handler): Add /jobset/SPEC/hook/push route.
* src/cuirass/scripts/register.scm (bridge): Handle 'trigger-jobset'
requests.
---
src/cuirass/base.scm | 134 +++++++++++++++++++++++----------------
src/cuirass/http.scm | 15 +++++
src/cuirass/scripts/register.scm | 11 ++++
3 files changed, 107 insertions(+), 53 deletions(-)
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index c7633e8..cc5c358 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -725,8 +725,15 @@ channels, and return its communication channel."
(spawn-fiber (channel-update-service channel))
channel))
-(define* (jobset-monitor channel ;currently unused
- spec update-service
+(define %jobset-trigger-rate-window
+ ;; Window (seconds) over which the jobset trigger rate is computed.
+ (* 5 60)) ;5 minutes
+
+(define %jobset-trigger-maximum-rate
+ ;; Maximum rate (triggers per seconds) at which jobsets may be triggered.
+ (/ 3 (* 2 60.))) ;3 times in 2 minutes
+
+(define* (jobset-monitor channel spec update-service
#:key (polling-period 60))
(define period
(if (> (specification-period spec) 0)
@@ -738,57 +745,78 @@ channels, and return its communication channel."
(lambda ()
(log-info "starting monitor for spec '~a'" name)
- (let loop ()
- (let ((timestamp (time-second (current-time time-utc))))
- (match (let ((reply (make-channel)))
- (log-info "fetching channels for spec '~a'" name)
- (put-message update-service
- `(fetch ,channels ,reply))
- (get-message reply))
- (#f
- (log-warning "failed to fetch channels for '~a'" name))
- (instances
- (log-info "fetched channels for '~a':~{ ~a~}"
- name (map channel-name channels))
- (let* ((channels (map channel-instance-channel instances))
- (new-spec (specification
- (inherit spec)
- ;; Include possible channel dependencies
- (channels channels)))
- (checkouttime (time-second (current-time time-utc)))
- (eval-id (db-add-evaluation name instances
- #:timestamp timestamp
- #:checkouttime checkouttime)))
-
- (when eval-id
- (spawn-fiber
- (lambda ()
- ;; TODO: Move this to an evaluation actor that limits
- ;; parallelism.
- (guard (c ((evaluation-error? c)
- (log-error "failed to evaluate spec '~a'; see ~a"
- (evaluation-error-spec-name c)
- (evaluation-log-file
- (evaluation-error-id c)))
- #f))
- (log-info "evaluating spec '~a'" name)
-
- ;; The LATEST-CHANNEL-INSTANCES procedure may return
channel
- ;; dependencies that are not declared in the initial
- ;; specification channels. Update the given SPEC to take
- ;; them into account.
- (db-add-or-update-specification new-spec)
- (evaluate spec eval-id)
- (db-set-evaluation-time eval-id)
- (with-store/non-blocking store
- (build-packages store eval-id)))))
-
- ;; 'spawn-fiber' returns zero values but we need one.
- *unspecified*))))
-
- (log-info "polling '~a' channels in ~a seconds" name period)
- (sleep period)
- (loop)))))
+ (let loop ((last-updates '()))
+ (unless (null? last-updates) ;first time?
+ (match (get-message* channel polling-period 'timeout)
+ ('timeout
+ (log-info "polling jobset '~a' after ~as timeout expiry"
+ name polling-period))
+ ('trigger
+ (log-info "triggered update of jobset '~a'" name))
+ (message
+ (log-warning "jobset '~a' got bogus message: ~s"
+ name message))))
+
+ (let* ((timestamp (time-second (current-time time-utc)))
+ (recent? (lambda (time)
+ (>= time (- timestamp %jobset-trigger-rate-window)))))
+ (define (rate lst)
+ ;; Return the (approximate) trigger rate (triggers per second).
+ (/ (count recent? lst) %jobset-trigger-rate-window 1.))
+
+ (if (> (rate last-updates) %jobset-trigger-maximum-rate)
+ (begin
+ (log-warning "trigger rate for jobset '~a' exceeded; skipping"
+ name)
+ (loop last-updates))
+ (begin
+ (match (let ((reply (make-channel)))
+ (log-info "fetching channels for spec '~a'" name)
+ (put-message update-service
+ `(fetch ,channels ,reply))
+ (get-message reply))
+ (#f
+ (log-warning "failed to fetch channels for '~a'" name))
+ (instances
+ (log-info "fetched channels for '~a':~{ ~a~}"
+ name (map channel-name channels))
+ (let* ((channels (map channel-instance-channel instances))
+ (new-spec (specification
+ (inherit spec)
+ ;; Include possible channel dependencies
+ (channels channels)))
+ (checkouttime (time-second (current-time time-utc)))
+ (eval-id (db-add-evaluation name instances
+ #:timestamp timestamp
+ #:checkouttime
checkouttime)))
+
+ (when eval-id
+ (spawn-fiber
+ (lambda ()
+ ;; TODO: Move this to an evaluation actor that limits
+ ;; parallelism.
+ (guard (c ((evaluation-error? c)
+ (log-error "failed to evaluate spec '~a';
see ~a"
+ (evaluation-error-spec-name c)
+ (evaluation-log-file
+ (evaluation-error-id c)))
+ #f))
+ (log-info "evaluating spec '~a'" name)
+
+ ;; The LATEST-CHANNEL-INSTANCES procedure may return
channel
+ ;; dependencies that are not declared in the initial
+ ;; specification channels. Update the given SPEC to
take
+ ;; them into account.
+ (db-add-or-update-specification new-spec)
+ (evaluate spec eval-id)
+ (db-set-evaluation-time eval-id)
+ (with-store/non-blocking store
+ (build-packages store eval-id)))))
+
+ ;; 'spawn-fiber' returns zero values but we need one.
+ *unspecified*))))
+
+ (loop (cons timestamp (take-while recent? last-updates)))))))))
(define* (spawn-jobset-monitor spec update-service
#:key (polling-period 60))
diff --git a/src/cuirass/http.scm b/src/cuirass/http.scm
index 4655e66..dfe1341 100644
--- a/src/cuirass/http.scm
+++ b/src/cuirass/http.scm
@@ -1169,6 +1169,21 @@ passed, only display JOBS targeting this SYSTEM."
(badge-svg spec badge-string summary
#:type (or type 0)))))
+ (('GET "jobset" spec "hook" "push")
+ (let ((spec (db-get-specification spec)))
+ (if spec
+ (if bridge
+ (let ((name (specification-name spec)))
+ (write `(trigger-jobset ,(string->symbol name))
+ bridge)
+ (newline bridge)
+ (respond-json
+ (scm->json-string `((jobset . ,name)))))
+ (begin
+ (log-warning "push hook disabled")
+ (respond-json-with-error 500 "Push hook disabled.")))
+ (respond-json-with-error 404 "Jobset not found."))))
+
(('GET "workers")
(respond-html
(html-page
diff --git a/src/cuirass/scripts/register.scm b/src/cuirass/scripts/register.scm
index 67fd905..a8dc2ea 100644
--- a/src/cuirass/scripts/register.scm
+++ b/src/cuirass/scripts/register.scm
@@ -123,6 +123,17 @@
(match (db-get-specification name)
(#f (log-warning "requested spec '~a' not found" name))
(spec (register-jobset registry spec))))
+ (`(trigger-jobset ,name)
+ (match (lookup-jobset registry name)
+ (#f (log-warning "requested jobset '~a' not found" name))
+ (jobset
+ ;; Trigger a jobset update. Since the jobset might take a
+ ;; while to get our message (it might be waiting for a
+ ;; previous pull to complete), send it in a separate fiber.
+ (spawn-fiber
+ (lambda ()
+ (log-info "triggering jobset '~a'" name)
+ (put-message jobset 'trigger))))))
(_
#f))
(loop (+ 1 count))))))
- branch wip-actors created (now edb8571), Ludovic Courtès, 2023/09/02
- [no subject], Ludovic Courtès, 2023/09/02
- [no subject], Ludovic Courtès, 2023/09/02
- [no subject],
Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2023/09/02
- [no subject], Ludovic Courtès, 2023/09/02
- [no subject], Ludovic Courtès, 2023/09/02