gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 05/05: Long-polling.


From: gnunet
Subject: [libeufin] 05/05: Long-polling.
Date: Fri, 10 Mar 2023 17:44:35 +0100

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit cc87817b4adab5172aa7cb3a86580892cfc71c56
Author: MS <ms@taler.net>
AuthorDate: Fri Mar 10 17:43:23 2023 +0100

    Long-polling.
    
    Using DB notifications for /history/incoming at
    the Taler facade.
---
 nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt  |  12 +-
 nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt    |   3 +-
 nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 161 +++++++++++++++++----
 .../tech/libeufin/nexus/server/NexusServer.kt      |   1 -
 4 files changed, 144 insertions(+), 33 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
index 3decc126..bdaa0ec3 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
@@ -60,13 +60,13 @@ fun findPermission(p: Permission): NexusPermissionEntity? {
  * Require that the authenticated user has at least one of the listed 
permissions.
  *
  * Throws a NexusError if the authenticated user for the request doesn't have 
any of
- * listed the permissions.
+ * listed the permissions.  It returns the username of the authorized user.
  */
-fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery) {
-    transaction {
+fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery): 
String {
+    val username = transaction {
         val user = authenticateRequest(this@requirePermission)
         if (user.superuser) {
-            return@transaction
+            return@transaction user.username
         }
         var foundPermission = false
         for (pr in perms) {
@@ -82,8 +82,10 @@ fun ApplicationRequest.requirePermission(vararg perms: 
PermissionQuery) {
                 perms.joinToString(" | ") { "${it.resourceId} 
${it.resourceType} ${it.permissionName}" }
             throw NexusError(
                 HttpStatusCode.Forbidden,
-                "User ${user.id.value} has insufficient permissions (needs 
$possiblePerms."
+                "User ${user.username} has insufficient permissions (needs 
$possiblePerms)."
             )
         }
+        user.username
     }
+    return username
 }
\ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index b1ddbd17..f8ce7bd4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -29,6 +29,7 @@ import tech.libeufin.nexus.iso20022.EntryStatus
 import tech.libeufin.util.EbicsInitState
 import java.sql.Connection
 
+
 /**
  * This table holds the values that exchange gave to issue a payment,
  * plus a reference to the prepared pain.001 version of.  Note that
@@ -101,7 +102,6 @@ object TalerIncomingPaymentsTable : LongIdTable() {
     val debtorPaytoUri = text("incomingPaytoUri")
 }
 
-
 class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) {
     companion object : 
LongEntityClass<TalerIncomingPaymentEntity>(TalerIncomingPaymentsTable)
 
@@ -343,7 +343,6 @@ class EbicsSubscriberEntity(id: EntityID<Long>) : 
LongEntity(id) {
 }
 
 object NexusUsersTable : LongIdTable() {
-
     val username = text("username")
     val passwordHash = text("password")
     val superuser = bool("superuser")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index 52dca293..c9e73662 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -19,6 +19,7 @@
 
 package tech.libeufin.nexus
 
+import UtilError
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.server.application.ApplicationCall
 import io.ktor.server.application.call
@@ -33,16 +34,24 @@ import io.ktor.server.routing.Route
 import io.ktor.server.routing.get
 import io.ktor.server.routing.post
 import io.ktor.server.util.*
+import io.ktor.util.*
+import kotlinx.coroutines.*
+import net.taler.wallet.crypto.Base32Crockford
 import org.jetbrains.exposed.dao.Entity
 import org.jetbrains.exposed.dao.id.IdTable
 import org.jetbrains.exposed.sql.*
+import org.jetbrains.exposed.sql.transactions.TransactionManager
 import org.jetbrains.exposed.sql.transactions.transaction
+import org.postgresql.jdbc.PgConnection
 import tech.libeufin.nexus.bankaccount.addPaymentInitiation
 import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
+import tech.libeufin.nexus.bankaccount.getBankAccount
 import tech.libeufin.nexus.iso20022.*
 import tech.libeufin.nexus.server.*
 import tech.libeufin.util.*
 import java.net.URL
+import java.util.concurrent.atomic.AtomicReference
+import javax.xml.crypto.Data
 import kotlin.math.abs
 import kotlin.math.min
 
@@ -66,7 +75,8 @@ data class TalerTransferResponse(
 )
 
 /**
- * History accounting data structures
+ * History accounting data structures, typically
+ * used to build JSON responses.
  */
 data class TalerIncomingBankTransaction(
     val row_id: Long,
@@ -122,21 +132,16 @@ fun getComparisonOperator(delta: Int, start: Long, table: 
IdTable<Long>): Op<Boo
 }
 
 fun expectLong(param: String?): Long {
-    if (param == null) {
-        throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not 
Long")
-    }
-    return try {
-        param.toLong()
-    } catch (e: Exception) {
-        throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not 
Long")
+    if (param == null) throw badRequest("'$param' is not Long")
+    return try { param.toLong() } catch (e: Exception) {
+        throw badRequest("'$param' is not Long")
     }
 }
 
-/** Helper handling 'start' being optional and its dependence on 'delta'.  */
+// Helper handling 'start' being optional and its dependence on 'delta'.
 fun handleStartArgument(start: String?, delta: Int): Long {
     if (start == null) {
-        if (delta >= 0)
-            return -1
+        if (delta >= 0) return -1
         return Long.MAX_VALUE
     }
     return expectLong(start)
@@ -307,8 +312,35 @@ fun talerFilter(
         reservePublicKey = reservePub
         timestampMs = System.currentTimeMillis()
         debtorPaytoUri = buildIbanPaytoUri(
-            debtorIban, debtorAgent.bic, debtorName
+            debtorIban,
+            debtorAgent.bic,
+            debtorName
+        )
+    }
+    val dbTx = TransactionManager.currentOrNull() ?: throw NexusError(
+        HttpStatusCode.InternalServerError,
+        "talerFilter(): unexpected execution out of a DB transaction"
+    )
+    /**
+     * Without COMMIT here, the woken up LISTENer won't
+     * find the record in the database.
+     */
+    dbTx.commit()
+    // Only supporting Postgres' NOTIFY.
+    if (dbTx.isPostgres()) {
+        val channelName = buildChannelName(
+            NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+            payment.bankAccount.iban
+        )
+        logger.debug("NOTIFYing on domain" +
+                " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+                " for IBAN: ${payment.bankAccount.iban}.  Resulting channel" +
+                " name: $channelName.")
+        val notifyHandle = PostgresListenNotify(
+            dbTx.getPgConnection(),
+            channelName
         )
+        notifyHandle.postgresNotify()
     }
 }
 
@@ -445,27 +477,106 @@ private suspend fun historyOutgoing(call: 
ApplicationCall) {
     )
 }
 
-/**
- * Handle a /taler-wire-gateway/history/incoming request.
- */
+// Handle a /taler-wire-gateway/history/incoming request.
 private suspend fun historyIncoming(call: ApplicationCall) {
     val facadeId = expectNonNull(call.parameters["fcid"])
-    call.request.requirePermission(PermissionQuery("facade", facadeId, 
"facade.talerwiregateway.history"))
+    val username = call.request.requirePermission(
+        PermissionQuery(
+            "facade",
+            facadeId,
+            "facade.talerwiregateway.history"
+        )
+    )
+    val longPollTimeoutPar = call.parameters["long_poll_ms"]
+    val longPollTimeout = if (longPollTimeoutPar != null) {
+        val longPollTimeoutValue = try { longPollTimeoutPar.toLong() }
+        catch (e: Exception) {
+            throw badRequest("long_poll_ms value is invalid")
+        }
+        longPollTimeoutValue
+    } else null
     val param = call.expectUrlParameter("delta")
-    val delta: Int = try {
-        param.toInt()
-    } catch (e: Exception) {
+    val delta: Int = try { param.toInt() } catch (e: Exception) {
         throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not 
Int")
     }
     val start: Long = 
handleStartArgument(call.request.queryParameters["start"], delta)
     val history = TalerIncomingHistory()
     val startCmpOp = getComparisonOperator(delta, start, 
TalerIncomingPaymentsTable)
-    transaction {
-        val orderedPayments = TalerIncomingPaymentEntity.find {
-            startCmpOp
-        }.orderTaler(delta)
-        if (orderedPayments.isNotEmpty()) {
-            orderedPayments.subList(0, min(abs(delta), 
orderedPayments.size)).forEach {
+    /**
+     * The following block checks first for results, and then LISTEN
+     * _only if_ the client gave the long_poll_ms parameter.
+     */
+    var resultOrWait: Pair<
+            List<TalerIncomingPaymentEntity>,
+            PostgresListenNotify?
+            > = transaction {
+        val res = TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
+        // Register to Postgres notifications, if no results arrived.
+        if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) {
+            // Getting the IBAN to build the unique channel name.
+            val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId 
}.firstOrNull()
+            if (f == null) throw internalServerError(
+                "Handling request for facade '$facadeId', but that's not found 
in the database."
+            )
+            val fState = FacadeStateEntity.find {
+                FacadeStateTable.facade eq f.id.value
+            }.firstOrNull()
+            if (fState == null) throw internalServerError(
+                "Facade '$facadeId' exist but has no state."
+            )
+            val bankAccount = getBankAccount(fState.bankAccount)
+            val channelName = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+                bankAccount.iban
+            )
+            logger.debug("LISTENing on domain " +
+                    "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+                    " for IBAN: ${bankAccount.iban} with timeout: 
$longPollTimeoutPar." +
+                    " Resulting channel name: $channelName"
+            )
+            val listenHandle = PostgresListenNotify(
+                this.getPgConnection(),
+                channelName
+            )
+            listenHandle.postrgesListen()
+            return@transaction Pair(res, listenHandle)
+        }
+        Pair(res, null)
+    }
+    /**
+     * Wait here by releasing the execution, or proceed to response if didn't 
sleep.
+     * The right condition only silences the compiler, because when the 
timeout is null
+     * the left condition is always false (no listen-notify object.)
+     */
+    if (resultOrWait.second != null && longPollTimeout != null) {
+        logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar 
ms")
+        val listenHandle = resultOrWait.second!!
+        val notificationArrived = 
listenHandle.postgresWaitNotification(longPollTimeout)
+        if (notificationArrived) {
+            val likelyNewPayments = transaction {
+                // addLogger(StdOutSqlLogger)
+                TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
+            }
+            /**
+             * NOTE: the query can still have zero results despite the
+             * notification.  That happens when the 'start' URI param is
+             * higher than the ID of the new row in the database.  Not
+             * an error.
+             */
+            resultOrWait = Pair(likelyNewPayments, null)
+        }
+    }
+    /**
+     * Whether because of a timeout or a notification or of never slept, here 
it
+     * proceeds to the response (== resultOrWait.first IS EFFECTIVE).
+     */
+    val maybeNewPayments = resultOrWait.first
+    if (maybeNewPayments.isNotEmpty()) {
+        transaction {
+            maybeNewPayments.subList(
+                0,
+                min(abs(delta), maybeNewPayments.size)
+            ).forEach {
                 history.incoming_transactions.add(
                     TalerIncomingBankTransaction(
                         // Rounded timestamp
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index 79ab9adf..2632100f 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -57,7 +57,6 @@ import tech.libeufin.util.*
 import java.net.BindException
 import java.net.URLEncoder
 import kotlin.system.exitProcess
-
 /**
  * Return facade state depending on the type.
  */

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]