guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Mathieu Othacehe
Date: Wed, 14 Oct 2020 08:23:22 -0400 (EDT)

branch: master
commit b67f38a7b91c8605a3ae9eba1e2bd3da4b579622
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Wed Oct 14 13:49:03 2020 +0200

    Queue write operations.
    
    SQLite only allows one concurrent write query operation. Having multiple
    database workers calling "db-update-build-status!", will thus increase 
worker
    starvation. Every write operation will also be done is a single transaction.
    
    For those reasons, create a database worker dedicated to write queries. Have
    this worker queue work and issue all the queued work queries in a single
    transaction.
    
    * .dir-locals.el: Add with-db-writer-worker-thread.
    * src/cuirass/database.scm (with-queue-writer-worker): Rename
    "with-registration-workers" macro.
    (%db-writer-channel): Rename "%db-registration-channel" variable.
    (with-queue-writer-worker): Rename "with-registration-workers".
    (db-register-builds): Use "with-db-writer-worker-thread" instead of
    "with-db-registration-worker-thread".
    (db-update-build-status!): Ditto
    * src/cuirass/utils.scm (make-worker-thread-channel): Add "queue-size" and
    "queue-proc" arguments.
    (call-with-worker-thread): Add "options" argument.
    * bin/cuirass.in (main): Use "with-queue-writer-worker" instead of
    "with-registration-workers". Modify the macro scope to include all the
    possible write operations.
---
 .dir-locals.el           |  3 +-
 bin/cuirass.in           | 92 +++++++++++++++++++++++-------------------------
 src/cuirass/database.scm | 58 ++++++++++++++++++------------
 src/cuirass/utils.scm    | 61 +++++++++++++++++++++++++-------
 4 files changed, 131 insertions(+), 83 deletions(-)

diff --git a/.dir-locals.el b/.dir-locals.el
index a62798b..0e5705d 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -13,7 +13,8 @@
   (eval put 'test-error 'scheme-indent-function 1)
   (eval put 'make-parameter 'scheme-indent-function 1)
   (eval put 'with-database 'scheme-indent-function 0)
-  (eval put 'with-db-worker-thread 'scheme-indent-function 1))
+  (eval put 'with-db-worker-thread 'scheme-indent-function 1)
+  (eval put 'with-db-writer-worker-thread 'scheme-indent-function 1))
  (texinfo-mode
   (indent-tabs-mode)
   (fill-column . 72)
diff --git a/bin/cuirass.in b/bin/cuirass.in
index 8da9369..23d8c68 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -171,53 +171,51 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" 
"$@"
                                  (while #t
                                    (log-monitoring-stats)
                                    (sleep 600))))))
-                           (begin
-
-                             (clear-build-queue)
-
-                             ;; If Cuirass was stopped during an evaluation,
-                             ;; abort it. Builds that were not registered
-                             ;; during this evaluation will be registered
-                             ;; during the next evaluation.
-                             (db-abort-pending-evaluations)
-
-                             ;; First off, restart builds that had not
-                             ;; completed or were not even started on a
-                             ;; previous run.
-                             (spawn-fiber
-                              (essential-task
-                               'restart-builds exit-channel
-                               (lambda ()
-                                 (restart-builds))))
-
-                             (spawn-fiber
-                              (essential-task
-                               'build exit-channel
-                               (lambda ()
-                                 (with-registration-workers
-                                  (while #t
-                                    (process-specs (db-get-specifications))
-                                    (log-message
-                                     "next evaluation in ~a seconds" interval)
-                                    (sleep interval))))))
-
-                             (spawn-fiber
-                              (essential-task
-                               'metrics exit-channel
-                               (lambda ()
-                                 (while #t
-                                   (with-time-logging
-                                    "Metrics update"
-                                    (db-update-metrics))
-                                   (sleep 3600)))))
-
-                             (spawn-fiber
-                              (essential-task
-                               'monitor exit-channel
-                               (lambda ()
-                                 (while #t
-                                   (log-monitoring-stats)
-                                   (sleep 600)))))))
+                           (with-queue-writer-worker
+                            (clear-build-queue)
+
+                            ;; If Cuirass was stopped during an evaluation,
+                            ;; abort it. Builds that were not registered
+                            ;; during this evaluation will be registered
+                            ;; during the next evaluation.
+                            (db-abort-pending-evaluations)
+
+                            ;; First off, restart builds that had not
+                            ;; completed or were not even started on a
+                            ;; previous run.
+                            (spawn-fiber
+                             (essential-task
+                              'restart-builds exit-channel
+                              (lambda ()
+                                (restart-builds))))
+
+                            (spawn-fiber
+                             (essential-task
+                              'build exit-channel
+                              (lambda ()
+                                (while #t
+                                  (process-specs (db-get-specifications))
+                                  (log-message
+                                   "next evaluation in ~a seconds" interval)
+                                  (sleep interval)))))
+
+                            (spawn-fiber
+                             (essential-task
+                              'metrics exit-channel
+                              (lambda ()
+                                (while #t
+                                  (with-time-logging
+                                   "Metrics update"
+                                   (db-update-metrics))
+                                  (sleep 3600)))))
+
+                            (spawn-fiber
+                             (essential-task
+                              'monitor exit-channel
+                              (lambda ()
+                                (while #t
+                                  (log-monitoring-stats)
+                                  (sleep 600)))))))
                        (primitive-exit (get-message exit-channel))))))
 
            ;; Most of our code is I/O so preemption doesn't matter much (it
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 43d24a9..9c5317e 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -93,7 +93,7 @@
             ;; Macros.
             with-db-worker-thread
             with-database
-            with-registration-workers))
+            with-queue-writer-worker))
 
 (define (%sqlite-exec db sql . args)
   "Evaluate the given SQL query with the given ARGS.  Return the list of
@@ -188,7 +188,7 @@ specified."
 (define %db-channel
   (make-parameter #f))
 
-(define %db-registration-channel
+(define %db-writer-channel
   (make-parameter #f))
 
 (define %record-events?
@@ -219,14 +219,17 @@ connection."
                 (number->string receive-timeout)
                 caller-name))))))
 
-(define-syntax-rule (with-db-registration-worker-thread db exp ...)
-  "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in database workers
-dedicated to evaluation registration.  It is expected those workers to be busy
-for long durations as registration involves running a large number of SQL
-queries.  For this reason, do not setup a timeout here."
-  (call-with-worker-thread
-   (%db-registration-channel)
-   (lambda (db) exp ...)))
+(define-syntax with-db-writer-worker-thread
+  (syntax-rules ()
+    "Similar to WITH-DB-WORKER-THREAD but evaluates EXP in a database worker
+dedicated to writing.  EXP evaluation is queued unless #:force? is set."
+    ((_ db #:force? force exp ...)
+     (call-with-worker-thread
+      (%db-writer-channel)
+      (lambda (db) exp ...)
+      #:options `((#:force? . ,force))))
+    ((_ db exp ...)
+     (with-db-writer-worker-thread db #:force? #f exp ...))))
 
 (define (read-sql-file file-name)
   "Return a list of string containing SQL instructions from FILE-NAME."
@@ -553,16 +556,26 @@ fibers."
                    (min (current-processor-count) 4))))
     body ...))
 
-(define-syntax-rule (with-registration-workers body ...)
-  "Run BODY with %DB-REGISTRATION-CHANNEL being dynamically bound to a channel
-providing worker threads that allow registration database operations to run
-without interfering with fibers."
-  (parameterize ((%db-registration-channel
+(define-syntax-rule (with-queue-writer-worker body ...)
+  "Run BODY with %DB-WRITER-CHANNEL being dynamically bound to a channel
+providing a worker thread that allow database write operations to run
+without interfering with fibers.
+
+The worker will queue write operations and run them in a single transaction
+when the queue is full. As write operations are exclusive in SQLite, do not
+allocate more than one worker."
+  (parameterize ((%db-writer-channel
                   (make-worker-thread-channel
                    (lambda ()
                      (list (db-open)))
-                   #:parallelism
-                   (min (current-processor-count) 4))))
+                   #:parallelism 1
+                   #:queue-size 100
+                   #:queue-proc
+                   (lambda (db run-queue)
+                     (log-message "Running writer queue.")
+                     (sqlite-exec db "BEGIN TRANSACTION;")
+                     (run-queue)
+                     (sqlite-exec db "COMMIT;")))))
     body ...))
 
 (define* (read-quoted-string #:optional (port (current-input-port)))
@@ -693,10 +706,11 @@ path) VALUES ("
                           (#:stoptime . 0))))
              (db-add-build build)))))
 
-  ;; New builds registration involves running a large number of SQL queries.
-  ;; To keep database workers available, use specific database workers
-  ;; dedicated to evaluation registration.
-  (with-db-registration-worker-thread db
+  ;; Use the database worker dedicated to write queries.  We don't want this
+  ;; query to be queued as it is already a quite large transaction by itself,
+  ;; so pass the #:FORCE? option.
+  (with-db-writer-worker-thread db
+    #:force? #t
     (log-message "Registering builds for evaluation ~a." eval-id)
     (sqlite-exec db "BEGIN TRANSACTION;")
     (let ((derivations (filter-map register jobs)))
@@ -717,7 +731,7 @@ log file for DRV."
       (,(build-status failed-other)      . "failed (other)")
       (,(build-status canceled)          . "canceled")))
 
-  (with-db-worker-thread db
+  (with-db-writer-worker-thread db
     (if (= status (build-status started))
         (begin
           (sqlite-exec db "UPDATE Builds SET starttime=" now ", status="
diff --git a/src/cuirass/utils.scm b/src/cuirass/utils.scm
index 8eb0ed2..f32e3a1 100644
--- a/src/cuirass/utils.scm
+++ b/src/cuirass/utils.scm
@@ -106,9 +106,19 @@ delimited continuations and fibers."
   (make-parameter #f))
 
 (define* (make-worker-thread-channel initializer
-                                     #:key (parallelism 1))
+                                     #:key
+                                     (parallelism 1)
+                                     queue-size
+                                     (queue-proc (const #t)))
   "Return a channel used to offload work to a dedicated thread.  ARGS are the
-arguments of the worker thread procedure."
+arguments of the worker thread procedure.  This procedure supports deferring
+work sent to the worker.  If QUEUE-SIZE is set, each work query will be
+appended to a queue that will be run once it reaches QUEUE-SIZE elements.
+
+When that happens, the QUEUE-PROC procedure is called with %WORKER-THREAD-ARGS
+and a procedure running the queued work as arguments.  The worker thread can
+be passed options.  When #:FORCE? option is set, the worker runs the sent work
+immediately even if QUEUE-SIZE has been set."
   (parameterize (((@@ (fibers internal) current-fiber) #f))
     (let ((channel (make-channel)))
       (for-each
@@ -117,16 +127,37 @@ arguments of the worker thread procedure."
            (call-with-new-thread
             (lambda ()
               (parameterize ((%worker-thread-args args))
-                (let loop ()
+                (let loop ((queue '()))
                   (match (get-message channel)
-                    (((? channel? reply) . (? procedure? proc))
-                     (put-message reply
-                                  (catch #t
-                                    (lambda ()
-                                      (apply proc args))
-                                    (lambda (key . args)
-                                      (cons* 'worker-thread-error key 
args))))))
-                  (loop)))))))
+                    (((? channel? reply) options (? procedure? proc))
+                     (put-message
+                      reply
+                      (catch #t
+                        (lambda ()
+                          (cond
+                           ((or (not queue-size)
+                                (assq-ref options #:force?))
+                            (apply proc args))
+                           (else
+                            (length queue))))
+                        (lambda (key . args)
+                          (cons* 'worker-thread-error key args))))
+                     (let ((new-queue
+                         (cond
+                          ((or (not queue-size)
+                               (assq-ref options #:force?))
+                           '())
+                          ((= (1+ (length queue)) queue-size)
+                           (let ((run-queue
+                                  (lambda ()
+                                    (for-each (lambda (thunk)
+                                                (apply thunk args))
+                                              (append queue (list proc))))))
+                             (apply queue-proc (append args (list run-queue)))
+                             '()))
+                           (else
+                            (append queue (list proc))))))
+                    (loop new-queue))))))))))
        (iota parallelism))
       channel)))
 
@@ -194,6 +225,7 @@ put-operation until it succeeds."
 
 (define* (call-with-worker-thread channel proc
                                   #:key
+                                  options
                                   send-timeout
                                   send-timeout-proc
                                   receive-timeout
@@ -207,12 +239,15 @@ to a worker thread.
 
 The same goes for RECEIVE-TIMEOUT and RECEIVE-TIMEOUT-PROC, except that the
 timer expires if there is no response from the database worker PROC was sent
-to."
+to.
+
+OPTIONS are forwarded to the worker thread.  See MAKE-WORKER-THREAD-CHANNEL
+for a description of the supported options."
   (let ((args (%worker-thread-args)))
     (if args
         (apply proc args)
         (let* ((reply (make-channel))
-               (message (cons reply proc)))
+               (message (list reply options proc)))
           (if (and send-timeout (current-fiber))
               (put-message-with-timeout channel message
                                         #:seconds send-timeout



reply via email to

[Prev in Thread] Current Thread [Next in Thread]