gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 06/07: Taler facade.


From: gnunet
Subject: [libeufin] 06/07: Taler facade.
Date: Fri, 31 Mar 2023 14:28:10 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit 59a20971438391fb22a536080b27b4db50ea5d31
Author: MS <ms@taler.net>
AuthorDate: Fri Mar 31 14:15:41 2023 +0200

    Taler facade.
    
    Using new interface for DB notifications.
---
 nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 108 ++++++++-------------
 1 file changed, 38 insertions(+), 70 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index 4b325def..365a4ea5 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -19,7 +19,6 @@
 
 package tech.libeufin.nexus
 
-import UtilError
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.server.application.ApplicationCall
 import io.ktor.server.application.call
@@ -34,15 +33,15 @@ import io.ktor.server.routing.Route
 import io.ktor.server.routing.get
 import io.ktor.server.routing.post
 import io.ktor.server.util.*
-import io.ktor.util.*
-import kotlinx.coroutines.*
-import net.taler.wallet.crypto.Base32Crockford
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.currentCoroutineContext
 import org.jetbrains.exposed.dao.Entity
 import org.jetbrains.exposed.dao.id.IdTable
 import org.jetbrains.exposed.sql.*
 import org.jetbrains.exposed.sql.transactions.TransactionManager
 import org.jetbrains.exposed.sql.transactions.transaction
-import org.postgresql.jdbc.PgConnection
 import tech.libeufin.nexus.bankaccount.addPaymentInitiation
 import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
 import tech.libeufin.nexus.bankaccount.getBankAccount
@@ -50,8 +49,6 @@ import tech.libeufin.nexus.iso20022.*
 import tech.libeufin.nexus.server.*
 import tech.libeufin.util.*
 import java.net.URL
-import java.util.concurrent.atomic.AtomicReference
-import javax.xml.crypto.Data
 import kotlin.math.abs
 import kotlin.math.min
 
@@ -245,10 +242,12 @@ private suspend fun talerTransfer(call: ApplicationCall) {
     )
 }
 
+// Processes new transactions and stores TWG-specific data in
 fun talerFilter(
     payment: NexusBankTransactionEntity,
     txDtls: TransactionDetails
 ) {
+    val channelsToNotify = mutableListOf<String>()
     var isInvalid = false // True when pub is invalid or duplicate.
     val subject = txDtls.unstructuredRemittanceInformation
     val debtorName = txDtls.debtor?.name
@@ -324,11 +323,6 @@ fun talerFilter(
         HttpStatusCode.InternalServerError,
         "talerFilter(): unexpected execution out of a DB transaction"
     )
-    /**
-     * Without COMMIT here, the woken up LISTENer won't
-     * find the record in the database.
-     */
-    dbTx.commit()
     // Only supporting Postgres' NOTIFY.
     if (dbTx.isPostgres()) {
         val channelName = buildChannelName(
@@ -339,11 +333,7 @@ fun talerFilter(
                 " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
                 " for IBAN: ${payment.bankAccount.iban}.  Resulting channel" +
                 " name: $channelName.")
-        val notifyHandle = PostgresListenNotify(
-            dbTx.getPgConnection(),
-            channelName
-        )
-        notifyHandle.postgresNotify()
+        dbTx.postgresNotify(channelName)
     }
 }
 
@@ -505,75 +495,53 @@ private suspend fun historyIncoming(call: ApplicationCall) {
     val start: Long = handleStartArgument(call.request.queryParameters["start"], delta)
     val history = TalerIncomingHistory()
     val startCmpOp = getComparisonOperator(delta, start, TalerIncomingPaymentsTable)
+    val listenHandle: PostgresListenHandle? = if (isPostgres() && longPollTimeout != null) {
+        val notificationChannelName = buildChannelName(
+            NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+            getFacadeBankAccount(facadeId).iban
+        )
+        val handle = PostgresListenHandle(channelName = notificationChannelName)
+        handle.postgresListen()
+        handle
+    } else null
+
     /**
-     * The following block checks first for results, and then LISTEN
-     * _only if_ the client gave the long_poll_ms parameter.
+     * NOTE: the LISTEN command MAY also go inside this transaction,
+     * but that uses a connection other than the one provided by the
+     * transaction block.  More facts on the consequences are needed.
      */
-    var resultOrWait: Pair<
-            List<TalerIncomingPaymentEntity>,
-            PostgresListenNotify?
-            > = transaction {
-        val res = TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
-        // Register to Postgres notifications, if no results arrived.
-        if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) {
-            // Getting the IBAN to build the unique channel name.
-            val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId 
}.firstOrNull()
-            if (f == null) throw internalServerError(
-                "Handling request for facade '$facadeId', but that's not found 
in the database."
-            )
-            val fState = FacadeStateEntity.find {
-                FacadeStateTable.facade eq f.id.value
-            }.firstOrNull()
-            if (fState == null) throw internalServerError(
-                "Facade '$facadeId' exist but has no state."
-            )
-            val bankAccount = getBankAccount(fState.bankAccount)
-            val channelName = buildChannelName(
-                NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
-                bankAccount.iban
-            )
-            logger.debug("LISTENing on domain " +
-                    "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
-                    " for IBAN: ${bankAccount.iban} with timeout: 
$longPollTimeoutPar." +
-                    " Resulting channel name: $channelName"
-            )
-            val listenHandle = PostgresListenNotify(
-                this.getPgConnection(),
-                channelName
-            )
-            listenHandle.postrgesListen()
-            return@transaction Pair(res, listenHandle)
-        }
-        Pair(res, null)
+    var result: List<TalerIncomingPaymentEntity> = transaction {
+        TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
     }
-    /**
-     * Wait here by releasing the execution, or proceed to response if didn't 
sleep.
-     * The right condition only silences the compiler, because when the 
timeout is null
-     * the left condition is always false (no listen-notify object.)
-     */
-    if (resultOrWait.second != null && longPollTimeout != null) {
-        logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar 
ms")
-        val listenHandle = resultOrWait.second!!
-        val notificationArrived = 
listenHandle.postgresWaitNotification(longPollTimeout)
+    if (result.isNotEmpty() && listenHandle != null)
+        listenHandle.postgresUnlisten()
+
+    if (result.isEmpty() && listenHandle != null && longPollTimeout != null) {
+        logger.debug("Waiting for NOTIFY on channel 
${listenHandle.channelName}," +
+                " with timeout: $longPollTimeoutPar ms")
+        val notificationArrived = coroutineScope {
+            async(Dispatchers.IO) {
+                listenHandle.postgresGetNotifications(longPollTimeout)
+            }.await()
+        }
         if (notificationArrived) {
-            val likelyNewPayments = transaction {
-                // addLogger(StdOutSqlLogger)
-                TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
-            }
             /**
              * NOTE: the query can still have zero results despite the
              * notification.  That happens when the 'start' URI param is
              * higher than the ID of the new row in the database.  Not
              * an error.
              */
-            resultOrWait = Pair(likelyNewPayments, null)
+            result = transaction {
+                // addLogger(StdOutSqlLogger)
+                TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
+            }
         }
     }
     /**
      * Whether because of a timeout or a notification or of never slept, here 
it
      * proceeds to the response (== resultOrWait.first IS EFFECTIVE).
      */
-    val maybeNewPayments = resultOrWait.first
+    val maybeNewPayments = result
     if (maybeNewPayments.isNotEmpty()) {
         transaction {
             maybeNewPayments.subList(

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]