;;; fibers-pool: a small worker pool built on Guile Fibers channels.
;;;
;;; A single work-distributor receives two kinds of tagged messages on one
;;; channel: ('worker-ready . <worker-channel>) from workers and
;;; ('work . (<thunk> . <return-channel>)) from publishers.  It pairs queued
;;; work with ready workers.  A worker runs the thunk and puts the result on
;;; the return channel supplied with the work.

(define-module (fibers-pool))

(use-modules
 ;; FIFO queue, not functional, using mutation
 ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
 (ice-9 q)
 (ice-9 match)
 (ice-9 threads)
 (rnrs exceptions)
 (rnrs conditions)
 ;; fibers internals are needed for creating schedulers without running
 ;; anything in them immediately
 (fibers)
 (fibers channels)
 (fibers internal))

;; Display MSG followed by a newline on the current output port.
(define displayln
  (lambda (msg)
    (display msg)
    (newline)))

;; Message loop of the work distributor.
;;
;; CHANNEL-RECEIVE is the channel on which both workers and publishers send
;; their tagged messages.  The loop keeps two mutable queues: pending work and
;; channels of workers that reported ready.  It never returns normally; an
;; unrecognized message raises a condition carrying the message as irritant.
(define work-distributor
  (lambda (channel-receive)
    ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor started")
    ;; (displayln "[WORK-DISTRIBUTOR]: starting work-distributor message loop")
    ;; The queues are mutated in place by enq!/deq!, so every iteration is
    ;; passed the very same two queue objects.
    (let loop ([work-queue (make-q)]
               [worker-channels-queue (make-q)])
      (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")
      (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
      (displayln (q-length worker-channels-queue))
      (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
      (displayln (q-length work-queue))
      ;; ~pk~ prints the received message for debugging and returns it.
      (match (pk 'work-distributor-received-msg (get-message channel-receive))
        [('worker-ready . channel-worker)
         (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
         ;; If there is no work for the ready worker, enqueue the worker,
         ;; otherwise give it work.
         (cond
          [(q-empty? work-queue)
           ;; (displayln "[WORK-DISTRIBUTOR]: work queue is empty")
           (enq! worker-channels-queue channel-worker)]
          [else
           ;; (displayln "[WORK-DISTRIBUTOR]: work queue has work")
           (let ([some-work (deq! work-queue)])
             ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor will put work on channel")
             (put-message channel-worker (cons 'work some-work))
             ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor did put work on channel")
             )])
         (loop work-queue worker-channels-queue)]
        [('work . work)
         ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor received work")
         ;; ~work~ is always a pair of a thunk to be run and a return channel,
         ;; on which the result shall be put.
         ;; If there is no worker ready, enqueue the work, otherwise distribute
         ;; the work to a ready worker.
         (cond
          [(q-empty? worker-channels-queue)
           ;; (displayln "[WORK-DISTRIBUTOR]: worker queue is empty")
           (enq! work-queue work)]
          [else
           ;; (displayln "[WORK-DISTRIBUTOR]: ready workers available")
           (let ([channel-worker (deq! worker-channels-queue)])
             ;; (displayln "[WORK-DISTRIBUTOR]: will put work on channel")
             (put-message channel-worker (cons 'work work))
             ;; (displayln "[WORK-DISTRIBUTOR]: did put work on channel")
             )])
         (loop work-queue worker-channels-queue)]
        ;; On any other message raise a condition.
        [other
         (raise
          (condition
           (make-error)
           (make-message-condition "work-distributor received unrecognized message")
           (make-irritants-condition (list other))))]))))

;; Message loop of a single worker.
;;
;; WORKER-INDEX is accepted but not used by the body.  CHANNEL-RECEIVE is the
;; distributor's channel.  The worker creates its own channel, announces
;; itself as ready on CHANNEL-RECEIVE, waits for one message on its own
;; channel, runs the received thunk, puts the thunk's result on the return
;; channel that came with the work, and repeats.  An unrecognized message
;; raises a condition carrying the message as irritant.
(define worker
  (lambda (worker-index channel-receive)
    (let ([channel-worker (make-channel)])
      (displayln "[WORKER]: before worker message loop")
      (let loop ()
        ;; Report as ready. Give my own channel to the work-distributor to let
        ;; it send me work.
        (put-message channel-receive (cons 'worker-ready channel-worker))
        ;; Get messages sent to me by the distributor on my own channel.
        (match (pk 'worker-got-msg (get-message channel-worker))
          ;; If I receive work, do the work and return it on the channel-return.
          [('work . (thunk . channel-return))
           ;; Put the result on the return channel, so that anyone who has a
           ;; binding of the return channel can access the result.
           (put-message channel-return (thunk))
           (loop)]
          ;; On any other message raise a condition.
          [other
           (raise
            (condition
             (make-error)
             (make-message-condition "worker received unrecognized message")
             (make-irritants-condition (list other))))])))))

;; Start a pool and return the channel on which work can be published.
;;
;; #:parallelism (default: (current-processor-count)) is used both as the
;; parallelism of the created scheduler and as the number of worker fibers
;; spawned.  Two new threads are started: one calling ~run-fibers~ to spawn
;; the workers, one running ~work-distributor~.  Returns the distributor's
;; receive channel, to be passed to ~publish~.
(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))
    ;; (define run-fibers-in-scheduler
    ;; (displayln "[POOL INIT]: running pool-initializer")
    (let ([channel-receive (make-channel)]
          [scheduler (make-scheduler #:parallelism parallelism)])
      ;; start as many workers as are desired
      ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the
      ;; fibers in a non-blocking way. LOOKUP: How to start fibers without
      ;; waiting for them to finish?
      ;; NOTE(review): whether ~run-fibers~ keeps running the spawned workers
      ;; after the init thunk below returns depends on the Fibers version and
      ;; on how an explicit #:scheduler is handled -- confirm against the
      ;; installed Fibers release.
      ;; (displayln "[POOL INIT]: will run-fibers with new thread")
      (call-with-new-thread
       (lambda ()
         ;; (displayln "[POOL INIT THREAD]: running")
         (run-fibers
          (lambda ()
            ;; (displayln "[POOL INIT THREAD]: will start some fibers")
            ;; (display "[POOL INIT THREAD]: parallelism is: ") (displayln parallelism)
            ;; Count down from ~parallelism~ to 1, spawning one worker each.
            (let loop ([index parallelism])
              (unless (zero? index)
                ;; using fibers:
                ;; TODO: use created scheduler
                ;; (displayln "[POOL INIT THREAD]: there are more fibers to spawn")
                (display "[POOL INIT THREAD]: will spawn fiber ")
                (displayln index)
                (spawn-fiber
                 (lambda ()
                   (worker index channel-receive)))
                ;; We do not need to spawn new fibers in the same scheduler
                ;; later. The fibers should stay alive for the whole duration
                ;; the program is running.
                (displayln "[POOL INIT THREAD]: fiber spawned")
                (loop (- index 1)))))
          #:scheduler scheduler)
         (displayln "[POOL INIT]: pool init thread returning")))
      (displayln "[POOL INIT]: will start work-distributor")
      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      ;; (displayln "[POOL INIT]: work-distributor is now running in new thread")
      ;; Return the channel for receiving work, so that the outside context can
      ;; make use of it when calling ~publish~ to publish work.
      ;; (displayln "[POOL INIT]: returning channel-receive")
      channel-receive)))

;; Publish a unit of work to the pool.
;;
;; WORK-AS-THUNK is a procedure of no arguments.  CHANNEL-RECEIVE is the
;; channel returned by ~pool-initializer~.  A fresh return channel is created,
;; the message ('work . (WORK-AS-THUNK . return-channel)) is put on
;; CHANNEL-RECEIVE, and the return channel is returned so the caller can
;; ~get-message~ the result from it.
(define publish
  (lambda (work-as-thunk channel-receive)
    ;; The result of the computation can be taken from ~channel-return~.
    (let ([channel-return (make-channel)])
      ;; Put work tagged as work on the receive channel of the work-distributor.
      (let ([work-message (cons 'work (cons work-as-thunk channel-return))])
        (display
         (simple-format
          #f "[PUBLISHER]: will publish the following work: ~a\n" work-message))
        (put-message channel-receive work-message))
      (displayln "[PUBLISHER]: work published")
      ;; Return the ~channel-return~, so that the outside context can get
      ;; results from it.
      channel-return)))

;; CPU-bound demo workload: count up from 0 in steps of 1 until the counter is
;; no longer below 5e8, then return the counter.
(define busy-work
  (lambda ()
    (let loop ([i 0])
      (cond
       [(< i 5e8) (loop (+ i 1))]
       [else i]))))

;; Demo: start a pool with two workers and publish two busy jobs.  These forms
;; run when the module is loaded.
(define c-rec (pool-initializer #:parallelism 2))
(define c-ret-2 (publish (lambda () (busy-work)) c-rec))
(define c-ret-1 (publish (lambda () (busy-work)) c-rec))
;; (get-message c-ret-2)
;; (get-message c-ret-1)