[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Ludovic Courtès |
Date: |
Fri, 1 Sep 2023 17:43:55 -0400 (EDT) |
branch: wip-actors
commit c3120f4ab153843994d7c40806b3594ffbd80f41
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Sep 1 19:26:08 2023 +0200
base: Split jobset evaluation into several actors.
* src/cuirass/base.scm (process-specs): Remove.
(channel-update-service, spawn-channel-update-service)
(jobset-monitor, spawn-jobset-monitor)
(jobset-registry, spawn-jobset-registry)
(lookup-jobset, register-jobset): New procedures.
* src/cuirass/http.scm (url-handler): Add FIXMEs.
* src/cuirass/scripts/register.scm (cuirass-register): Remove 'build'
fiber and call 'spawn-jobset-registry' instead. Error out when
'one-shot?' is true, since it is not implemented yet.
---
src/cuirass/base.scm | 274 +++++++++++++++++++++++++++------------
src/cuirass/http.scm | 6 +
src/cuirass/scripts/register.scm | 17 +--
3 files changed, 202 insertions(+), 95 deletions(-)
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 075c49d..5adbe5e 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -52,6 +52,7 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 ftw)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 vlist)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-19)
@@ -73,7 +74,10 @@
restart-builds
build-packages
prepare-git
- process-specs
+ spawn-channel-update-service
+ spawn-jobset-registry
+ lookup-jobset
+ register-jobset
evaluation-log-file
latest-checkouts
@@ -652,87 +656,189 @@ specification."
(define exception-with-kind-and-args?
(exception-predicate &exception-with-kind-and-args))
-(define (process-specs jobspecs)
-  "Evaluate and build JOBSPECS and store results in the database."
-  (define (new-eval? spec)
-    (let ((name (specification-name spec))
-          (period (specification-period spec)))
-      (or (= period 0)
-          (let ((time
-                 (db-get-time-since-previous-eval name)))
-            (cond
-             ((not time) #t)
-             ((> time period) #t)
-             (else #f))))))
-
-  (define (process spec)
+(define (channel-update-service channel)
+  "Return a thunk (an actor) that reads messages on CHANNEL and is responsible
+for updating Git checkouts, effectively serializing all Git operations. The
+actor handles messages of the form '(fetch CHANNELS REPLY)': it fetches the
+latest instances of CHANNELS and sends on REPLY either the list of channel
+instances or #f if fetching failed."
+  ;; Note: All Git operations are serialized when in fact it would be enough
+  ;; to serialize operations with the same URL (because they are cached in the
+  ;; same directory).
+  (define (fetch store channels)
+    "Return the latest instances of CHANNELS, or #f if an exception is raised
+while fetching them, in which case the exception is logged."
+    (let/ec return
+      (with-exception-handler
+          (lambda (exception)
+            (if (exception-with-kind-and-args? exception)
+                (match (exception-kind exception)
+                  ('git-error
+                   (log-error "Git error while fetching channels from~{ ~a~}: ~a"
+                              (map channel-url channels)
+                              (git-error-message
+                               (first (exception-args exception)))))
+                  ('system-error
+                   (log-error "system error while fetching channels from~{ ~a~}: ~a"
+                              (map channel-url channels)
+                              (strerror
+                               (system-error-errno
+                                (cons 'system-error
+                                      (exception-args exception))))))
+                  (kind
+                   (log-error "uncaught '~a' exception: ~s"
+                              kind (exception-args exception))))
+                (log-error "uncaught exception: ~s" exception))
+            (return #f))
+          (lambda ()
+            (non-blocking
+             (latest-channel-instances* store channels))))))
+
+  (lambda ()
     (with-store store
-      (let* ((name (specification-name spec))
-             (timestamp (time-second (current-time time-utc)))
-             (channels (specification-channels spec))
-             (instances (non-blocking
-                         (log-info "fetching channels for spec '~a'" name)
-                         (latest-channel-instances* store channels)))
-             (new-channels (let ((channels (map channel-instance-channel
-                                                instances)))
-                             (log-info "fetched channels for '~a':~{ ~a~}"
-                                       name (map channel-name channels))
-                             channels))
-             (new-spec (specification
-                        (inherit spec)
-                        (channels new-channels))) ;include possible channel
-                                                  ;dependencies.
-             (checkouttime (time-second (current-time time-utc)))
-             (eval-id (db-add-evaluation name instances
-                                         #:timestamp timestamp
-                                         #:checkouttime checkouttime)))
-        (when eval-id
-          (spawn-fiber
-           (lambda ()
-             (guard (c ((evaluation-error? c)
-                        (log-error "failed to evaluate spec '~a'; see ~a"
-                                   (evaluation-error-spec-name c)
-                                   (evaluation-log-file
-                                    (evaluation-error-id c)))
-                        #f))
-               (log-info "evaluating spec '~a'" name)
-
-               ;; The LATEST-CHANNEL-INSTANCES procedure may return channel
-               ;; dependencies that are not declared in the initial
-               ;; specification channels. Update the given SPEC to take
-               ;; them into account.
-               (db-add-or-update-specification new-spec)
-               (evaluate spec eval-id)
-               (db-set-evaluation-time eval-id)
-               (build-packages store eval-id))))
-
-        ;; 'spawn-fiber' returns zero values but we need one.
-        *unspecified*))))
-
-  (for-each (lambda (spec)
-              ;; Catch Git errors, which might be transient, and keep going.
-              (let/ec return
-                (with-exception-handler
-                    (lambda (exception)
-                      (if (exception-with-kind-and-args? exception)
-                          (match (exception-kind exception)
-                            ('git-error
-                             (log-error "Git error while fetching inputs of '~a': ~a"
-                                        (specification-name spec)
-                                        (git-error-message
-                                         (first (exception-args exception)))))
-                            ('system-error
-                             (log-error "while processing '~a': ~s"
-                                        (strerror
-                                         (system-error-errno
-                                          (cons 'system-error
-                                                (exception-args exception))))))
-                            (kind
-                             (log-error "uncaught '~a' exception: ~s"
-                                        kind (exception-args exception))))
-                          (log-error "uncaught exception: ~s" exception))
-                      (return #f))
-                    (lambda ()
-                      (and (new-eval? spec)
-                           (process spec))))))
-            jobspecs))
+      (let loop ()
+        (match (get-message channel)
+          (`(fetch ,channels ,reply)
+           (log-info "fetching channels:~{ '~a'~}"
+                     (map channel-name channels))
+           (let ((result (fetch store channels)))
+             (if result
+                 (log-info "pulled commits~{ ~a~}"
+                           (zip (map (compose channel-name
+                                              channel-instance-channel)
+                                     result)
+                                (map channel-instance-commit result)))
+                 (log-info "failed to fetch channels~{ '~a'~}"
+                           (map channel-name channels)))
+             (put-message reply result))
+           (loop)))))))
+
+(define (spawn-channel-update-service)
+  "Spawn an actor responsible for fetching the latest revisions of a set of Guix
+channels, and return its communication channel. Clients send it messages of
+the form '(fetch CHANNELS REPLY)' and then read from REPLY either the list of
+fetched channel instances or #f if fetching failed."
+  (let ((channel (make-channel)))
+    (spawn-fiber (channel-update-service channel))
+    channel))
+
+(define* (jobset-monitor channel ;currently unused
+                         spec update-service
+                         #:key (polling-period 60))
+  "Return a thunk (an actor) that monitors the jobset of SPEC. In a loop, it
+asks UPDATE-SERVICE to fetch the channels of SPEC. When that succeeds, it
+records a new evaluation with 'db-add-evaluation' and, if that returns an
+evaluation ID, spawns a fiber that evaluates and builds it. It then sleeps for
+the period of SPEC if that is strictly positive, or POLLING-PERIOD seconds
+otherwise, before the next iteration."
+  (define period
+    (if (> (specification-period spec) 0)
+        (specification-period spec)
+        polling-period))
+
+  (define name (specification-name spec))
+  (define channels (specification-channels spec))
+
+  (lambda ()
+    (log-info "starting monitor for spec '~a'" name)
+    (let loop ()
+      (let ((timestamp (time-second (current-time time-utc))))
+        (match (let ((reply (make-channel)))
+                 (log-info "fetching channels for spec '~a'" name)
+                 (put-message update-service
+                              `(fetch ,channels ,reply))
+                 (get-message reply))
+          (#f
+           (log-warning "failed to fetch channels for '~a'" name))
+          (instances
+           ;; Log the channels actually fetched, which may include channel
+           ;; dependencies not declared in SPEC.
+           (log-info "fetched channels for '~a':~{ ~a~}"
+                     name (map (compose channel-name channel-instance-channel)
+                               instances))
+           (let* ((channels (map channel-instance-channel instances))
+                  (new-spec (specification
+                             (inherit spec)
+                             ;; Include possible channel dependencies
+                             (channels channels)))
+                  (checkouttime (time-second (current-time time-utc)))
+                  (eval-id (db-add-evaluation name instances
+                                              #:timestamp timestamp
+                                              #:checkouttime checkouttime)))
+
+             (when eval-id
+               (spawn-fiber
+                (lambda ()
+                  ;; TODO: Move this to an evaluation actor that limits
+                  ;; parallelism.
+                  (guard (c ((evaluation-error? c)
+                             (log-error "failed to evaluate spec '~a'; see ~a"
+                                        (evaluation-error-spec-name c)
+                                        (evaluation-log-file
+                                         (evaluation-error-id c)))
+                             #f))
+                    (log-info "evaluating spec '~a'" name)
+
+                    ;; The LATEST-CHANNEL-INSTANCES procedure may return channel
+                    ;; dependencies that are not declared in the initial
+                    ;; specification channels. Update the given SPEC to take
+                    ;; them into account.
+                    (db-add-or-update-specification new-spec)
+                    (evaluate spec eval-id)
+                    (db-set-evaluation-time eval-id)
+                    (with-store/non-blocking store
+                      (build-packages store eval-id)))))
+
+               ;; 'spawn-fiber' returns zero values but we need one.
+               *unspecified*))))
+
+        (log-info "polling '~a' channels in ~a seconds" name period)
+        (sleep period)
+        (loop)))))
+
+(define* (spawn-jobset-monitor spec update-service
+                               #:key (polling-period 60))
+  "Spawn an actor responsible for monitoring the jobset corresponding to SPEC,
+a <specification> record, and return its communication channel. The actor
+sends messages to UPDATE-SERVICE whenever it needs Guix channels to be updated.
+Between two such requests it sleeps for the period of SPEC if that is strictly
+positive, and for POLLING-PERIOD seconds otherwise."
+  (let ((channel (make-channel)))
+    (spawn-fiber (jobset-monitor channel spec update-service
+                                 #:polling-period polling-period))
+    channel))
+
+(define* (jobset-registry channel update-service
+                          #:key (polling-period 60))
+  "Return a thunk (an actor) that maintains the registry of jobset monitors,
+reading its messages on CHANNEL. It handles two kinds of messages:
+
+  - '(lookup JOBSET REPLY)': send on REPLY the monitor channel registered
+    under the name JOBSET, or #f if there is none;
+  - '(register SPEC)': spawn a monitor for SPEC with 'spawn-jobset-monitor',
+    unless a jobset with the same name is already registered.
+
+When started, the actor also registers every specification returned by
+'db-get-specifications'."
+  (define (register-known-jobsets)
+    ;; Run in a fiber of its own: 'register-jobset' sends on CHANNEL, and the
+    ;; loop below is what reads CHANNEL, so it cannot run in the loop's fiber.
+    (let ((specs (db-get-specifications)))
+      (log-info "registering ~a jobsets" (length specs))
+      (for-each (lambda (spec)
+                  (register-jobset channel spec))
+                specs)))
+
+  (lambda ()
+    (spawn-fiber register-known-jobsets)
+    (let loop ((registry vlist-null))
+      (match (get-message channel)
+        (`(lookup ,jobset ,reply)
+         (put-message reply (and=> (vhash-assq jobset registry) cdr))
+         (loop registry))
+        (`(register ,spec)
+         (let ((name (specification-name spec)))
+           (if (vhash-assq name registry)
+               (begin
+                 (log-info "jobset '~a' was already registered" name)
+                 (loop registry))
+               (let ((monitor (spawn-jobset-monitor spec update-service
+                                                    #:polling-period
+                                                    polling-period)))
+                 (log-info "registering new jobset '~a'" name)
+                 (loop (vhash-consq name monitor registry))))))))))
+
+(define* (spawn-jobset-registry update-service
+                                #:key (polling-period 60))
+  "Spawn a jobset registry and return its communication channel. On startup,
+the registry registers the specifications returned by 'db-get-specifications'.
+In turn, it creates a new jobset monitoring actor for each 'register' message
+whose specification name is not registered yet. Monitors send their fetch
+requests to UPDATE-SERVICE and fall back to POLLING-PERIOD seconds when their
+specification has no strictly positive period."
+  (let ((channel (make-channel)))
+    (spawn-fiber (jobset-registry channel update-service
+                                  #:polling-period polling-period))
+    channel))
+
+(define (lookup-jobset registry jobset)
+  "Return the monitor channel of JOBSET, a specification name (symbol), as
+known to REGISTRY, the channel returned by 'spawn-jobset-registry'. Return #f
+if no jobset by that name is registered."
+  (let ((reply (make-channel)))
+    (put-message registry `(lookup ,jobset ,reply))
+    (get-message reply)))
+
+(define (register-jobset registry spec)
+  "Ask REGISTRY, the channel returned by 'spawn-jobset-registry', to register
+the jobset described by SPEC, a <specification> record. The registry spawns a
+monitor for SPEC unless a jobset with the same name is already registered."
+  (let ((message (list 'register spec)))
+    (put-message registry message)))
diff --git a/src/cuirass/http.scm b/src/cuirass/http.scm
index 2350be2..92d187e 100644
--- a/src/cuirass/http.scm
+++ b/src/cuirass/http.scm
@@ -648,6 +648,10 @@ passed, only display JOBS targeting this SYSTEM."
#:code 400)
(begin
(db-add-or-update-specification spec)
+ ;; FIXME: Notify the jobset registry in the 'cuirass register'
+ ;; process.
+ ;;
+ ;; (register-jobset jobset-registry spec)
(respond
(build-response #:code 302
#:headers
@@ -663,6 +667,8 @@ passed, only display JOBS targeting this SYSTEM."
;; XXX: It is not possible yet to edit build outputs and notifications
;; using the web interface. Use the outputs and notifications from the
;; existing specification.
+
+ ;; FIXME: Notify the jobset registry in the 'cuirass register' process.
(db-add-or-update-specification
(specification
(inherit spec)
diff --git a/src/cuirass/scripts/register.scm b/src/cuirass/scripts/register.scm
index 09488a1..0373e5f 100644
--- a/src/cuirass/scripts/register.scm
+++ b/src/cuirass/scripts/register.scm
@@ -126,8 +126,9 @@
(and paramfile (read-parameters paramfile))
(if one-shot?
- (process-specs (db-get-specifications))
- (let ((exit-channel (make-channel)))
+ (leave (G_ "'--one-shot' is currently unimplemented~%"))
+ (let ((exit-channel (make-channel))
+ (update-service (spawn-channel-update-service)))
(clear-build-queue)
;; If Cuirass was stopped during an evaluation,
@@ -145,15 +146,9 @@
(lambda ()
(restart-builds))))
- (spawn-fiber
- (essential-task
- 'build exit-channel
- (lambda ()
- (while #t
- (process-specs (db-get-specifications))
- (log-info
- "next evaluation in ~a seconds" interval)
- (sleep interval)))))
+ ;; Spawn one monitoring actor for each jobset.
+ (spawn-jobset-registry update-service
+ #:polling-period interval)
(spawn-fiber
(essential-task