gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] 01/05: batch modifications


From: gnunet
Subject: [taler-exchange] 01/05: batch modifications
Date: Tue, 06 Dec 2022 14:15:04 +0100

This is an automated email from the git hooks/post-receive script.

joseph-xu pushed a commit to branch master
in repository exchange.

commit b6476ac881cfd3bde41c88b94f6a7538acf76f9c
Author: Joseph <Joseph.xu@efrei.net>
AuthorDate: Wed Nov 23 10:41:57 2022 -0500

    batch modifications
---
 contrib/gana                                 |   2 +-
 src/exchangedb/pg_batch_reserves_in_insert.c | 157 +++++++++++++++++++++------
 src/exchangedb/test_exchangedb_by_j.c        |  53 ++++++++-
 3 files changed, 175 insertions(+), 37 deletions(-)

diff --git a/contrib/gana b/contrib/gana
index 20f8eb7a..212ee0a7 160000
--- a/contrib/gana
+++ b/contrib/gana
@@ -1 +1 @@
-Subproject commit 20f8eb7a72e2160409f0f78264ec5198e9caa193
+Subproject commit 212ee0a78adc43cb5c04d6ea96ccc2fe74fed62b
diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c
index 455f080d..f40641ed 100644
--- a/src/exchangedb/pg_batch_reserves_in_insert.c
+++ b/src/exchangedb/pg_batch_reserves_in_insert.c
@@ -16,7 +16,7 @@
 /**
  * @file exchangedb/pg_batch_reserves_in_insert.c
  * @brief Implementation of the reserves_in_insert function for Postgres
- * @author JOSEPHxu
+ * @author Joseph XU
  */
 #include "platform.h"
 #include "taler_error_codes.h"
@@ -35,15 +35,12 @@
 
 
 /**
- * Generate event notification for the reserve
- * change.
+ * Generate event notification for the reserve change.
  *
- * @param pg plugin state
  * @param reserve_pub reserve to notfiy on
  */
-static void
-notify_on_reserve (struct PostgresClosure *pg,
-                   const struct TALER_ReservePublicKeyP *reserve_pub)
+static char *
+compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
 {
   struct TALER_ReserveEventP rep = {
     .header.size = htons (sizeof (rep)),
@@ -51,12 +48,7 @@ notify_on_reserve (struct PostgresClosure *pg,
     .reserve_pub = *reserve_pub
   };
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Notifying on reserve!\n");
-  TEH_PG_event_notify (pg,
-                       &rep.header,
-                       NULL,
-                       0);
+  return GNUNET_PG_get_event_notify_channel (&rep.header);
 }
 
 
@@ -75,8 +67,11 @@ TEH_PG_batch_reserves_in_insert (void *cls,
   uint64_t reserve_uuid;
   bool conflicted;
   bool transaction_duplicate;
+  bool need_update = false;
   struct GNUNET_TIME_Timestamp reserve_expiration
     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
+  bool conflicts[reserves_length];
+  char *notify_s[reserves_length];
 
   PREPARE (pg,
            "reserve_create",
@@ -84,8 +79,8 @@ TEH_PG_batch_reserves_in_insert (void *cls,
            "out_reserve_found AS conflicted"
            ",transaction_duplicate"
            ",ruuid AS reserve_uuid"
-           " FROM exchange_do_batch_reserves_in"
-           " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
+           " FROM batch_reserves_insert"
+           " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);");
   expiry = GNUNET_TIME_absolute_to_timestamp (
     GNUNET_TIME_absolute_add (reserves->execution_time.abs_time,
                               pg->idle_reserve_expiration_time));
@@ -98,34 +93,51 @@ TEH_PG_batch_reserves_in_insert (void *cls,
               GNUNET_STRINGS_relative_time_to_string (
                 pg->idle_reserve_expiration_time,
                 GNUNET_NO));
+
+  {
+    if (GNUNET_OK !=
+        TEH_PG_start_read_committed(pg,
+                                   "READ_COMMITED"))
+    {
+      GNUNET_break (0);
+      return GNUNET_DB_STATUS_HARD_ERROR;
+    }
+  }
   /* Optimistically assume this is a new reserve, create balance for the first
      time; we do this before adding the actual transaction to "reserves_in",
      as for a new reserve it can't be a duplicate 'add' operation, and as
      the 'add' operation needs the reserve entry as a foreign key. */
   for (unsigned int i = 0; i<reserves_length; i++)
+  {
+    const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+    notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
+  }
+
+  for (unsigned int i=0;i<reserves_length;i++)
   {
     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
     struct GNUNET_PQ_QueryParam params[] = {
-      GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub), /*$1*/
-      GNUNET_PQ_query_param_timestamp (&expiry),  /*$4*/
-      GNUNET_PQ_query_param_timestamp (&gc),  /*$5*/
-      GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), /*6*/
-      TALER_PQ_query_param_amount (&reserve->balance), /*7+8*/
-      GNUNET_PQ_query_param_string (reserve->exchange_account_name), /*9*/
-      GNUNET_PQ_query_param_timestamp (&reserve->execution_time), /*10*/
-      GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/
-      GNUNET_PQ_query_param_string (reserve->sender_account_details),/*12*/
-      GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/
+      GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+      GNUNET_PQ_query_param_timestamp (&expiry),
+      GNUNET_PQ_query_param_timestamp (&gc),
+      GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+      TALER_PQ_query_param_amount (&reserve->balance),
+      GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+      GNUNET_PQ_query_param_timestamp (&reserve->execution_time),
+      GNUNET_PQ_query_param_auto_from_type (&h_payto),
+      GNUNET_PQ_query_param_string (reserve->sender_account_details),
+      GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+      GNUNET_PQ_query_param_string (notify_s[i]),
       GNUNET_PQ_query_param_end
     };
-    /* We should get all our results into results[]*/
+
     struct GNUNET_PQ_ResultSpec rs[] = {
-      GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
-                                    &reserve_uuid),
       GNUNET_PQ_result_spec_bool ("conflicted",
                                   &conflicted),
       GNUNET_PQ_result_spec_bool ("transaction_duplicate",
                                   &transaction_duplicate),
+      GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+                                    &reserve_uuid),
       GNUNET_PQ_result_spec_end
     };
 
@@ -137,15 +149,92 @@ TEH_PG_batch_reserves_in_insert (void *cls,
                                                     params,
                                                     rs);
     if (qs1 < 0)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Failed to create reserves (%d)\n",
+                  qs1);
       return qs1;
-    notify_on_reserve (pg,
-                       &reserve->reserve_pub);
-    GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
-    results[i] = (transaction_duplicate)
+    }
+   GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
+   results[i] = (transaction_duplicate)
       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-    if ( (! conflicted) && transaction_duplicate)
-      TEH_PG_rollback (pg);
+   conflicts[i] = conflicted;
+   if (!conflicts[i] && transaction_duplicate)
+   {
+     GNUNET_break (0);
+     TEH_PG_rollback (pg);
+     return GNUNET_DB_STATUS_HARD_ERROR;
+   }
+   need_update |= conflicted;
   }
+  // commit
+  {
+    enum GNUNET_DB_QueryStatus cs;
+
+    cs = TEH_PG_commit (pg);
+    if (cs < 0)
+      return cs;
+  }
+
+  if (!need_update)
+    goto exit;
+  // begin serializable
+  {
+    if (GNUNET_OK !=
+        TEH_PG_start(pg,
+                     "reserve-insert-continued"))
+    {
+      GNUNET_break (0);
+      return GNUNET_DB_STATUS_HARD_ERROR;
+    }
+  }
+
+  enum GNUNET_DB_QueryStatus qs2;
+  PREPARE (pg,
+           "reserves_in_add_transaction",
+           "SELECT batch_reserves_update"
+           " ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
+  for (unsigned int i=0;i<reserves_length;i++)
+  {
+    if (! conflicts[i])
+      continue;
+    {
+      const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+      struct GNUNET_PQ_QueryParam params[] = {
+        GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
+        GNUNET_PQ_query_param_timestamp (&expiry),
+        GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
+        TALER_PQ_query_param_amount (&reserve->balance),
+        GNUNET_PQ_query_param_string (reserve->exchange_account_name),
+        GNUNET_PQ_query_param_bool (conflicted),
+        GNUNET_PQ_query_param_auto_from_type (&h_payto),
+        GNUNET_PQ_query_param_string (notify_s[i]),
+        GNUNET_PQ_query_param_end
+      };
+      qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+                                                "reserves_in_add_transaction",
+                                                params);
+      if (qs2<0)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Failed to update reserves (%d)\n",
+                    qs2);
+        return qs2;
+      }
+    }
+  }
+  {
+    enum GNUNET_DB_QueryStatus cs;
+
+    cs = TEH_PG_commit (pg);
+    if (cs < 0)
+      return cs;
+  }
+
+ exit:
+  for (unsigned int i=0;i<reserves_length;i++)
+    GNUNET_free (notify_s[i]);
+
   return reserves_length;
 }
diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c
index 43f47167..0f252a09 100644
--- a/src/exchangedb/test_exchangedb_by_j.c
+++ b/src/exchangedb/test_exchangedb_by_j.c
@@ -91,9 +91,22 @@ run (void *cls)
     result = 77;
     goto cleanup;
   }
+<<<<<<< HEAD
   for (unsigned int i = 0; i< 7; i++)
+=======
+  if (GNUNET_OK !=
+      plugin->setup_partitions (plugin->cls,
+                                num_partitions))
   {
-    static unsigned int batches[] = {1, 1, 2, 4, 16, 64, 256};
+    GNUNET_break (0);
+    result = 77;
+    goto cleanup;
+  }
+
+  for (unsigned int i = 0; i< 8; i++)
+>>>>>>> 26922c6d (batch modifications)
+  {
+    static unsigned int batches[] = {1, 1,0, 2, 4, 16, 64, 256};
     const char *sndr = "payto://x-taler-bank/localhost:8080/1";
     struct TALER_Amount value;
     unsigned int batch_size = batches[i];
@@ -101,6 +114,7 @@ run (void *cls)
     struct GNUNET_TIME_Timestamp ts;
     struct GNUNET_TIME_Relative duration;
     struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size];
+    /*   struct TALER_EXCHANGEDB_ReserveInInfo reserves2[batch_size];*/
     enum GNUNET_DB_QueryStatus results[batch_size];
     GNUNET_assert (GNUNET_OK ==
                    TALER_string_to_amount (CURRENCY ":1.000010",
@@ -109,9 +123,14 @@ run (void *cls)
     ts = GNUNET_TIME_timestamp_get ();
     for (unsigned int r = 0; r<10; r++)
     {
+<<<<<<< HEAD
       plugin->start_read_committed (plugin->cls,
                                     "test_by_j");
 
+=======
+       plugin->start (plugin->cls,
+         "test_by_exchange_j");
+>>>>>>> 26922c6d (batch modifications)
       for (unsigned int k = 0; k<batch_size; k++)
       {
         RND_BLK (&reserves[k].reserve_pub);
@@ -120,6 +139,7 @@ run (void *cls)
         reserves[k].sender_account_details = sndr;
         reserves[k].exchange_account_name = "name";
         reserves[k].wire_reference = k;
+<<<<<<< HEAD
 
       }
       FAILIF (batch_size !=
@@ -129,13 +149,42 @@ run (void *cls)
                                                 results));
 
       plugin->commit (plugin->cls);
+=======
+      }
+      FAILIF (batch_size !=
+            plugin->batch_reserves_in_insert (plugin->cls,
+                                              reserves,
+                                              batch_size,
+                                              results));
+      /*plugin->commit (plugin->cls);*/
+>>>>>>> 26922c6d (batch modifications)
     }
+    /*
+    for (unsigned int s=0;s<10;s++)
+    {
+      for (unsigned int k = 0; k<batch_size; k++)
+      {
+        RND_BLK (&reserves2[k].reserve_pub);
+        reserves2[k].balance = value;
+        reserves2[k].execution_time = ts;
+        reserves2[k].sender_account_details = sndr;
+        reserves2[k].exchange_account_name = "name";
+        reserves2[k].wire_reference = k;
+      }
+      FAILIF (batch_size !=
+            plugin->batch_reserves_in_insert (plugin->cls,
+                                              reserves2,
+                                              batch_size,
+                                              results));
+                                              }*/
+
     duration = GNUNET_TIME_absolute_get_duration (now);
     fprintf (stdout,
              "for a batchsize equal to %d it took %s\n",
              batch_size,
              GNUNET_STRINGS_relative_time_to_string (duration,
                                                      GNUNET_NO) );
+
   }
   result = 0;
 drop:
@@ -155,7 +204,6 @@ main (int argc,
   char *config_filename;
   char *testname;
   struct GNUNET_CONFIGURATION_Handle *cfg;
-
   (void) argc;
   result = -1;
   if (NULL == (plugin_name = strrchr (argv[0], (int) '-')))
@@ -163,6 +211,7 @@ main (int argc,
     GNUNET_break (0);
     return -1;
   }
+
   GNUNET_log_setup (argv[0],
                     "WARNING",
                     NULL);

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]