gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 04/07: Postgres notifications.


From: gnunet
Subject: [libeufin] 04/07: Postgres notifications.
Date: Fri, 31 Mar 2023 14:28:08 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 2d45653e2ceaca856e678987b1f8c5501fc29c82
Author: MS <ms@taler.net>
AuthorDate: Fri Mar 31 14:10:04 2023 +0200

    Postgres notifications.
    
    Closing the connection after delivery or unlisten.
---
 util/src/main/kotlin/DB.kt | 97 +++++++++++++++++++++++++++-------------------
 1 file changed, 57 insertions(+), 40 deletions(-)

diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index beb5d12f..90189257 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -20,24 +20,28 @@
 package tech.libeufin.util
 import UtilError
 import io.ktor.http.*
-import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
 import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.withContext
 import logger
 import net.taler.wallet.crypto.Base32Crockford
+import org.jetbrains.exposed.sql.Database
 import org.jetbrains.exposed.sql.Transaction
+import org.jetbrains.exposed.sql.transactions.TransactionManager
+import org.jetbrains.exposed.sql.transactions.transaction
+import org.jetbrains.exposed.sql.transactions.transactionManager
+import org.postgresql.PGNotification
 import org.postgresql.jdbc.PgConnection
 
 fun Transaction.isPostgres(): Boolean {
     return this.db.vendor == "postgresql"
 }
 
-fun Transaction.getPgConnection(): PgConnection {
-    if (!this.isPostgres()) throw UtilError(
-        HttpStatusCode.InternalServerError,
-        "Unexpected non-postgresql connection: ${this.db.vendor}"
+fun isPostgres(): Boolean {
+    val db = TransactionManager.defaultDatabase ?: throw internalServerError(
+        "Could not find the default database, can't check if that's Postgres."
     )
-    return this.db.connector().connection as PgConnection
+    return db.vendor == "postgresql"
+
 }
 
 // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
@@ -52,55 +56,68 @@ fun buildChannelName(
     separator: String = "_"
 ): String {
     val channelElements = "${domain.value}$separator$iban"
-    return 
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+    val ret = 
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+    logger.debug("Defining db channel name for IBAN: $iban, domain: 
${domain.name}, resulting in: $ret")
+    return ret
 }
 
-// This class abstracts Postgres' LISTEN/NOTIFY.
-// FIXME: find facts where Exposed provides always a live 'conn'.
-class PostgresListenNotify(val conn: PgConnection, val channel: String) {
-    fun postrgesListen() {
-        val stmt = conn.createStatement()
-        stmt.execute("LISTEN $channel")
-        stmt.close()
-    }
-    fun postgresNotify() {
+fun Transaction.postgresNotify(channel: String) {
+    this.exec("NOTIFY $channel")
+}
+
+/**
+ * postgresListen() and postgresGetNotifications() appear to have
+ * to use the same connection, in order for the notifications to
+ * arrive.  Therefore, calling LISTEN inside one "transaction {}"
+ * and postgresGetNotifications() outside of it did NOT work because
+ * Exposed _closes_ the connection as soon as the transaction block
+ * completes. OTOH, calling postgresGetNotifications() _inside_ the
+ * same transaction block as LISTEN's would lead to keep the database
+ * locked for the timeout duration.
+ *
+ * For this reason, opening and keeping one connection open for the
+ * lifetime of this object and only executing postgresListen() and
+ * postgresGetNotifications() _on that connection_ makes the event
+ * delivery more reliable.
+ */
+class PostgresListenHandle(val channelName: String) {
+    private val db = TransactionManager.defaultDatabase ?: throw 
internalServerError(
+        "Could not find the default database, won't get Postgres 
notifications."
+    )
+    private val conn = db.connector().connection as PgConnection
+
+    fun postgresListen() {
         val stmt = conn.createStatement()
-        stmt.execute("NOTIFY $channel")
+        stmt.execute("LISTEN $channelName")
         stmt.close()
+        logger.debug("LISTENing on channel: $channelName")
     }
-
     fun postgresUnlisten() {
         val stmt = conn.createStatement()
-        stmt.execute("UNLISTEN $channel")
+        stmt.execute("UNLISTEN $channelName")
         stmt.close()
+        logger.debug("UNLISTENing on channel: $channelName")
+        conn.close()
     }
 
-    /**
-     * Asks Postgres for notifications with a timeout.  Returns
-     * true when there have been, false otherwise.
-     */
-    fun postgresWaitNotification(timeoutMs: Long): Boolean {
+    fun postgresGetNotifications(timeoutMs: Long): Boolean {
         if (timeoutMs == 0L)
             logger.warn("Database notification checker has timeout == 0," +
                     " that waits FOREVER until a notification arrives."
             )
-        val maybeNotifications = conn.getNotifications(timeoutMs.toInt())
-
-        /**
-         * This check works around the apparent API inconsistency
-         * where instead of returning null, a empty array is given
-         * back when there have been no notifications.
-         */
-        val noResultWorkaround = maybeNotifications.isEmpty()
-        /*if (noResultWorkaround) {
-            logger.warn("JDBC+Postgres: empty array from getNotifications() 
despite docs suggest null.")
-        }*/
-        if (maybeNotifications == null || noResultWorkaround) return false
-
+        logger.debug("Waiting Postgres notifications on channel " +
+                "'$channelName' for $timeoutMs millis.")
+        val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
+        if (maybeNotifications == null || maybeNotifications.isEmpty()) {
+            logger.debug("DB notification channel $channelName was found 
empty.")
+            return false
+        }
         for (n in maybeNotifications) {
-            if (n.name.lowercase() != this.channel.lowercase())
-                throw internalServerError("Channel ${this.channel} got 
notified from ${n.name}!")
+            if (n.name.lowercase() != channelName.lowercase()) {
+                throw internalServerError("Channel $channelName got notified 
from ${n.name}!")
+            }
         }
+        logger.debug("Found DB notifications on channel $channelName")
         return true
     }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]