gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated (325b16de -> cc87817b)


From: gnunet
Subject: [libeufin] branch master updated (325b16de -> cc87817b)
Date: Fri, 10 Mar 2023 17:44:30 +0100

This is an automated email from the git hooks/post-receive script.

ms pushed a change to branch master
in repository libeufin.

    from 325b16de fix Taler withdraw URI formation
     new 1eec7fe3 Testing.
     new 26d382d7 Introducing TWG tests.
     new 8188af59 removing useless test
     new 889b88fa Implementing database notifications.
     new cc87817b Long-polling.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 cli/tests/launch_services.sh                       |  44 +++++-
 cli/tests/wire-transfer.sh                         |  13 ++
 cli/tests/wirewatch.conf                           |  11 ++
 nexus/build.gradle                                 |   4 +
 nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt  |  12 +-
 nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt    |   3 +-
 nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 161 +++++++++++++++++----
 .../tech/libeufin/nexus/server/NexusServer.kt      |   1 -
 nexus/src/test/kotlin/DBTest.kt                    |  31 ----
 nexus/src/test/kotlin/MakeEnv.kt                   |  36 ++++-
 nexus/src/test/kotlin/TalerTest.kt                 |  42 ++++--
 util/src/main/kotlin/DB.kt                         | 112 ++++++++------
 util/src/main/kotlin/HTTP.kt                       |   2 +-
 13 files changed, 333 insertions(+), 139 deletions(-)
 create mode 100755 cli/tests/wire-transfer.sh
 create mode 100644 cli/tests/wirewatch.conf
 delete mode 100644 nexus/src/test/kotlin/DBTest.kt

diff --git a/cli/tests/launch_services.sh b/cli/tests/launch_services.sh
index fdd5a74d..30ba44e1 100755
--- a/cli/tests/launch_services.sh
+++ b/cli/tests/launch_services.sh
@@ -4,6 +4,8 @@
 # EBICS pair, in order to try CLI commands.
 set -eu
 
+WITH_TASKS=1
+# WITH_TASKS=0
 function exit_cleanup()
 {
   echo "Running exit-cleanup"
@@ -20,15 +22,23 @@ echo RUNNING SANDBOX-NEXUS EBICS PAIR
 jq --version &> /dev/null || (echo "'jq' command not found"; exit 77)
 curl --version &> /dev/null || (echo "'curl' command not found"; exit 77)
 
-DB_PATH=/tmp/libeufin-cli-test.sqlite3
-export LIBEUFIN_SANDBOX_DB_CONNECTION=jdbc:sqlite:$DB_PATH
+SQLITE_FILE_PATH=/tmp/libeufin-cli-test.sqlite3
+DB_CONN=jdbc:postgresql://localhost:5432/taler?user=$(whoami)
+# export LIBEUFIN_SANDBOX_DB_CONNECTION=jdbc:sqlite:$SQLITE_FILE_PATH
+export LIBEUFIN_SANDBOX_DB_CONNECTION=$DB_CONN
 
 echo -n Delete previous data...
-rm -f $DB_PATH
+rm -f $SQLITE_FILE_PATH
 echo DONE
 echo -n Configure the default demobank with MANA...
 libeufin-sandbox config --with-signup-bonus --currency MANA default
 echo DONE
+echo -n Setting the default exchange at Sandbox...
+libeufin-sandbox \
+  default-exchange \
+  "https://exchange.example.com/" \
+  "payto://iban/NOTUSED"
+echo DONE
 echo -n Start the bank...
 export LIBEUFIN_SANDBOX_ADMIN_PASSWORD=foo
 libeufin-sandbox serve &> sandbox.log &
@@ -38,7 +48,7 @@ echo -n Wait for the bank...
 curl --max-time 2 --retry-connrefused --retry-delay 1 --retry 10 http://localhost:5000/ &> /dev/null
 echo DONE
 echo -n Make one superuser at Nexus...
-export LIBEUFIN_NEXUS_DB_CONNECTION=jdbc:sqlite:$DB_PATH
+export LIBEUFIN_NEXUS_DB_CONNECTION=$DB_CONN
 libeufin-nexus superuser test-user --password x
 echo DONE
 echo -n Launching Nexus...
@@ -100,7 +110,27 @@ libeufin-cli facades \
   --currency TESTKUDOS --facade-name test-facade \
   wwwconn www-nexus
 echo OK
-echo -n "Ticking, to let statements be generated..."
-libeufin-sandbox camt053tick
-echo OK
+
+if test 1 = $WITH_TASKS; then
+  echo -n Creating submit transactions task..
+  libeufin-cli accounts task-schedule \
+    --task-type submit \
+    --task-name www-payments \
+    --task-cronspec "* * *" \
+    www-nexus || true
+  # Tries every second.  Ask C52
+  echo OK
+  echo -n Creating fetch transactions task..
+  # Not idempotent, FIXME #7739
+  libeufin-cli accounts task-schedule \
+    --task-type fetch \
+    --task-name www-history \
+    --task-cronspec "* * *" \
+    --task-param-level report \
+    --task-param-range-type latest \
+    www-nexus || true
+  echo OK
+else
+  echo NOT creating backound tasks!
+fi
 read -p "Press Enter to terminate..."
diff --git a/cli/tests/wire-transfer.sh b/cli/tests/wire-transfer.sh
new file mode 100755
index 00000000..c29aae30
--- /dev/null
+++ b/cli/tests/wire-transfer.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+set -eu
+# Pays the www Sandbox user, usually owned by the Exchange.
+RESERVE_PUB=$(gnunet-ecc -g1 /tmp/www &> /dev/null && gnunet-ecc -p /tmp/www)
+# Must match the one from launch_services.sh
+export LIBEUFIN_SANDBOX_DB_CONNECTION=jdbc:postgresql://localhost:5432/taler?user=$(whoami)
+libeufin-sandbox \
+  make-transaction \
+    --credit-account=www \
+    --debit-account=admin MANA:2 \
+   $RESERVE_PUB 
+echo Now paid reserve $RESERVE_PUB
diff --git a/cli/tests/wirewatch.conf b/cli/tests/wirewatch.conf
new file mode 100644
index 00000000..b82f9e11
--- /dev/null
+++ b/cli/tests/wirewatch.conf
@@ -0,0 +1,11 @@
+[exchange-accountcredentials-1]
+WIRE_GATEWAY_URL = "http://localhost:5001/facades/test-facade/taler-wire-gateway/"
+WIRE_GATEWAY_AUTH_METHOD = basic
+USERNAME = test-user
+PASSWORD = x
+
+[exchange-account-1]
+# What is the account URL?
+PAYTO_URI = "payto://iban/NOTUSED"
+ENABLE_DEBIT = YES
+ENABLE_CREDIT = YES
diff --git a/nexus/build.gradle b/nexus/build.gradle
index dafe94c6..39e896c5 100644
--- a/nexus/build.gradle
+++ b/nexus/build.gradle
@@ -105,6 +105,10 @@ test {
     testLogging.showStandardStreams = false
     environment.put("LIBEUFIN_SANDBOX_ADMIN_PASSWORD", "foo")
     environment.put("LIBEUFIN_CASHOUT_TEST_TAN", "foo")
+    environment.put(
+            "LIBEUFIN_NEXUS_DB_CONNECTION",
+            "jdbc:postgresql://localhost:5432/libeufincheck?user=${System.properties["user.name"]}"
+    )
 }
 
 application {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
index 3decc126..bdaa0ec3 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
@@ -60,13 +60,13 @@ fun findPermission(p: Permission): NexusPermissionEntity? {
  * Require that the authenticated user has at least one of the listed 
permissions.
  *
  * Throws a NexusError if the authenticated user for the request doesn't have 
any of
- * listed the permissions.
+ * listed the permissions.  It returns the username of the authorized user.
  */
-fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery) {
-    transaction {
+fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery): String {
+    val username = transaction {
         val user = authenticateRequest(this@requirePermission)
         if (user.superuser) {
-            return@transaction
+            return@transaction user.username
         }
         var foundPermission = false
         for (pr in perms) {
@@ -82,8 +82,10 @@ fun ApplicationRequest.requirePermission(vararg perms: 
PermissionQuery) {
                 perms.joinToString(" | ") { "${it.resourceId} 
${it.resourceType} ${it.permissionName}" }
             throw NexusError(
                 HttpStatusCode.Forbidden,
-                "User ${user.id.value} has insufficient permissions (needs $possiblePerms."
+                "User ${user.username} has insufficient permissions (needs $possiblePerms)."
             )
         }
+        user.username
     }
+    return username
 }
\ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index b1ddbd17..f8ce7bd4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -29,6 +29,7 @@ import tech.libeufin.nexus.iso20022.EntryStatus
 import tech.libeufin.util.EbicsInitState
 import java.sql.Connection
 
+
 /**
  * This table holds the values that exchange gave to issue a payment,
  * plus a reference to the prepared pain.001 version of.  Note that
@@ -101,7 +102,6 @@ object TalerIncomingPaymentsTable : LongIdTable() {
     val debtorPaytoUri = text("incomingPaytoUri")
 }
 
-
 class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) {
     companion object : 
LongEntityClass<TalerIncomingPaymentEntity>(TalerIncomingPaymentsTable)
 
@@ -343,7 +343,6 @@ class EbicsSubscriberEntity(id: EntityID<Long>) : 
LongEntity(id) {
 }
 
 object NexusUsersTable : LongIdTable() {
-
     val username = text("username")
     val passwordHash = text("password")
     val superuser = bool("superuser")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index 52dca293..c9e73662 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -19,6 +19,7 @@
 
 package tech.libeufin.nexus
 
+import UtilError
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.server.application.ApplicationCall
 import io.ktor.server.application.call
@@ -33,16 +34,24 @@ import io.ktor.server.routing.Route
 import io.ktor.server.routing.get
 import io.ktor.server.routing.post
 import io.ktor.server.util.*
+import io.ktor.util.*
+import kotlinx.coroutines.*
+import net.taler.wallet.crypto.Base32Crockford
 import org.jetbrains.exposed.dao.Entity
 import org.jetbrains.exposed.dao.id.IdTable
 import org.jetbrains.exposed.sql.*
+import org.jetbrains.exposed.sql.transactions.TransactionManager
 import org.jetbrains.exposed.sql.transactions.transaction
+import org.postgresql.jdbc.PgConnection
 import tech.libeufin.nexus.bankaccount.addPaymentInitiation
 import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
+import tech.libeufin.nexus.bankaccount.getBankAccount
 import tech.libeufin.nexus.iso20022.*
 import tech.libeufin.nexus.server.*
 import tech.libeufin.util.*
 import java.net.URL
+import java.util.concurrent.atomic.AtomicReference
+import javax.xml.crypto.Data
 import kotlin.math.abs
 import kotlin.math.min
 
@@ -66,7 +75,8 @@ data class TalerTransferResponse(
 )
 
 /**
- * History accounting data structures
+ * History accounting data structures, typically
+ * used to build JSON responses.
  */
 data class TalerIncomingBankTransaction(
     val row_id: Long,
@@ -122,21 +132,16 @@ fun getComparisonOperator(delta: Int, start: Long, table: 
IdTable<Long>): Op<Boo
 }
 
 fun expectLong(param: String?): Long {
-    if (param == null) {
-        throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not 
Long")
-    }
-    return try {
-        param.toLong()
-    } catch (e: Exception) {
-        throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not 
Long")
+    if (param == null) throw badRequest("'$param' is not Long")
+    return try { param.toLong() } catch (e: Exception) {
+        throw badRequest("'$param' is not Long")
     }
 }
 
-/** Helper handling 'start' being optional and its dependence on 'delta'.  */
+// Helper handling 'start' being optional and its dependence on 'delta'.
 fun handleStartArgument(start: String?, delta: Int): Long {
     if (start == null) {
-        if (delta >= 0)
-            return -1
+        if (delta >= 0) return -1
         return Long.MAX_VALUE
     }
     return expectLong(start)
@@ -307,8 +312,35 @@ fun talerFilter(
         reservePublicKey = reservePub
         timestampMs = System.currentTimeMillis()
         debtorPaytoUri = buildIbanPaytoUri(
-            debtorIban, debtorAgent.bic, debtorName
+            debtorIban,
+            debtorAgent.bic,
+            debtorName
+        )
+    }
+    val dbTx = TransactionManager.currentOrNull() ?: throw NexusError(
+        HttpStatusCode.InternalServerError,
+        "talerFilter(): unexpected execution out of a DB transaction"
+    )
+    /**
+     * Without COMMIT here, the woken up LISTENer won't
+     * find the record in the database.
+     */
+    dbTx.commit()
+    // Only supporting Postgres' NOTIFY.
+    if (dbTx.isPostgres()) {
+        val channelName = buildChannelName(
+            NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+            payment.bankAccount.iban
+        )
+        logger.debug("NOTIFYing on domain" +
+                " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+                " for IBAN: ${payment.bankAccount.iban}.  Resulting channel" +
+                " name: $channelName.")
+        val notifyHandle = PostgresListenNotify(
+            dbTx.getPgConnection(),
+            channelName
         )
+        notifyHandle.postgresNotify()
     }
 }
 
@@ -445,27 +477,106 @@ private suspend fun historyOutgoing(call: 
ApplicationCall) {
     )
 }
 
-/**
- * Handle a /taler-wire-gateway/history/incoming request.
- */
+// Handle a /taler-wire-gateway/history/incoming request.
 private suspend fun historyIncoming(call: ApplicationCall) {
     val facadeId = expectNonNull(call.parameters["fcid"])
-    call.request.requirePermission(PermissionQuery("facade", facadeId, 
"facade.talerwiregateway.history"))
+    val username = call.request.requirePermission(
+        PermissionQuery(
+            "facade",
+            facadeId,
+            "facade.talerwiregateway.history"
+        )
+    )
+    val longPollTimeoutPar = call.parameters["long_poll_ms"]
+    val longPollTimeout = if (longPollTimeoutPar != null) {
+        val longPollTimeoutValue = try { longPollTimeoutPar.toLong() }
+        catch (e: Exception) {
+            throw badRequest("long_poll_ms value is invalid")
+        }
+        longPollTimeoutValue
+    } else null
     val param = call.expectUrlParameter("delta")
-    val delta: Int = try {
-        param.toInt()
-    } catch (e: Exception) {
+    val delta: Int = try { param.toInt() } catch (e: Exception) {
         throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not 
Int")
     }
     val start: Long = 
handleStartArgument(call.request.queryParameters["start"], delta)
     val history = TalerIncomingHistory()
     val startCmpOp = getComparisonOperator(delta, start, 
TalerIncomingPaymentsTable)
-    transaction {
-        val orderedPayments = TalerIncomingPaymentEntity.find {
-            startCmpOp
-        }.orderTaler(delta)
-        if (orderedPayments.isNotEmpty()) {
-            orderedPayments.subList(0, min(abs(delta), 
orderedPayments.size)).forEach {
+    /**
+     * The following block checks first for results, and then LISTEN
+     * _only if_ the client gave the long_poll_ms parameter.
+     */
+    var resultOrWait: Pair<
+            List<TalerIncomingPaymentEntity>,
+            PostgresListenNotify?
+            > = transaction {
+        val res = TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
+        // Register to Postgres notifications, if no results arrived.
+        if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) {
+            // Getting the IBAN to build the unique channel name.
+            val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId 
}.firstOrNull()
+            if (f == null) throw internalServerError(
+                "Handling request for facade '$facadeId', but that's not found 
in the database."
+            )
+            val fState = FacadeStateEntity.find {
+                FacadeStateTable.facade eq f.id.value
+            }.firstOrNull()
+            if (fState == null) throw internalServerError(
+                "Facade '$facadeId' exist but has no state."
+            )
+            val bankAccount = getBankAccount(fState.bankAccount)
+            val channelName = buildChannelName(
+                NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+                bankAccount.iban
+            )
+            logger.debug("LISTENing on domain " +
+                    "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+                    " for IBAN: ${bankAccount.iban} with timeout: 
$longPollTimeoutPar." +
+                    " Resulting channel name: $channelName"
+            )
+            val listenHandle = PostgresListenNotify(
+                this.getPgConnection(),
+                channelName
+            )
+            listenHandle.postrgesListen()
+            return@transaction Pair(res, listenHandle)
+        }
+        Pair(res, null)
+    }
+    /**
+     * Wait here by releasing the execution, or proceed to response if didn't 
sleep.
+     * The right condition only silences the compiler, because when the 
timeout is null
+     * the left condition is always false (no listen-notify object.)
+     */
+    if (resultOrWait.second != null && longPollTimeout != null) {
+        logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar 
ms")
+        val listenHandle = resultOrWait.second!!
+        val notificationArrived = 
listenHandle.postgresWaitNotification(longPollTimeout)
+        if (notificationArrived) {
+            val likelyNewPayments = transaction {
+                // addLogger(StdOutSqlLogger)
+                TalerIncomingPaymentEntity.find { startCmpOp 
}.orderTaler(delta)
+            }
+            /**
+             * NOTE: the query can still have zero results despite the
+             * notification.  That happens when the 'start' URI param is
+             * higher than the ID of the new row in the database.  Not
+             * an error.
+             */
+            resultOrWait = Pair(likelyNewPayments, null)
+        }
+    }
+    /**
+     * Whether because of a timeout or a notification or of never slept, here 
it
+     * proceeds to the response (== resultOrWait.first IS EFFECTIVE).
+     */
+    val maybeNewPayments = resultOrWait.first
+    if (maybeNewPayments.isNotEmpty()) {
+        transaction {
+            maybeNewPayments.subList(
+                0,
+                min(abs(delta), maybeNewPayments.size)
+            ).forEach {
                 history.incoming_transactions.add(
                     TalerIncomingBankTransaction(
                         // Rounded timestamp
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index 79ab9adf..2632100f 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -57,7 +57,6 @@ import tech.libeufin.util.*
 import java.net.BindException
 import java.net.URLEncoder
 import kotlin.system.exitProcess
-
 /**
  * Return facade state depending on the type.
  */
diff --git a/nexus/src/test/kotlin/DBTest.kt b/nexus/src/test/kotlin/DBTest.kt
deleted file mode 100644
index ff044a3e..00000000
--- a/nexus/src/test/kotlin/DBTest.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-package tech.libeufin.nexus
-
-import kotlinx.coroutines.*
-import org.jetbrains.exposed.dao.flushCache
-import org.jetbrains.exposed.exceptions.ExposedSQLException
-import org.jetbrains.exposed.sql.*
-import org.jetbrains.exposed.sql.statements.api.ExposedConnection
-import org.jetbrains.exposed.sql.transactions.TransactionManager
-import org.jetbrains.exposed.sql.transactions.transaction
-import org.jetbrains.exposed.sql.transactions.transactionManager
-import org.junit.Test
-import org.postgresql.PGConnection
-import org.postgresql.jdbc.PgConnection
-import tech.libeufin.util.PostgresListenNotify
-import withTestDatabase
-import java.sql.Connection
-import java.sql.DriverManager
-
-class DBTest {
-
-    // Testing database notifications (only postgresql)
-    @Test
-    fun notifications() {
-        val genCon = 
DriverManager.getConnection("jdbc:postgresql://localhost:5432/talercheck?user=job")
-        val pgCon = genCon.unwrap(org.postgresql.jdbc.PgConnection::class.java)
-        val ln = PostgresListenNotify(pgCon, "x")
-        ln.postrgesListen()
-        ln.postgresNotify()
-        runBlocking { ln.postgresWaitNotification(2000L) }
-    }
-}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/MakeEnv.kt b/nexus/src/test/kotlin/MakeEnv.kt
index 8d391151..31cc5704 100644
--- a/nexus/src/test/kotlin/MakeEnv.kt
+++ b/nexus/src/test/kotlin/MakeEnv.kt
@@ -25,7 +25,8 @@ data class EbicsKeys(
 const val TEST_DB_FILE = "/tmp/nexus-test.sqlite3"
 // const val TEST_DB_CONN = "jdbc:sqlite:$TEST_DB_FILE"
 // Convenience DB connection to switch to Postgresql:
-const val TEST_DB_CONN = "jdbc:postgresql://localhost:5432/talercheck?user=job"
+val currentUser = System.getProperty("user.name")
+val TEST_DB_CONN = "jdbc:postgresql://localhost:5432/libeufincheck?user=$currentUser"
 val BANK_IBAN = getIban()
 val FOO_USER_IBAN = getIban()
 val BAR_USER_IBAN = getIban()
@@ -296,7 +297,7 @@ fun withSandboxTestDatabase(f: () -> Unit) {
     }
 }
 
-fun talerIncomingForFoo(currency: String, value: String, subject: String) {
+fun newNexusBankTransaction(currency: String, value: String, subject: String) {
     transaction {
         val inc = NexusBankTransactionEntity.new {
             bankAccount = NexusBankAccountEntity.findByName("foo")!!
@@ -314,12 +315,12 @@ fun talerIncomingForFoo(currency: String, value: String, 
subject: String) {
                 )
             )
         }
-        TalerIncomingPaymentEntity.new {
+        /*TalerIncomingPaymentEntity.new {
             payment = inc
             reservePublicKey = "mock"
             timestampMs = 0L
             debtorPaytoUri = "mock"
-        }
+        }*/
     }
 }
 
@@ -350,9 +351,30 @@ fun genNexusIncomingPayment(
                         creditDebitIndicator = CreditDebitIndicator.CRDT,
                         details = TransactionDetails(
                             unstructuredRemittanceInformation = subject,
-                            debtor = null,
-                            debtorAccount = null,
-                            debtorAgent = null,
+                            debtor = PartyIdentification(
+                                name = "Mock Payer",
+                                countryOfResidence = null,
+                                privateId = null,
+                                organizationId = null,
+                                postalAddress = null,
+                                otherId = null
+                            ),
+                            debtorAccount = CashAccount(
+                                iban = "MOCK-IBAN",
+                                name = null,
+                                currency = null,
+                                otherId = null
+                            ),
+                            debtorAgent = AgentIdentification(
+                                bic = "MOCK-BIC",
+                                lei = null,
+                                clearingSystemMemberId = null,
+                                clearingSystemCode = null,
+                                proprietaryClearingSystemCode = null,
+                                postalAddress = null,
+                                otherId = null,
+                                name = null
+                            ),
                             creditor = null,
                             creditorAccount = null,
                             creditorAgent = null,
diff --git a/nexus/src/test/kotlin/TalerTest.kt 
b/nexus/src/test/kotlin/TalerTest.kt
index 36c656f1..7eadeb25 100644
--- a/nexus/src/test/kotlin/TalerTest.kt
+++ b/nexus/src/test/kotlin/TalerTest.kt
@@ -1,45 +1,57 @@
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.ktor.client.call.*
 import io.ktor.client.plugins.*
 import io.ktor.client.request.*
 import io.ktor.client.statement.*
 import io.ktor.http.*
 import io.ktor.server.testing.*
 import kotlinx.coroutines.*
-import kotlinx.coroutines.future.future
-import org.jetbrains.exposed.sql.transactions.TransactionManager
 import org.jetbrains.exposed.sql.transactions.transaction
 import org.junit.Ignore
 import org.junit.Test
-import tech.libeufin.nexus.*
 import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
-import tech.libeufin.nexus.iso20022.EntryStatus
+import tech.libeufin.nexus.ingestFacadeTransactions
+import tech.libeufin.nexus.maybeTalerRefunds
 import tech.libeufin.nexus.server.*
+import tech.libeufin.nexus.talerFilter
 import tech.libeufin.sandbox.sandboxApp
 import tech.libeufin.sandbox.wireTransfer
+import tech.libeufin.util.NotificationsChannelDomains
 
 // This class tests the features related to the Taler facade.
 class TalerTest {
+    val mapper = ObjectMapper()
 
-    /**
-     * Tests that a client (normally represented by the wire-watch)
-     * gets incoming transactions.
-     */
+    // Checking that a correct wire transfer (with Taler-compatible subject)
+    // is responded by the Taler facade.
     @Test
     fun historyIncomingTest() {
+        val reservePub = "GX5H5RME193FDRCM1HZKERXXQ2K21KH7788CKQM8X6MYKYRBP8F0"
         withNexusAndSandboxUser {
             testApplication {
                 application(nexusApp)
                 runBlocking {
-                    val future = async {
-                        client.get(
-                            
"/facades/taler/taler-wire-gateway/history/incoming?delta=5"
-                        ) {
-                            expectSuccess = true
+                    launch {
+                        val r = 
client.get("/facades/taler/taler-wire-gateway/history/incoming?delta=5") {
+                            expectSuccess = false
                             contentType(ContentType.Application.Json)
                             basicAuth("foo", "foo")
                         }
+                        val j = mapper.readTree(r.readBytes())
+                        val reservePubFromTwg = 
j.get("incoming_transactions").get(0).get("reserve_pub").asText()
+                        assert(reservePubFromTwg == reservePub)
                     }
-                    talerIncomingForFoo("KUDOS", "10", "Invalid")
+                    newNexusBankTransaction(
+                        "KUDOS",
+                        "10",
+                        reservePub
+                    )
+                    ingestFacadeTransactions(
+                        "foo", // bank account local to Nexus.
+                        "taler-wire-gateway",
+                        ::talerFilter,
+                        ::maybeTalerRefunds
+                    )
                 }
             }
         }
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 63a213e2..beb5d12f 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -20,16 +20,44 @@
 package tech.libeufin.util
 import UtilError
 import io.ktor.http.*
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.withContext
 import logger
+import net.taler.wallet.crypto.Base32Crockford
+import org.jetbrains.exposed.sql.Transaction
 import org.postgresql.jdbc.PgConnection
-import java.lang.Long.max
 
-// This class abstracts the LISTEN/NOTIFY construct supported
-class PostgresListenNotify(
-    private val conn: PgConnection,
-    private val channel: String
-) {
+fun Transaction.isPostgres(): Boolean {
+    return this.db.vendor == "postgresql"
+}
+
+fun Transaction.getPgConnection(): PgConnection {
+    if (!this.isPostgres()) throw UtilError(
+        HttpStatusCode.InternalServerError,
+        "Unexpected non-postgresql connection: ${this.db.vendor}"
+    )
+    return this.db.connector().connection as PgConnection
+}
+
+// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
+enum class NotificationsChannelDomains(val value: Int) {
+    LIBEUFIN_TALER_INCOMING(3000)
+}
+
+// Helper that builds a LISTEN-NOTIFY channel name.
+fun buildChannelName(
+    domain: NotificationsChannelDomains,
+    iban: String,
+    separator: String = "_"
+): String {
+    val channelElements = "${domain.value}$separator$iban"
+    return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+}
+
+// This class abstracts Postgres' LISTEN/NOTIFY.
+// FIXME: find facts where Exposed provides always a live 'conn'.
+class PostgresListenNotify(val conn: PgConnection, val channel: String) {
     fun postrgesListen() {
         val stmt = conn.createStatement()
         stmt.execute("LISTEN $channel")
@@ -41,44 +69,38 @@ class PostgresListenNotify(
         stmt.close()
     }
 
-    suspend fun postgresWaitNotification(timeoutMs: Long) {
-        // Splits the checks into 10ms chunks.
-        val sleepTimeMs = 10L
-        var notificationFound = false
-        val iterations = timeoutMs / sleepTimeMs
-        for (i in 0..iterations) {
-            val maybeNotifications = conn.notifications
-            // If a notification arrived, stop fetching for it.
-            if (maybeNotifications.isNotEmpty()) {
-                // Double checking that the channel is correct.
-                // Notification(s) arrived, double-check channel name.
-                maybeNotifications.forEach {
-                    if (it.name != channel) {
-                        throw UtilError(
-                            statusCode = HttpStatusCode.InternalServerError,
-                            reason = "Listener got wrong notification.  
Expected: $channel, but got: ${it.name}"
-                        )
-                    }
-                }
-                notificationFound = true
-                break
-            }
-            /* Notification didn't arrive, release the thread and
-             * retry in the next chunk.  */
-            delay(sleepTimeMs)
-        }
+    fun postgresUnlisten() {
+        val stmt = conn.createStatement()
+        stmt.execute("UNLISTEN $channel")
+        stmt.close()
+    }
 
-        if (!notificationFound) {
-            throw UtilError(
-                statusCode = HttpStatusCode.NotFound,
-                reason = "Timeout expired for notification on channel 
$channel",
-                ec = LibeufinErrorCode.LIBEUFIN_EC_TIMEOUT_EXPIRED
+    /**
+     * Asks Postgres for notifications with a timeout.  Returns
+     * true when there have been, false otherwise.
+     */
+    fun postgresWaitNotification(timeoutMs: Long): Boolean {
+        if (timeoutMs == 0L)
+            logger.warn("Database notification checker has timeout == 0," +
+                    " that waits FOREVER until a notification arrives."
             )
+        val maybeNotifications = conn.getNotifications(timeoutMs.toInt())
+
+        /**
+         * This check works around the apparent API inconsistency
+         * where instead of returning null, a empty array is given
+         * back when there have been no notifications.
+         */
+        val noResultWorkaround = maybeNotifications.isEmpty()
+        /*if (noResultWorkaround) {
+            logger.warn("JDBC+Postgres: empty array from getNotifications() 
despite docs suggest null.")
+        }*/
+        if (maybeNotifications == null || noResultWorkaround) return false
+
+        for (n in maybeNotifications) {
+            if (n.name.lowercase() != this.channel.lowercase())
+                throw internalServerError("Channel ${this.channel} got notified from ${n.name}!")
         }
-        /* Notification arrived.  In this current version
-         * we don't pass any data to the caller; the channel
-         * name itself means that the awaited information arrived.
-         * */
-        return
-        }
-    }
\ No newline at end of file
+        return true
+    }
+}
\ No newline at end of file
diff --git a/util/src/main/kotlin/HTTP.kt b/util/src/main/kotlin/HTTP.kt
index 0f70c7e4..6763db79 100644
--- a/util/src/main/kotlin/HTTP.kt
+++ b/util/src/main/kotlin/HTTP.kt
@@ -88,7 +88,7 @@ fun ApplicationRequest.getBaseUrl(): String {
         logger.info("Building X-Forwarded- base URL")
 
         // FIXME: should tolerate a missing X-Forwarded-Prefix.
-        var prefix: String = this.headers.get("X-Forwarded-Prefix")
+        var prefix: String = this.headers["X-Forwarded-Prefix"]
             ?: throw internalServerError("Reverse proxy did not define 
X-Forwarded-Prefix")
         if (!prefix.endsWith("/"))
             prefix += "/"

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]