gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: taler-auditor-sync WiP


From: gnunet
Subject: [taler-exchange] branch master updated: taler-auditor-sync WiP
Date: Mon, 11 Jan 2021 23:02:25 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 2518da8f taler-auditor-sync WiP
2518da8f is described below

commit 2518da8f4581d868dd8eafabc54e6b2ddcc998d4
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Mon Jan 11 23:02:22 2021 +0100

    taler-auditor-sync WiP
---
 src/auditor/taler-auditor-sync.c      | 203 +++++++++++++++++++++++++++++-----
 src/include/taler_exchangedb_plugin.h |   2 +-
 2 files changed, 177 insertions(+), 28 deletions(-)

diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c
index a76c9a0b..fae3d218 100644
--- a/src/auditor/taler-auditor-sync.c
+++ b/src/auditor/taler-auditor-sync.c
@@ -52,6 +52,182 @@ static unsigned int transaction_size = 512;
  */
 static unsigned int actual_size;
 
+static struct Table
+{
+  enum TALER_EXCHANGEDB_ReplicatedTable rt;
+  uint64_t start_serial;
+  uint64_t end_serial;
+  bool end;
+} tables[] = {
+  { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
+  { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
+  { .rt = TALER_EXCHANGEDB_RT_RESERVES},
+  { .rt = TALER_EXCHANGEDB_RT_RESERVES_IN},
+  { .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE},
+  { .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT},
+  { .rt = TALER_EXCHANGEDB_RT_AUDITORS},
+  { .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS},
+  { .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS},
+  { .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS},
+  { .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS},
+  { .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS},
+  { .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS},
+  { .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS},
+  { .rt = TALER_EXCHANGEDB_RT_DEPOSITS},
+  { .rt = TALER_EXCHANGEDB_RT_REFUNDS},
+  { .rt = TALER_EXCHANGEDB_RT_WIRE_OUT},
+  { .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING},
+  { .rt = TALER_EXCHANGEDB_RT_WIRE_FEE},
+  { .rt = TALER_EXCHANGEDB_RT_RECOUP},
+  { .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH },
+  { .end = true }
+};
+
+
+/**
+ * Function called on data to replicate in the auditor's database.
+ *
+ * @param cls closure
+ * @param td record from an exchange table
+ * @return #GNUNET_OK to continue to iterate,
+ *         #GNUNET_SYSERR to fail with an error
+ */
+static int
+do_insert (void *cls,
+           const struct TALER_EXCHANGEDB_TableData *td)
+{
+  // FIXME ...
+}
+
+
+/**
+ * Run one replication transaction.
+ *
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback
+ */
+static int
+transact (struct TALER_EXCHANGEDB_Session *ss,
+          struct TALER_EXCHANGEDB_Session *ds)
+{
+  if (GNUNET_OK !=
+      src->start (src->cls,
+                  ss,
+                  "lookup src serials"))
+    return GNUNET_SYSERR;
+  for (unsigned int i = 0; ! tables[i].end; i++)
+    src->lookup_serial_by_table (src->cls,
+                                 ss,
+                                 tables[i].rt,
+                                 &tables[i].end_serial);
+  if (GNUNET_OK !=
+      src->commit (src->cls,
+                   ss))
+    return GNUNET_SYSERR;
+  if (GNUNET_OK !=
+      dst->start (src->cls,
+                  ds,
+                  "lookup dst serials"))
+    return GNUNET_SYSERR;
+  for (unsigned int i = 0; ! tables[i].end; i++)
+    dst->lookup_serial_by_table (dst->cls,
+                                 ds,
+                                 tables[i].rt,
+                                 &tables[i].start_serial);
+  if (GNUNET_OK !=
+      dst->commit (dst->cls,
+                   ds))
+    return GNUNET_SYSERR;
+  for (unsigned int i = 0; ! tables[i].end; i++)
+  {
+    printf ("%d ", i);
+    fflush (stdout);
+    while (tables[i].start_serial < tables[i].end_serial)
+    {
+      enum GNUNET_DB_QueryStatus qs;
+
+      if (GNUNET_OK !=
+          src->start (src->cls,
+                      ss,
+                      "copy table (src)"))
+        return GNUNET_SYSERR;
+      if (GNUNET_OK !=
+          dst->start (dst->cls,
+                      ds,
+                      "copy table (dst)"))
+        return GNUNET_SYSERR;
+      qs = src->lookup_records_by_table (src->cls,
+                                         ss,
+                                         tables[i].rt,
+                                         tables[i].start_serial,
+                                         &do_insert,
+                                         ds);
+      if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+      {
+        global_ret = 3;
+        return GNUNET_SYSERR;
+      }
+      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+        return GNUNET_SYSERR; /* will retry */
+      if (0 == qs)
+      {
+        GNUNET_break (0); /* should be impossible */
+        global_ret = 4;
+        return GNUNET_SYSERR;
+      }
+    }
+  }
+  /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
+  src->rollback (src->cls,
+                 ss);
+  if (GNUNET_OK !=
+      dst->commit (dst->cls,
+                   ds))
+    return GNUNET_SYSERR;
+  printf ("\n");
+  return GNUNET_OK;
+}
+
+
+/**
+ * Task to do the actual synchronization work.
+ *
+ * @param cls NULL, unused
+ */
+static void
+do_sync (void *cls)
+{
+  struct GNUNET_TIME_Relative delay;
+  struct TALER_EXCHANGEDB_Session *ss;
+  struct TALER_EXCHANGEDB_Session *ds;
+
+  sync_task = NULL;
+  actual_size = 0;
+  ss = src->get_session (src->cls);
+  ds = dst->get_session (dst->cls);
+  if (GNUNET_OK !=
+      transact (ss,
+                ds))
+  {
+    src->rollback (src->cls,
+                   ss);
+    dst->rollback (dst->cls,
+                   ds);
+  }
+  if (0 != global_ret)
+    return;
+  if (actual_size < transaction_size / 2)
+  {
+    delay = GNUNET_TIME_STD_BACKOFF (delay);
+  }
+  else if (actual_size >= transaction_size)
+  {
+    delay = GNUNET_TIME_UNIT_ZERO;
+  }
+  sync_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                            &do_sync,
+                                            NULL);
+}
+
 
 /**
  * Set an option of type 'char *' from the command line with
@@ -150,33 +326,6 @@ load_config (const char *cfgfile)
 }
 
 
-/**
- * Task to do the actual synchronization work.
- *
- * @param cls NULL, unused
- */
-static void
-do_sync (void *cls)
-{
-  struct GNUNET_TIME_Relative delay;
-
-  sync_task = NULL;
-  actual_size = 0;
-  // FIXME: do real work here!
-  if (actual_size < transaction_size / 2)
-  {
-    delay = GNUNET_TIME_STD_BACKOFF (delay);
-  }
-  else if (actual_size >= transaction_size)
-  {
-    delay = GNUNET_TIME_UNIT_ZERO;
-  }
-  sync_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                            &do_sync,
-                                            NULL);
-}
-
-
 /**
  * Shutdown task.
  *
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 8286260c..92163bb5 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3773,7 +3773,7 @@ struct TALER_EXCHANGEDB_Plugin
    * @param session a session
    * @param table table for which we should return the serial
    * @param[out] latest serial number in use
-   * @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if
+   * @return transaction status code, #GNUNET_DB_STATUS_HARD_ERROR if
    *         @a table does not have a serial number
    */
   enum GNUNET_DB_QueryStatus

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]