guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Tue, 26 Sep 2023 06:26:31 -0400 (EDT)

branch: master
commit e62012bae91df4d49c54a7ee20cd8eb69d3c05d1
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Tue Sep 26 11:16:46 2023 +0200

    remote-server: Use a channel instead of a zmq inproc socket for downloads.
    
    * src/cuirass/scripts/remote-server.scm (zmq-fetch-workers-endpoint)
    (zmq-fetch-worker-socket, start-fetch-worker): Remove.
    (fetch-worker, spawn-fetch-worker): New procedures.
    (zmq-start-proxy): Add ‘fetch-worker’ parameter.  Remove
    ‘fetch-socket’.  Send message on ‘fetch-worker’ instead of
    ‘fetch-socket’.
    (cuirass-remote-server): Call ‘spawn-fetch-worker’ only once.  Pass the
    result to ‘zmq-start-proxy’.
---
 src/cuirass/scripts/remote-server.scm | 93 +++++++++++++++--------------------
 1 file changed, 39 insertions(+), 54 deletions(-)

diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index e3a683d..f9589f4 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -61,12 +61,10 @@
   #:use-module (srfi srfi-71)
   #:use-module (ice-9 atomic)
   #:use-module (ice-9 match)
-  #:use-module (ice-9 q)
-  #:use-module (ice-9 rdelim)
-  #:use-module (ice-9 regex)
   #:use-module ((ice-9 threads)
                 #:select (current-processor-count join-thread))
   #:use-module (fibers)
+  #:use-module (fibers channels)
   #:export (cuirass-remote-server))
 
 ;; Indicate if the process has to be stopped.
@@ -95,10 +93,6 @@
 (define service-name
   "Cuirass remote server")
 
-;; The number of fetch worker threads.
-(define %fetch-workers
-  (make-parameter 8))
-
 ;; The number of queued fetch requests.
 (define %fetch-queue-size
   (make-atomic-box 0))
@@ -313,16 +307,6 @@ be used to reply to the worker."
 ;;; Fetch workers.
 ;;;
 
-(define (zmq-fetch-workers-endpoint)
-  "inproc://fetch-workers")
-
-(define (zmq-fetch-worker-socket)
-  "Return a socket used to communicate with the fetch workers."
-  (let ((socket (zmq-create-socket %zmq-context ZMQ_PULL))
-        (endpoint (zmq-fetch-workers-endpoint)))
-    (zmq-connect socket endpoint)
-    socket))
-
 (define (url-fetch* url file)
   (parameterize ((current-output-port (%make-void-port "w"))
                  (current-error-port (%make-void-port "w")))
@@ -420,18 +404,28 @@ directory."
      (log-info "build failed: '~a'" drv)
      (db-update-build-status! drv (build-status failed)))))
 
-(define (start-fetch-worker name)
-  "Start a fetch worker fiber, which takes care of downloading build outputs.
-It communicates with the remote worker using a ZMQ socket."
-  (spawn-fiber
-   (lambda ()
-     (use-modules (cuirass parameters)) ;XXX: Needed for mu-debug variable.
-     (let ((socket (zmq-fetch-worker-socket)))
-       (log-debug "starting fetch worker ~s" name)
-       (let loop ()
-         (run-fetch (receive-message socket))
-         (atomic-box-fetch-and-dec! %fetch-queue-size)
-         (loop))))))
+(define (fetch-worker channel max-parallel-downloads)
+  (lambda ()
+    (let ((pool (make-resource-pool (iota max-parallel-downloads))))
+      (log-info "starting fetch worker with up to ~a concurrent downloads"
+                max-parallel-downloads)
+      (let loop ()
+        (let ((message (get-message channel)))
+          (atomic-box-fetch-and-inc! %fetch-queue-size)
+          (spawn-fiber
+           (lambda ()
+             (with-resource-from-pool pool token
+               (log-debug "fetching with token #~a" token)
+               (run-fetch message)
+               (atomic-box-fetch-and-dec! %fetch-queue-size)))))
+        (loop)))))
+
+(define* (spawn-fetch-worker #:key (max-parallel-downloads 8))
+  "Spawn a fetch worker fiber, which takes care of downloading build outputs as
+requested received on its channel."
+  (let ((channel (make-channel)))
+    (spawn-fiber (fetch-worker channel max-parallel-downloads))
+    channel))
 
 
 ;;;
@@ -465,20 +459,17 @@ It communicates with the remote worker using a ZMQ socket."
 all network interfaces."
   (string-append "tcp://*:" (number->string backend-port)))
 
-(define (zmq-start-proxy backend-port)
-  "This procedure starts a proxy between client connections from the IPC
-frontend to the workers connected through the TCP backend."
-  (let* ((build-socket
-          (zmq-create-socket %zmq-context ZMQ_ROUTER))
-         (fetch-socket
-          (zmq-create-socket %zmq-context ZMQ_PUSH)))
+(define (zmq-start-proxy backend-port fetch-worker)
+  "Open a zmq socket on BACKEND-PORT and listen for messages coming from
+'cuirass remote-worker' messages.  When a message denoting a successful build
+is received, pass it on to FETCH-WORKER to download the build's output(s)."
+  (let ((build-socket (zmq-create-socket %zmq-context ZMQ_ROUTER)))
 
     ;; Send bootstrap messages on worker connection to wake up the workers
     ;; that were hanging waiting for request-work responses.
     (zmq-set-socket-option build-socket ZMQ_PROBE_ROUTER 1)
 
     (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
-    (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
 
     (spawn-fiber
      (lambda ()
@@ -497,9 +488,7 @@ frontend to the workers connected through the TCP backend."
                              (send-message build-socket message
                                            #:recipient sender))))
         (if (need-fetching? command)
-            (begin
-              (atomic-box-fetch-and-inc! %fetch-queue-size)
-              (send-message fetch-socket command))
+            (put-message fetch-worker command)
             (read-worker-exp command
                              #:peer-address sender-address
                              #:reply-worker reply-worker))
@@ -648,19 +637,15 @@ exiting."
              (receive-logs log-port (%cache-directory))
              (spawn-notification-fiber)
              (spawn-periodic-updates-fiber)
-             (for-each (lambda (number)
-                         (start-fetch-worker
-                          (string-append "fetch-worker-"
-                                         (number->string number))))
-                       (iota (%fetch-workers)))
-
-             (catch 'zmq-error
-               (lambda ()
-                 (zmq-start-proxy backend-port))
-               (lambda (key errno message . _)
-                 (log-error (G_ "failed to start worker/database proxy: ~a")
-                            message)
-                 (terminate-helper-processes)
-                 (primitive-exit 1)))))
+
+             (let ((fetch-worker (spawn-fetch-worker)))
+               (catch 'zmq-error
+                 (lambda ()
+                   (zmq-start-proxy backend-port fetch-worker))
+                 (lambda (key errno message . _)
+                   (log-error (G_ "failed to start worker/database proxy: ~a")
+                              message)
+                   (terminate-helper-processes)
+                   (primitive-exit 1))))))
          #:hz 0
          #:parallelism (min 8 (current-processor-count)))))))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]