guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

04/06: Rework the shortlived PostgreSQL specific connection channel


From: Christopher Baines
Subject: 04/06: Rework the shortlived PostgreSQL specific connection channel
Date: Sat, 3 Oct 2020 16:43:18 -0400 (EDT)

cbaines pushed a commit to branch master
in repository data-service.

commit e2e55c69de1eceb77998ab059a943711ef7779fd
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Sat Oct 3 21:32:46 2020 +0100

    Rework the shortlived PostgreSQL specific connection channel
    
    Into a generic thing more like (ice-9 futures). This includes copying
    some bits from the (ice-9 threads) module and adapting them to work with
    this fibers approach, rather than futures. The advantage is that using
    fibers channels doesn't block the threads being used by fibers, whereas
    futures would.
---
 .dir-locals.el                      |   4 +-
 guix-data-service/data-deletion.scm |  17 +++---
 guix-data-service/database.scm      |  95 ---------------------------------
 guix-data-service/utils.scm         | 102 +++++++++++++++++++++++++++++++++++-
 4 files changed, 112 insertions(+), 106 deletions(-)

diff --git a/.dir-locals.el b/.dir-locals.el
index 5d052d8..f7cbfb5 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -7,7 +7,9 @@
  (scheme-mode
   (indent-tabs-mode)
   (eval put 'with-time-logging 'scheme-indent-function 1)
-  (eval put 'make-parameter 'scheme-indent-function 1))
+  (eval put 'make-parameter 'scheme-indent-function 1)
+  (eval put 'letpar 'scheme-indent-function 1)
+  (eval put 'letpar& 'scheme-indent-function 1))
  (texinfo-mode
   (indent-tabs-mode)
   (fill-column . 72)))
diff --git a/guix-data-service/data-deletion.scm b/guix-data-service/data-deletion.scm
index 6c4e0b9..197cef1 100644
--- a/guix-data-service/data-deletion.scm
+++ b/guix-data-service/data-deletion.scm
@@ -448,17 +448,16 @@ WHERE NOT EXISTS (
                  (lambda (count result)
                    (+ result count))
                  0
-                 (par-map (lambda (derivation-id)
-                            (with-postgresql-transaction/through-channel
-                             conn-channel
-                             (lambda (conn)
-                               (exec-query
-                                conn
-                                "
+                 (par-map& (lambda (derivation-id)
+                             (with-thread-postgresql-connection
+                              (lambda (conn)
+                                (exec-query
+                                 conn
+                                 "
 SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
 
-                               (maybe-delete-derivation conn derivation-id))))
-                          derivations))))
+                                (maybe-delete-derivation conn derivation-id))))
+                           derivations))))
            (simple-format (current-error-port)
                           "Deleted ~A derivations\n"
                           deleted-count)
diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm
index 89b1a09..4d1001b 100644
--- a/guix-data-service/database.scm
+++ b/guix-data-service/database.scm
@@ -20,9 +20,6 @@
   #:use-module (ice-9 match)
   #:use-module (ice-9 threads)
   #:use-module (squee)
-  #:use-module (fibers)
-  #:use-module (fibers channels)
-  #:use-module (fibers conditions)
   #:use-module (guix-data-service config)
   #:export (with-postgresql-connection
 
@@ -136,98 +133,6 @@
 
           (f conn)))))
 
-(define* (make-postgresql-connection-channel name
-                                             #:key
-                                             (statement-timeout #f)
-                                             (threads 2))
-  (parameterize (((@@ (fibers internal) current-fiber) #f))
-    (let ((channel (make-channel)))
-      (for-each
-       (lambda _
-         (call-with-new-thread
-          (lambda ()
-            (with-postgresql-connection
-             name
-             (lambda (conn)
-               (let loop ()
-                 (match (get-message channel)
-                   (((? channel? reply) f (? boolean? allways-rollback?))
-                    (put-message
-                     reply
-                     (with-exception-handler
-                         (lambda (exn)
-                           (cons 'worker-thread-error exn))
-                       (lambda ()
-                         (with-exception-handler
-                             (lambda (exn)
-                               (simple-format
-                                (current-error-port)
-                                "postgresql connection thread: exception: ~A\n"
-                                exn)
-                               (backtrace)
-                               (raise-exception exn))
-                           (lambda ()
-                             (call-with-values
-                                 (lambda ()
-                                   (with-postgresql-transaction
-                                    conn
-                                    (lambda (conn)
-                                      (f conn))))
-                               (lambda vals vals)))))
-                       #:unwind? #t))
-                    (loop))
-                   (((? channel? reply) . (? list? args))
-                    (put-message
-                     reply
-                     (with-exception-handler
-                         (lambda (exn)
-                           (cons 'worker-thread-error exn))
-                       (lambda ()
-                         (with-exception-handler
-                             (lambda (exn)
-                               (simple-format
-                                (current-error-port)
-                                "postgresql connection thread: exception: ~A\n"
-                                exn)
-                               (backtrace)
-                               (raise-exception exn))
-                           (lambda ()
-                             (call-with-values
-                                 (lambda ()
-                                   (apply exec-query
-                                          conn
-                                          args))
-                               (lambda vals vals)))))
-                       #:unwind? #t))
-                    (loop))
-                   (_ #f))))
-             #:statement-timeout statement-timeout))))
-       (iota threads))
-      channel)))
-
-(define (close-postgresql-connection-channel channel)
-  (put-message channel #f))
-
-(define (exec-query/through-channel channel . args)
-  (let ((reply (make-channel)))
-    (put-message channel (cons reply args))
-    (match (get-message reply)
-      (('worker-thread-error . exn)
-       (raise-exception exn))
-      (result
-       (apply values result)))))
-
-(define* (with-postgresql-transaction/through-channel channel
-                                                      f
-                                                      #:key always-rollback?)
-  (let ((reply (make-channel)))
-    (put-message channel (list reply f always-rollback?))
-    (match (get-message reply)
-      (('worker-thread-error . exn)
-       (raise-exception exn))
-      (result
-       (apply values result)))))
-
 (define* (with-postgresql-transaction conn f
                                       #:key always-rollback?)
   (exec-query conn "BEGIN;")
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index 738f839..855c819 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -17,9 +17,18 @@
 
 (define-module (guix-data-service utils)
   #:use-module (srfi srfi-11)
+  #:use-module (ice-9 match)
+  #:use-module (ice-9 threads)
+  #:use-module (fibers)
+  #:use-module (fibers channels)
+  #:use-module (fibers conditions)
   #:export (call-with-time-logging
             with-time-logging
-            prevent-inlining-for-tests))
+            prevent-inlining-for-tests
+
+            parallel-via-thread-pool-channel
+            par-map&
+            letpar&))
 
 (define (call-with-time-logging action thunk)
   (simple-format #t "debug: Starting ~A\n" action)
@@ -38,3 +47,94 @@
 
 (define-syntax-rule (prevent-inlining-for-tests var)
   (set! var var))
+
+
+(define* (make-thread-pool-channel #:key (threads 8))
+  (parameterize (((@@ (fibers internal) current-fiber) #f))
+    (let ((channel (make-channel)))
+      (for-each
+       (lambda _
+         (call-with-new-thread
+          (lambda ()
+            (let loop ()
+              (match (get-message channel)
+                (((? channel? reply) . (? procedure? proc))
+                 (put-message
+                  reply
+                  (with-exception-handler
+                      (lambda (exn)
+                        (cons 'worker-thread-error exn))
+                    (lambda ()
+                      (with-exception-handler
+                          (lambda (exn)
+                            (simple-format
+                             (current-error-port)
+                             "worker thread: exception: ~A\n"
+                             exn)
+                            (backtrace)
+                            (raise-exception exn))
+                        (lambda ()
+                          (call-with-values
+                              proc
+                            (lambda vals
+                              vals)))))
+                    #:unwind? #t))
+                 (loop))
+                (_ #f))))))
+       (iota threads))
+      channel)))
+
+(define %thread-pool-mutex (make-mutex))
+(define %thread-pool-channel #f)
+
+(define (make-thread-pool-channel!')
+  (with-mutex %thread-pool-mutex
+    (unless %thread-pool-channel
+      (set! %thread-pool-channel (make-thread-pool-channel))
+      (set! make-thread-pool-channel! (lambda () #t)))))
+
+(define make-thread-pool-channel!
+  (lambda () (make-thread-pool-channel!')))
+
+(define (defer-to-thread-pool-channel thunk)
+  (make-thread-pool-channel!)
+  (let ((reply (make-channel)))
+    (put-message %thread-pool-channel (cons reply thunk))
+    reply))
+
+(define (fetch-result-of-defered-thunk reply-channel)
+  (match (get-message reply-channel)
+    (('worker-thread-error . exn)
+     (raise-exception exn))
+    (result
+     (apply values result))))
+
+(define-syntax parallel-via-thread-pool-channel
+  (lambda (x)
+    (syntax-case x ()
+      ((_ e0 ...)
+       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
+         #'(let ((tmp0 (defer-to-thread-pool-channel
+                         (lambda ()
+                           e0)))
+                 ...)
+             (values (fetch-result-of-defered-thunk tmp0) ...)))))))
+
+(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
+  (call-with-values
+      (lambda () (parallel-via-thread-pool-channel e ...))
+    (lambda (v ...)
+      b0 b1 ...)))
+
+(define (par-mapper' mapper cons)
+  (lambda (proc . lists)
+    (let loop ((lists lists))
+      (match lists
+        (((heads tails ...) ...)
+         (let ((tail (defer-to-thread-pool-channel (loop tails)))
+               (head (apply proc heads)))
+           (cons head (fetch-result-of-defered-thunk tail))))
+        (_
+         '())))))
+
+(define par-map& (par-mapper' map cons))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]