gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: -add skeleton logic for purse expiration


From: gnunet
Subject: [taler-exchange] branch master updated: -add skeleton logic for purse expiration
Date: Mon, 16 May 2022 15:43:45 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 02716c40 -add skeleton logic for purse expiration
02716c40 is described below

commit 02716c4084c76630f35a5fcc4d2ef4e17d7e1b00
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Mon May 16 15:43:40 2022 +0200

    -add skeleton logic for purse expiration
---
 src/exchange/.gitignore                            |   1 +
 src/exchange/Makefile.am                           |  14 ++
 src/exchange/exchange.conf                         |   7 +
 ...r-exchange-router.c => taler-exchange-expire.c} | 256 ++++++++++++---------
 src/exchange/taler-exchange-router.c               |   3 +-
 src/exchangedb/plugin_exchangedb_postgres.c        |  21 ++
 src/include/taler_exchangedb_plugin.h              |  15 ++
 7 files changed, 201 insertions(+), 116 deletions(-)

diff --git a/src/exchange/.gitignore b/src/exchange/.gitignore
index c12ee011..bcfdb7e8 100644
--- a/src/exchange/.gitignore
+++ b/src/exchange/.gitignore
@@ -10,3 +10,4 @@ test_taler_exchange_httpd_home/.config/taler/account-1.json
 taler-exchange-closer
 taler-exchange-transfer
 taler-exchange-router
+taler-exchange-expire
diff --git a/src/exchange/Makefile.am b/src/exchange/Makefile.am
index 21cc1228..24fb7e3d 100644
--- a/src/exchange/Makefile.am
+++ b/src/exchange/Makefile.am
@@ -19,6 +19,7 @@ pkgcfg_DATA = \
 bin_PROGRAMS = \
   taler-exchange-aggregator \
   taler-exchange-closer \
+  taler-exchange-expire \
   taler-exchange-httpd \
   taler-exchange-router \
   taler-exchange-transfer \
@@ -51,6 +52,19 @@ taler_exchange_closer_LDADD = \
   -lgnunetutil \
   $(XLIB)
 
+taler_exchange_expire_SOURCES = \
+  taler-exchange-expire.c
+taler_exchange_expire_LDADD = \
+  $(LIBGCRYPT_LIBS) \
+  $(top_builddir)/src/json/libtalerjson.la \
+  $(top_builddir)/src/util/libtalerutil.la \
+  $(top_builddir)/src/bank-lib/libtalerbank.la \
+  $(top_builddir)/src/exchangedb/libtalerexchangedb.la \
+  -ljansson \
+  -lgnunetcurl \
+  -lgnunetutil \
+  $(XLIB)
+
 taler_exchange_router_SOURCES = \
   taler-exchange-router.c
 taler_exchange_router_LDADD = \
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 9c68208a..df136d9e 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -47,8 +47,15 @@ BASE_URL = http://localhost:8081/
 # How long should the aggregator sleep if it has nothing to do?
 AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
 
+# FIXME: document!
 ROUTER_IDLE_SLEEP_INTERVAL = 60 s
 
+# How big is an individual shard to be processed
+# by taler-exchange-expire (in time).  It may take
+# this much time for an expired purse to be really
+# cleaned up and the coins refunded.
+EXPIRE_SHARD_SIZE = 5 m
+
 # How long should the transfer tool
 # sleep if it has nothing to do?
 TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
diff --git a/src/exchange/taler-exchange-router.c b/src/exchange/taler-exchange-expire.c
similarity index 62%
copy from src/exchange/taler-exchange-router.c
copy to src/exchange/taler-exchange-expire.c
index ca4499e3..c7691930 100644
--- a/src/exchange/taler-exchange-router.c
+++ b/src/exchange/taler-exchange-expire.c
@@ -15,12 +15,8 @@
 */
 
 /**
- * @file taler-exchange-router.c
- * @brief Process that routes P2P payments. Responsible for
- *   (1) refunding coins in unmerged purses, (2) merging purses into local reserves;
- *   (3) aggregating remote payments into the respective wad transfers.
- *   Execution of actual wad transfers is still to be done by taler-exchange-transfer,
- *   and watching for incoming wad transfers is done by taler-exchange-wirewatch.
+ * @file taler-exchange-expire.c
+ * @brief Process that cleans up expired purses
  * @author Christian Grothoff
  */
 #include "platform.h"
@@ -33,9 +29,6 @@
 #include "taler_bank_service.h"
 
 
-// FIXME: revisit how (and if) we do sharding!
-// Maybe use different helpers for wads than
-// for local purses?!
 /**
  * Work shard we are processing.
  */
@@ -50,12 +43,12 @@ struct Shard
   /**
    * Starting row of the shard.
    */
-  uint32_t shard_start;
+  struct GNUNET_TIME_Absolute shard_start;
 
   /**
    * Inclusive end row of the shard.
    */
-  uint32_t shard_end;
+  struct GNUNET_TIME_Absolute shard_end;
 
   /**
    * Number of starting points found in the shard.
@@ -65,26 +58,6 @@ struct Shard
 };
 
 
-/**
- * What is the smallest unit we support for wire transfers?
- * We will need to round down to a multiple of this amount.
- */
-static struct TALER_Amount currency_round_unit;
-
-/**
- * What is the base URL of this exchange?  Used in the
- * wire transfer subjects so that merchants and governments
- * can ask for the list of aggregated deposits.
- */
-static char *exchange_base_url;
-
-/**
- * Set to #GNUNET_YES if this exchange does not support KYC checks
- * and thus P2P transfers are to be made regardless of the
- * KYC status of the target reserve.
- */
-static int kyc_off;
-
 /**
  * The exchange's configuration.
  */
@@ -103,14 +76,14 @@ static struct GNUNET_SCHEDULER_Task *task;
 /**
  * How long should we sleep when idle before trying to find more work?
  */
-static struct GNUNET_TIME_Relative router_idle_sleep_interval;
+static struct GNUNET_TIME_Relative expire_idle_sleep_interval;
 
 /**
  * How big are the shards we are processing? Is an inclusive offset, so every
  * shard ranges from [X,X+shard_size) exclusive.  So a shard covers
- * shard_size slots.  The maximum value for shard_size is INT32_MAX+1.
+ * shard_size slots.
  */
-static uint32_t shard_size;
+static struct GNUNET_TIME_Relative shard_size;
 
 /**
  * Value to return from main(). 0 on success, non-zero on errors.
@@ -150,54 +123,29 @@ shutdown_task (void *cls)
   }
   TALER_EXCHANGEDB_plugin_unload (db_plugin);
   db_plugin = NULL;
-  TALER_EXCHANGEDB_unload_accounts ();
   cfg = NULL;
 }
 
 
 /**
- * Parse the configuration for wirewatch.
+ * Parse the configuration for expire.
  *
  * @return #GNUNET_OK on success
  */
 static enum GNUNET_GenericReturnValue
-parse_wirewatch_config (void)
+parse_expire_config (void)
 {
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_string (cfg,
-                                             "exchange",
-                                             "BASE_URL",
-                                             &exchange_base_url))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
-                               "exchange",
-                               "BASE_URL");
-    return GNUNET_SYSERR;
-  }
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_time (cfg,
                                            "exchange",
-                                           "ROUTER_IDLE_SLEEP_INTERVAL",
-                                           &router_idle_sleep_interval))
+                                           "EXPIRE_IDLE_SLEEP_INTERVAL",
+                                           &expire_idle_sleep_interval))
   {
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                                "exchange",
-                               "ROUTER_IDLE_SLEEP_INTERVAL");
+                               "EXPIRE_IDLE_SLEEP_INTERVAL");
     return GNUNET_SYSERR;
   }
-  if ( (GNUNET_OK !=
-        TALER_config_get_amount (cfg,
-                                 "taler",
-                                 "CURRENCY_ROUND_UNIT",
-                                 &currency_round_unit)) ||
-       ( (0 != currency_round_unit.fraction) &&
-         (0 != currency_round_unit.value) ) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
-    return GNUNET_SYSERR;
-  }
-
   if (NULL ==
       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
   {
@@ -205,16 +153,6 @@ parse_wirewatch_config (void)
                 "Failed to initialize DB subsystem\n");
     return GNUNET_SYSERR;
   }
-  if (GNUNET_OK !=
-      TALER_EXCHANGEDB_load_accounts (cfg,
-                                      TALER_EXCHANGEDB_ALO_DEBIT))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "No wire accounts configured for debit!\n");
-    TALER_EXCHANGEDB_plugin_unload (db_plugin);
-    db_plugin = NULL;
-    return GNUNET_SYSERR;
-  }
   return GNUNET_OK;
 }
 
@@ -251,11 +189,11 @@ release_shard (struct Shard *s)
 {
   enum GNUNET_DB_QueryStatus qs;
 
-  qs = db_plugin->release_revolving_shard (
+  qs = db_plugin->complete_shard (
     db_plugin->cls,
-    "router",
-    s->shard_start,
-    s->shard_end);
+    "expire",
+    s->shard_start.abs_value_us,
+    s->shard_end.abs_value_us);
   GNUNET_free (s);
   switch (qs)
   {
@@ -269,25 +207,112 @@ release_shard (struct Shard *s)
     /* Strange, but let's just continue */
     break;
   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Purse expiration shard completed with %llu purses\n",
+                (unsigned long long) s->work_counter);
     /* normal case */
     break;
   }
 }
 
 
+/**
+ * Release lock on shard @a s in the database due to an abort of the
+ * operation.  On error, terminates this process.
+ *
+ * @param[in] s shard to free (and memory to release)
+ */
 static void
-run_routing (void *cls)
+abort_shard (struct Shard *s)
+{
+  enum GNUNET_DB_QueryStatus qs;
+
+  qs = db_plugin->abort_shard (db_plugin->cls,
+                               "expire",
+                               s->shard_start.abs_value_us,
+                               s->shard_end.abs_value_us);
+  if (0 >= qs)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to abort shard (%d)!\n",
+                qs);
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+}
+
+
+/**
+ * Main function that processes the work in one shard.
+ *
+ * @param[in] cls a `struct Shard` to process
+ */
+static void
+run_expire (void *cls)
 {
   struct Shard *s = cls;
+  enum GNUNET_DB_QueryStatus qs;
 
   task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Checking for ready P2P transfers to route\n");
-  // FIXME: do actual work here!
-  commit_or_warn ();
-  release_shard (s);
-  task = GNUNET_SCHEDULER_add_now (&run_shard,
-                                   NULL);
+              "Checking for expired purses\n");
+  if (GNUNET_SYSERR ==
+      db_plugin->preflight (db_plugin->cls))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to obtain database connection!\n");
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (db_plugin->start (db_plugin->cls,
+                        "expire-purse"))
+  {
+    global_ret = EXIT_FAILURE;
+    db_plugin->rollback (db_plugin->cls);
+    abort_shard (s);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  qs = db_plugin->expire_purse (db_plugin->cls,
+                                s->shard_start,
+                                s->shard_end);
+  switch (qs)
+  {
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_break (0);
+    global_ret = EXIT_FAILURE;
+    db_plugin->rollback (db_plugin->cls);
+    abort_shard (s);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    db_plugin->rollback (db_plugin->cls);
+    abort_shard (s);
+    task = GNUNET_SCHEDULER_add_now (&run_shard,
+                                     NULL);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    if (0 > commit_or_warn ())
+    {
+      db_plugin->rollback (db_plugin->cls);
+      abort_shard (s);
+    }
+    else
+    {
+      release_shard (s);
+    }
+    task = GNUNET_SCHEDULER_add_now (&run_shard,
+                                     NULL);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    /* commit, and go again immediately */
+    s->work_counter++;
+    (void) commit_or_warn ();
+    task = GNUNET_SCHEDULER_add_now (&run_expire,
+                                     s);
+  }
 }
 
 
@@ -315,12 +340,12 @@ run_shard (void *cls)
   }
   s = GNUNET_new (struct Shard);
   s->start_time = GNUNET_TIME_timestamp_get ();
-  qs = db_plugin->begin_revolving_shard (db_plugin->cls,
-                                         "router",
-                                         shard_size,
-                                         1U + INT32_MAX,
-                                         &s->shard_start,
-                                         &s->shard_end);
+  qs = db_plugin->begin_shard (db_plugin->cls,
+                               "expire",
+                               shard_size,
+                               shard_size.rel_value_us,
+                               &s->shard_start.abs_value_us,
+                               &s->shard_end.abs_value_us);
   if (0 >= qs)
   {
     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -343,11 +368,24 @@ run_shard (void *cls)
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
+  if (GNUNET_TIME_absolute_is_future (s->shard_end))
+  {
+    task = GNUNET_SCHEDULER_add_at (s->shard_end,
+                                    &run_shard,
+                                    NULL);
+    abort_shard (s);
+    return;
+  }
+  /* If this is a first-time run, we immediately
+     try to catch up with the present */
+  if (GNUNET_TIME_absolute_is_zero (s->shard_start))
+    s->shard_end = GNUNET_TIME_absolute_get ();
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Starting shard [%u:%u]!\n",
-              (unsigned int) s->shard_start,
-              (unsigned int) s->shard_end);
-  task = GNUNET_SCHEDULER_add_now (&run_routing,
+              "Starting shard [%llu:%llu]!\n",
+              (unsigned long long) s->shard_start.abs_value_us,
+              (unsigned long long) s->shard_end.abs_value_us);
+  task = GNUNET_SCHEDULER_add_now (&run_expire,
                                    s);
 }
 
@@ -366,33 +404,27 @@ run (void *cls,
      const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  unsigned long long ass;
   (void) cls;
   (void) args;
   (void) cfgfile;
 
   cfg = c;
-  if (GNUNET_OK != parse_wirewatch_config ())
+  if (GNUNET_OK != parse_expire_config ())
   {
     cfg = NULL;
     global_ret = EXIT_NOTCONFIGURED;
     return;
   }
   if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_number (cfg,
-                                             "exchange",
-                                             "ROUTER_SHARD_SIZE",
-                                             &ass))
+      GNUNET_CONFIGURATION_get_value_time (cfg,
+                                           "exchange",
+                                           "EXPIRE_SHARD_SIZE",
+                                           &shard_size))
   {
     cfg = NULL;
     global_ret = EXIT_NOTCONFIGURED;
     return;
   }
-  if ( (0 == ass) ||
-       (ass > INT32_MAX) )
-    shard_size = 1U + INT32_MAX;
-  else
-    shard_size = (uint32_t) ass;
   GNUNET_assert (NULL == task);
   task = GNUNET_SCHEDULER_add_now (&run_shard,
                                    NULL);
@@ -402,7 +434,7 @@ run (void *cls,
 
 
 /**
- * The main function of the taler-exchange-router.
+ * The main function of the taler-exchange-expire.
  *
  * @param argc number of arguments from the command line
  * @param argv command line arguments
@@ -419,10 +451,6 @@ main (int argc,
                                "test",
                                "run in test mode and exit when idle",
                                &test_mode),
-    GNUNET_GETOPT_option_flag ('y',
-                               "kyc-off",
-                               "perform wire transfers without KYC checks",
-                               &kyc_off),
     GNUNET_GETOPT_OPTION_END
   };
   enum GNUNET_GenericReturnValue ret;
@@ -434,9 +462,9 @@ main (int argc,
   TALER_OS_init ();
   ret = GNUNET_PROGRAM_run (
     argc, argv,
-    "taler-exchange-router",
+    "taler-exchange-expire",
     gettext_noop (
-      "background process that routes P2P transfers"),
+      "background process that expires purses"),
     options,
     &run, NULL);
   GNUNET_free_nz ((void *) argv);
@@ -448,4 +476,4 @@ main (int argc,
 }
 
 
-/* end of taler-exchange-router.c */
+/* end of taler-exchange-expire.c */
diff --git a/src/exchange/taler-exchange-router.c b/src/exchange/taler-exchange-router.c
index ca4499e3..0816dfdb 100644
--- a/src/exchange/taler-exchange-router.c
+++ b/src/exchange/taler-exchange-router.c
@@ -17,8 +17,7 @@
 /**
  * @file taler-exchange-router.c
  * @brief Process that routes P2P payments. Responsible for
- *   (1) refunding coins in unmerged purses, (2) merging purses into local reserves;
- *   (3) aggregating remote payments into the respective wad transfers.
+ *   aggregating remote payments into the respective wad transfers.
 *   Execution of actual wad transfers is still to be done by taler-exchange-transfer,
 *   and watching for incoming wad transfers is done by taler-exchange-wirewatch.
  * @author Christian Grothoff
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 4d5efb9c..ab282f4f 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -13554,6 +13554,25 @@ postgres_insert_purse_request (
 }
 
 
+/**
+ * Function called to clean up one expired purse.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param start_time select purse expired after this time
+ * @param end_time select purse expired before this time
+ * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval).
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_expire_purse (
+  void *cls,
+  struct GNUNET_TIME_Absolute start_time,
+  struct GNUNET_TIME_Absolute end_time)
+{
+  GNUNET_break (0);
+  return GNUNET_DB_STATUS_HARD_ERROR;
+}
+
+
 /**
  * Function called to obtain information about a purse.
  *
@@ -14283,6 +14302,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
     = &postgres_insert_purse_request;
   plugin->select_purse_request
     = &postgres_select_purse_request;
+  plugin->expire_purse
+    = &postgres_expire_purse;
   plugin->select_purse
     = &postgres_select_purse;
   plugin->select_purse_by_merge_pub
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 52e684f6..213fe114 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -4603,6 +4603,21 @@ struct TALER_EXCHANGEDB_Plugin
     bool *in_conflict);
 
 
+  /**
+   * Function called to clean up one expired purse.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param start_time select purse expired after this time
+   * @param end_time select purse expired before this time
+   * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval).
+   */
+  enum GNUNET_DB_QueryStatus
+  (*expire_purse)(
+    void *cls,
+    struct GNUNET_TIME_Absolute start_time,
+    struct GNUNET_TIME_Absolute end_time);
+
+
   /**
    * Function called to obtain information about a purse.
    *

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]