gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 04/05: Implementing database notifications.


From: gnunet
Subject: [libeufin] 04/05: Implementing database notifications.
Date: Fri, 10 Mar 2023 17:44:34 +0100

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 889b88faf594891b2eb9fa35be832e74d553e730
Author: MS <ms@taler.net>
AuthorDate: Fri Mar 10 17:37:08 2023 +0100

    Implementing database notifications.
---
 util/src/main/kotlin/DB.kt   | 112 ++++++++++++++++++++++++++-----------------
 util/src/main/kotlin/HTTP.kt |   2 +-
 2 files changed, 68 insertions(+), 46 deletions(-)

diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 63a213e2..beb5d12f 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -20,16 +20,44 @@
 package tech.libeufin.util
 import UtilError
 import io.ktor.http.*
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.withContext
 import logger
+import net.taler.wallet.crypto.Base32Crockford
+import org.jetbrains.exposed.sql.Transaction
 import org.postgresql.jdbc.PgConnection
-import java.lang.Long.max
 
-// This class abstracts the LISTEN/NOTIFY construct supported
-class PostgresListenNotify(
-    private val conn: PgConnection,
-    private val channel: String
-) {
+fun Transaction.isPostgres(): Boolean {
+    return this.db.vendor == "postgresql"
+}
+
+fun Transaction.getPgConnection(): PgConnection {
+    if (!this.isPostgres()) throw UtilError(
+        HttpStatusCode.InternalServerError,
+        "Unexpected non-postgresql connection: ${this.db.vendor}"
+    )
+    return this.db.connector().connection as PgConnection
+}
+
+// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
+enum class NotificationsChannelDomains(val value: Int) {
+    LIBEUFIN_TALER_INCOMING(3000)
+}
+
+// Helper that builds a LISTEN-NOTIFY channel name.
+fun buildChannelName(
+    domain: NotificationsChannelDomains,
+    iban: String,
+    separator: String = "_"
+): String {
+    val channelElements = "${domain.value}$separator$iban"
+    return 
"X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+}
+
+// This class abstracts Postgres' LISTEN/NOTIFY.
+// FIXME: find facts where Exposed provides always a live 'conn'.
+class PostgresListenNotify(val conn: PgConnection, val channel: String) {
     fun postrgesListen() {
         val stmt = conn.createStatement()
         stmt.execute("LISTEN $channel")
@@ -41,44 +69,38 @@ class PostgresListenNotify(
         stmt.close()
     }
 
-    suspend fun postgresWaitNotification(timeoutMs: Long) {
-        // Splits the checks into 10ms chunks.
-        val sleepTimeMs = 10L
-        var notificationFound = false
-        val iterations = timeoutMs / sleepTimeMs
-        for (i in 0..iterations) {
-            val maybeNotifications = conn.notifications
-            // If a notification arrived, stop fetching for it.
-            if (maybeNotifications.isNotEmpty()) {
-                // Double checking that the channel is correct.
-                // Notification(s) arrived, double-check channel name.
-                maybeNotifications.forEach {
-                    if (it.name != channel) {
-                        throw UtilError(
-                            statusCode = HttpStatusCode.InternalServerError,
-                            reason = "Listener got wrong notification.  
Expected: $channel, but got: ${it.name}"
-                        )
-                    }
-                }
-                notificationFound = true
-                break
-            }
-            /* Notification didn't arrive, release the thread and
-             * retry in the next chunk.  */
-            delay(sleepTimeMs)
-        }
+    fun postgresUnlisten() {
+        val stmt = conn.createStatement()
+        stmt.execute("UNLISTEN $channel")
+        stmt.close()
+    }
 
-        if (!notificationFound) {
-            throw UtilError(
-                statusCode = HttpStatusCode.NotFound,
-                reason = "Timeout expired for notification on channel 
$channel",
-                ec = LibeufinErrorCode.LIBEUFIN_EC_TIMEOUT_EXPIRED
+    /**
+     * Asks Postgres for notifications with a timeout.  Returns
+     * true when there have been, false otherwise.
+     */
+    fun postgresWaitNotification(timeoutMs: Long): Boolean {
+        if (timeoutMs == 0L)
+            logger.warn("Database notification checker has timeout == 0," +
+                    " that waits FOREVER until a notification arrives."
             )
+        val maybeNotifications = conn.getNotifications(timeoutMs.toInt())
+
+        /**
+         * This check works around the apparent API inconsistency
+         * where instead of returning null, a empty array is given
+         * back when there have been no notifications.
+         */
+        val noResultWorkaround = maybeNotifications.isEmpty()
+        /*if (noResultWorkaround) {
+            logger.warn("JDBC+Postgres: empty array from getNotifications() 
despite docs suggest null.")
+        }*/
+        if (maybeNotifications == null || noResultWorkaround) return false
+
+        for (n in maybeNotifications) {
+            if (n.name.lowercase() != this.channel.lowercase())
+                throw internalServerError("Channel ${this.channel} got 
notified from ${n.name}!")
         }
-        /* Notification arrived.  In this current version
-         * we don't pass any data to the caller; the channel
-         * name itself means that the awaited information arrived.
-         * */
-        return
-        }
-    }
\ No newline at end of file
+        return true
+    }
+}
\ No newline at end of file
diff --git a/util/src/main/kotlin/HTTP.kt b/util/src/main/kotlin/HTTP.kt
index 0f70c7e4..6763db79 100644
--- a/util/src/main/kotlin/HTTP.kt
+++ b/util/src/main/kotlin/HTTP.kt
@@ -88,7 +88,7 @@ fun ApplicationRequest.getBaseUrl(): String {
         logger.info("Building X-Forwarded- base URL")
 
         // FIXME: should tolerate a missing X-Forwarded-Prefix.
-        var prefix: String = this.headers.get("X-Forwarded-Prefix")
+        var prefix: String = this.headers["X-Forwarded-Prefix"]
             ?: throw internalServerError("Reverse proxy did not define 
X-Forwarded-Prefix")
         if (!prefix.endsWith("/"))
             prefix += "/"

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]