guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

02/05: Add some utilities to use PostgreSQL/Squee through a channel


From: Christopher Baines
Subject: 02/05: Add some utilities to use PostgreSQL/Squee through a channel
Date: Thu, 1 Oct 2020 14:16:44 -0400 (EDT)

cbaines pushed a commit to branch master
in repository data-service.

commit 614f9888a58fbd7b2e708cbbf0262f3eb42d2d49
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Thu Oct 1 19:13:30 2020 +0100

    Add some utilities to use PostgreSQL/Squee through a channel
    
    To allow for some concurrency.
---
 guix-data-service/database.scm | 102 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 102 insertions(+)

diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm
index df4daac..8298b93 100644
--- a/guix-data-service/database.scm
+++ b/guix-data-service/database.scm
@@ -18,9 +18,19 @@
 (define-module (guix-data-service database)
   #:use-module (system foreign)
   #:use-module (ice-9 match)
+  #:use-module (ice-9 threads)
   #:use-module (squee)
+  #:use-module (fibers)
+  #:use-module (fibers channels)
+  #:use-module (fibers conditions)
   #:use-module (guix-data-service config)
   #:export (with-postgresql-connection
+
+            make-postgresql-connection-channel
+            close-postgresql-connection-channel
+            exec-query/through-channel
+            with-postgresql-transaction/through-channel
+
             with-postgresql-transaction
 
             check-test-database!
@@ -61,6 +71,98 @@
       (lambda (key . args)
         (pg-conn-finish conn)))))
 
+(define* (make-postgresql-connection-channel name
+                                             #:key
+                                             (statement-timeout #f)
+                                             (threads 4))
+  (parameterize (((@@ (fibers internal) current-fiber) #f))
+    (let ((channel (make-channel)))
+      (for-each
+       (lambda _
+         (call-with-new-thread
+          (lambda ()
+            (with-postgresql-connection
+             name
+             (lambda (conn)
+               (let loop ()
+                 (match (get-message channel)
+                   (((? channel? reply) f (? boolean? allways-rollback?))
+                    (put-message
+                     reply
+                     (with-exception-handler
+                         (lambda (exn)
+                           (cons 'worker-thread-error exn))
+                       (lambda ()
+                         (with-exception-handler
+                             (lambda (exn)
+                               (simple-format
+                                (current-error-port)
+                                "postgresql connection thread: exception: ~A\n"
+                                exn)
+                               (backtrace)
+                               (raise-exception exn))
+                           (lambda ()
+                             (call-with-values
+                                 (lambda ()
+                                   (with-postgresql-transaction
+                                    conn
+                                    (lambda (conn)
+                                      (f conn))))
+                               (lambda vals vals)))))
+                       #:unwind? #t))
+                    (loop))
+                   (((? channel? reply) . (? list? args))
+                    (put-message
+                     reply
+                     (with-exception-handler
+                         (lambda (exn)
+                           (cons 'worker-thread-error exn))
+                       (lambda ()
+                         (with-exception-handler
+                             (lambda (exn)
+                               (simple-format
+                                (current-error-port)
+                                "postgresql connection thread: exception: ~A\n"
+                                exn)
+                               (backtrace)
+                               (raise-exception exn))
+                           (lambda ()
+                             (call-with-values
+                                 (lambda ()
+                                   (apply exec-query
+                                          conn
+                                          args))
+                               (lambda vals vals)))))
+                       #:unwind? #t))
+                    (loop))
+                   (_ #f))))
+             #:statement-timeout statement-timeout))))
+       (iota threads))
+      channel)))
+
+(define (close-postgresql-connection-channel channel)
+  (put-message channel #f))
+
+(define (exec-query/through-channel channel . args)
+  (let ((reply (make-channel)))
+    (put-message channel (cons reply args))
+    (match (get-message reply)
+      (('worker-thread-error . exn)
+       (raise-exception exn))
+      (result
+       (apply values result)))))
+
+(define* (with-postgresql-transaction/through-channel channel
+                                                      f
+                                                      #:key always-rollback?)
+  (let ((reply (make-channel)))
+    (put-message channel (list reply f always-rollback?))
+    (match (get-message reply)
+      (('worker-thread-error . exn)
+       (raise-exception exn))
+      (result
+       (apply values result)))))
+
 (define* (with-postgresql-transaction conn f
                                       #:key always-rollback?)
   (exec-query conn "BEGIN;")



reply via email to

[Prev in Thread] Current Thread [Next in Thread]