[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Ludovic Courtès |
Date: |
Wed, 13 Sep 2023 13:05:39 -0400 (EDT) |
branch: wip-actors
commit 7fdd780623493aa5a5d2bff4bbed06b15b731521
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Wed Sep 13 11:15:40 2023 +0200
base: Move evaluations to a separate actor.
* src/cuirass/base.scm (start-evaluation, jobset-evaluator)
(spawn-jobset-evaluator): New procedures.
(jobset-monitor): Add #:evaluator. Replace inline evaluation with a
message to EVALUATOR.
(spawn-jobset-monitor): Add #:evaluator and pass honor it.
(jobset-registry, spawn-jobset-registry): Likewise.
* src/cuirass/scripts/register.scm (cuirass-register): Call
‘spawn-jobset-evaluator’ and pass it to ‘spawn-jobset-registry’.
---
src/cuirass/base.scm | 142 ++++++++++++++++++++++++++-------------
src/cuirass/scripts/register.scm | 5 +-
2 files changed, 100 insertions(+), 47 deletions(-)
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 6da1fd2..932af4d 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -75,6 +75,7 @@
build-packages
prepare-git
spawn-channel-update-service
+ spawn-jobset-evaluator
spawn-jobset-registry
lookup-jobset
register-jobset
@@ -726,6 +727,78 @@ channels, and return its communication channel."
(spawn-fiber (channel-update-service channel))
channel))
+(define (start-evaluation spec instances timestamp)
+ "Start an evaluation of SPEC using the given channel INSTANCES. Return #f if
+nothing has changed (and thus no new evaluation was created), otherwise return
+the ID of the new evaluation."
+ (let* ((channels (map channel-instance-channel instances))
+ (new-spec (specification
+ (inherit spec)
+ ;; Include possible channel dependencies
+ (channels channels)))
+ (checkouttime (time-second (current-time time-utc)))
+ (eval-id (db-add-evaluation (specification-name spec) instances
+ #:timestamp timestamp
+ #:checkouttime checkouttime)))
+
+ (and eval-id
+ (guard (c ((evaluation-error? c)
+ (log-error "failed to evaluate spec '~a'; see ~a"
+ (evaluation-error-spec-name c)
+ (evaluation-log-file
+ (evaluation-error-id c)))
+ #f))
+ (log-info "evaluating spec '~a'" (specification-name spec))
+
+ ;; The LATEST-CHANNEL-INSTANCES procedure may return channel
+ ;; dependencies that are not declared in the initial specification
+ ;; channels. Update the given SPEC to take them into account.
+ (db-add-or-update-specification new-spec)
+ (evaluate spec eval-id)
+ (db-set-evaluation-time eval-id)
+
+ eval-id))))
+
+(define* (jobset-evaluator channel
+ #:key (max-parallel-evaluations
+ (current-processor-count)))
+ (define pool
+ (make-resource-pool (iota max-parallel-evaluations)))
+
+ (lambda ()
+ (log-info "will perform up to ~a evaluations concurrently"
+ max-parallel-evaluations)
+ (let loop ()
+ (match (get-message channel)
+ (`(evaluate ,spec ,instances ,timestamp)
+ ;; Take a token a perform the given evaluation.
+ (spawn-fiber
+ (lambda ()
+ (define eval-id
+ (with-resource-from-pool pool token
+ (log-info "evaluating '~a' with token #~a"
+ (specification-name spec) token)
+ (start-evaluation spec instances timestamp)))
+
+ (when eval-id
+ (log-info "new evaluation ~a of jobset '~a'"
+ eval-id (specification-name spec))
+ (with-store/non-blocking store
+ (build-packages store eval-id)))))
+ (loop))))))
+
+
+(define* (spawn-jobset-evaluator #:key (max-parallel-evaluations
+ (current-processor-count)))
+ "Spawn the actor responsible for evaluating jobsets for a given spec and set
+of channel instances. The actor performs at most MAX-PARALLEL-EVALUATIONS
+concurrently."
+ (let ((channel (make-channel)))
+ (spawn-fiber (jobset-evaluator channel
+ #:max-parallel-evaluations
+ max-parallel-evaluations))
+ channel))
+
(define %jobset-trigger-rate-window
;; Window (seconds) over which the jobset trigger rate is computed.
(* 5 60)) ;5 minutes
@@ -734,8 +807,9 @@ channels, and return its communication channel."
;; Maximum rate (triggers per seconds) at which jobsets may be triggered.
(/ 3 (* 2 60.))) ;3 times in 2 minutes
-(define* (jobset-monitor channel spec update-service
- #:key (polling-period 60))
+(define* (jobset-monitor channel spec
+ #:key (polling-period 60)
+ update-service evaluator)
(define period
(if (> (specification-period spec) 0)
(specification-period spec)
@@ -782,57 +856,28 @@ channels, and return its communication channel."
(instances
(log-info "fetched channels for '~a':~{ ~a~}"
name (map channel-name channels))
- (let* ((channels (map channel-instance-channel instances))
- (new-spec (specification
- (inherit spec)
- ;; Include possible channel dependencies
- (channels channels)))
- (checkouttime (time-second (current-time time-utc)))
- (eval-id (db-add-evaluation name instances
- #:timestamp timestamp
- #:checkouttime
checkouttime)))
-
- (when eval-id
- (spawn-fiber
- (lambda ()
- ;; TODO: Move this to an evaluation actor that limits
- ;; parallelism.
- (guard (c ((evaluation-error? c)
- (log-error "failed to evaluate spec '~a';
see ~a"
- (evaluation-error-spec-name c)
- (evaluation-log-file
- (evaluation-error-id c)))
- #f))
- (log-info "evaluating spec '~a'" name)
-
- ;; The LATEST-CHANNEL-INSTANCES procedure may return
channel
- ;; dependencies that are not declared in the initial
- ;; specification channels. Update the given SPEC to
take
- ;; them into account.
- (db-add-or-update-specification new-spec)
- (evaluate spec eval-id)
- (db-set-evaluation-time eval-id)
- (with-store/non-blocking store
- (build-packages store eval-id)))))
-
- ;; 'spawn-fiber' returns zero values but we need one.
- *unspecified*))))
+ (put-message evaluator
+ `(evaluate ,spec ,instances ,timestamp))))
(loop (cons timestamp (take-while recent? last-updates)))))))))
-(define* (spawn-jobset-monitor spec update-service
- #:key (polling-period 60))
+(define* (spawn-jobset-monitor spec
+ #:key (polling-period 60)
+ update-service evaluator)
"Spawn an actor responsible for monitoring the jobset corresponding to SPEC,
a <specification> record, and return it. The actor will send messages to
UPDATE-SERVICE anytime it needs Guix channels to be updated, at most every
POLLING-PERIOD seconds."
(let ((channel (make-channel)))
- (spawn-fiber (jobset-monitor channel spec update-service
+ (spawn-fiber (jobset-monitor channel spec
+ #:update-service update-service
+ #:evaluator evaluator
#:polling-period polling-period))
channel))
-(define* (jobset-registry channel update-service
- #:key (polling-period 60))
+(define* (jobset-registry channel
+ #:key (polling-period 60)
+ update-service evaluator)
(lambda ()
(spawn-fiber
(lambda ()
@@ -853,7 +898,10 @@ POLLING-PERIOD seconds."
(`(register ,spec)
(match (vhash-assq (specification-name spec) registry)
(#f
- (let ((monitor (spawn-jobset-monitor spec update-service
+ (let ((monitor (spawn-jobset-monitor spec
+ #:update-service
+ update-service
+ #:evaluator evaluator
#:polling-period
polling-period))
(name (specification-name spec)))
@@ -865,12 +913,14 @@ POLLING-PERIOD seconds."
(specification-name spec))
(loop registry))))))))
-(define* (spawn-jobset-registry update-service
- #:key (polling-period 60))
+(define* (spawn-jobset-registry #:key (polling-period 60)
+ update-service evaluator)
"Spawn a jobset registry. In turn, the registry creates a new jobset
monitoring actor for each 'register' message it receives."
(let ((channel (make-channel)))
- (spawn-fiber (jobset-registry channel update-service
+ (spawn-fiber (jobset-registry channel
+ #:update-service update-service
+ #:evaluator evaluator
#:polling-period polling-period))
channel))
diff --git a/src/cuirass/scripts/register.scm b/src/cuirass/scripts/register.scm
index 81755b1..70d54e2 100644
--- a/src/cuirass/scripts/register.scm
+++ b/src/cuirass/scripts/register.scm
@@ -205,6 +205,8 @@
(if one-shot?
(leave (G_ "'--one-shot' is currently unimplemented~%"))
(let ((exit-channel (make-channel))
+ (evaluator (spawn-jobset-evaluator
+ #:max-parallel-evaluations threads))
(update-service (spawn-channel-update-service)))
(clear-build-queue)
@@ -225,7 +227,8 @@
;; Spawn one monitoring actor for each jobset.
(let ((registry (spawn-jobset-registry
- update-service
+ #:update-service update-service
+ #:evaluator evaluator
#:polling-period interval)))
;; Spawn the bridge through which other 'cuirass'
;; processes, such as 'cuirass web', may talk to the
- [no subject], (continued)
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject],
Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2023/09/13
- [no subject], Ludovic Courtès, 2023/09/13