[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Ludovic Courtès
Date: Tue, 26 Sep 2023 06:26:31 -0400 (EDT)
branch: master
commit e62012bae91df4d49c54a7ee20cd8eb69d3c05d1
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Tue Sep 26 11:16:46 2023 +0200
remote-server: Use a channel instead of a zmq inproc socket for downloads.
* src/cuirass/scripts/remote-server.scm (zmq-fetch-workers-endpoint)
(zmq-fetch-worker-socket, start-fetch-worker): Remove.
(fetch-worker, spawn-fetch-worker): New procedures.
(zmq-start-proxy): Add ‘fetch-worker’ parameter. Remove
‘fetch-socket’. Send message on ‘fetch-worker’ instead of
‘fetch-socket’.
(cuirass-remote-server): Call ‘spawn-fetch-worker’ only once. Pass the
result to ‘zmq-start-proxy’.
---
src/cuirass/scripts/remote-server.scm | 93 +++++++++++++++--------------------
1 file changed, 39 insertions(+), 54 deletions(-)
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index e3a683d..f9589f4 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -61,12 +61,10 @@
#:use-module (srfi srfi-71)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
- #:use-module (ice-9 q)
- #:use-module (ice-9 rdelim)
- #:use-module (ice-9 regex)
#:use-module ((ice-9 threads)
#:select (current-processor-count join-thread))
#:use-module (fibers)
+ #:use-module (fibers channels)
#:export (cuirass-remote-server))
;; Indicate if the process has to be stopped.
@@ -95,10 +93,6 @@
(define service-name
"Cuirass remote server")
-;; The number of fetch worker threads.
-(define %fetch-workers
- (make-parameter 8))
-
;; The number of queued fetch requests.
(define %fetch-queue-size
(make-atomic-box 0))
@@ -313,16 +307,6 @@ be used to reply to the worker."
;;; Fetch workers.
;;;
-(define (zmq-fetch-workers-endpoint)
- "inproc://fetch-workers")
-
-(define (zmq-fetch-worker-socket)
- "Return a socket used to communicate with the fetch workers."
- (let ((socket (zmq-create-socket %zmq-context ZMQ_PULL))
- (endpoint (zmq-fetch-workers-endpoint)))
- (zmq-connect socket endpoint)
- socket))
-
(define (url-fetch* url file)
(parameterize ((current-output-port (%make-void-port "w"))
(current-error-port (%make-void-port "w")))
@@ -420,18 +404,28 @@ directory."
(log-info "build failed: '~a'" drv)
(db-update-build-status! drv (build-status failed)))))
-(define (start-fetch-worker name)
- "Start a fetch worker fiber, which takes care of downloading build outputs.
-It communicates with the remote worker using a ZMQ socket."
- (spawn-fiber
- (lambda ()
- (use-modules (cuirass parameters)) ;XXX: Needed for mu-debug variable.
- (let ((socket (zmq-fetch-worker-socket)))
- (log-debug "starting fetch worker ~s" name)
- (let loop ()
- (run-fetch (receive-message socket))
- (atomic-box-fetch-and-dec! %fetch-queue-size)
- (loop))))))
+(define (fetch-worker channel max-parallel-downloads)
+ (lambda ()
+ (let ((pool (make-resource-pool (iota max-parallel-downloads))))
+ (log-info "starting fetch worker with up to ~a concurrent downloads"
+ max-parallel-downloads)
+ (let loop ()
+ (let ((message (get-message channel)))
+ (atomic-box-fetch-and-inc! %fetch-queue-size)
+ (spawn-fiber
+ (lambda ()
+ (with-resource-from-pool pool token
+ (log-debug "fetching with token #~a" token)
+ (run-fetch message)
+ (atomic-box-fetch-and-dec! %fetch-queue-size)))))
+ (loop)))))
+
+(define* (spawn-fetch-worker #:key (max-parallel-downloads 8))
+ "Spawn a fetch worker fiber, which takes care of downloading build outputs as
+requested by the messages received on its channel."
+ (let ((channel (make-channel)))
+ (spawn-fiber (fetch-worker channel max-parallel-downloads))
+ channel))
;;;
@@ -465,20 +459,17 @@ It communicates with the remote worker using a ZMQ socket."
all network interfaces."
(string-append "tcp://*:" (number->string backend-port)))
-(define (zmq-start-proxy backend-port)
- "This procedure starts a proxy between client connections from the IPC
-frontend to the workers connected through the TCP backend."
- (let* ((build-socket
- (zmq-create-socket %zmq-context ZMQ_ROUTER))
- (fetch-socket
- (zmq-create-socket %zmq-context ZMQ_PUSH)))
+(define (zmq-start-proxy backend-port fetch-worker)
+ "Open a zmq socket on BACKEND-PORT and listen for messages coming from
+'cuirass remote-worker' processes. When a message denoting a successful build
+is received, pass it on to FETCH-WORKER to download the build's output(s)."
+ (let ((build-socket (zmq-create-socket %zmq-context ZMQ_ROUTER)))
;; Send bootstrap messages on worker connection to wake up the workers
;; that were hanging waiting for request-work responses.
(zmq-set-socket-option build-socket ZMQ_PROBE_ROUTER 1)
(zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
- (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
(spawn-fiber
(lambda ()
@@ -497,9 +488,7 @@ frontend to the workers connected through the TCP backend."
(send-message build-socket message
#:recipient sender))))
(if (need-fetching? command)
- (begin
- (atomic-box-fetch-and-inc! %fetch-queue-size)
- (send-message fetch-socket command))
+ (put-message fetch-worker command)
(read-worker-exp command
#:peer-address sender-address
#:reply-worker reply-worker))
@@ -648,19 +637,15 @@ exiting."
(receive-logs log-port (%cache-directory))
(spawn-notification-fiber)
(spawn-periodic-updates-fiber)
- (for-each (lambda (number)
- (start-fetch-worker
- (string-append "fetch-worker-"
- (number->string number))))
- (iota (%fetch-workers)))
-
- (catch 'zmq-error
- (lambda ()
- (zmq-start-proxy backend-port))
- (lambda (key errno message . _)
- (log-error (G_ "failed to start worker/database proxy: ~a")
- message)
- (terminate-helper-processes)
- (primitive-exit 1)))))
+
+ (let ((fetch-worker (spawn-fetch-worker)))
+ (catch 'zmq-error
+ (lambda ()
+ (zmq-start-proxy backend-port fetch-worker))
+ (lambda (key errno message . _)
+ (log-error (G_ "failed to start worker/database proxy: ~a")
+ message)
+ (terminate-helper-processes)
+ (primitive-exit 1))))))
#:hz 0
#:parallelism (min 8 (current-processor-count)))))))
- master updated (b97ea9d -> 317d8fc), Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject],
Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26
- [no subject], Ludovic Courtès, 2023/09/26