gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: wirewatch spring cleaning


From: gnunet
Subject: [taler-exchange] branch master updated: wirewatch spring cleaning
Date: Sat, 21 May 2022 21:07:29 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 73793729 wirewatch spring cleaning
73793729 is described below

commit 737937291cceddd81e0dac676d3cb909250f628a
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sat May 21 21:07:24 2022 +0200

    wirewatch spring cleaning
---
 src/exchange/taler-exchange-wirewatch.c | 445 +++++++++++++++++---------------
 1 file changed, 244 insertions(+), 201 deletions(-)

diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 898d678a..21d2df15 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -106,6 +106,12 @@ struct WireAccount
    */
   struct GNUNET_TIME_Absolute shard_start_time;
 
+  /**
+   * How long did we take to finish the last shard
+   * for this account?
+   */
+  struct GNUNET_TIME_Relative shard_delay;
+
   /**
    * Name of our job in the shard table.
    */
@@ -117,15 +123,10 @@ struct WireAccount
   unsigned int batch_size;
 
   /**
-   * How much do we incremnt @e batch_size on success?
+   * How much do we increment @e batch_size on success?
    */
   unsigned int batch_thresh;
 
-  /**
-   * How many transactions did we see in the current batch?
-   */
-  unsigned int current_batch_size;
-
   /**
    * Should we delay the next request to the wire plugin a bit?  Set to
    * false if we actually did some work.
@@ -150,12 +151,6 @@ static struct WireAccount *wa_head;
  */
 static struct WireAccount *wa_tail;
 
-/**
- * Wire account we are currently processing.  This would go away
- * if we ever start processing all accounts in parallel.
- */
-static struct WireAccount *wa_pos;
-
 /**
  * Handle to the context for interacting with the bank.
  */
@@ -184,11 +179,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
  */
 static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
 
-/**
- * How long did we take to finish the last shard?
- */
-static struct GNUNET_TIME_Relative shard_delay;
-
 /**
  * Modulus to apply to group shards.  The shard size must ultimately be a
  * multiple of the batch size. Thus, if this is not a multiple of the
@@ -249,9 +239,9 @@ shutdown_task (void *cls)
         wa->started_transaction = false;
       }
       qs = db_plugin->abort_shard (db_plugin->cls,
-                                   wa_pos->job_name,
-                                   wa_pos->shard_start,
-                                   wa_pos->shard_end);
+                                   wa->job_name,
+                                   wa->shard_start,
+                                   wa->shard_end);
       if (qs <= 0)
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                     "Failed to abort work shard on shutdown\n");
@@ -259,8 +249,6 @@ shutdown_task (void *cls)
       GNUNET_free (wa);
     }
   }
-  wa_pos = NULL;
-
   if (NULL != ctx)
   {
     GNUNET_CURL_fini (ctx);
@@ -359,12 +347,22 @@ exchange_serve_process_config (void)
 
 
 /**
- * Query for incoming wire transfers.
+ * Lock a shard and then begin to query for incoming wire transfers.
  *
- * @param cls NULL
+ * @param cls a `struct WireAccount` to operate on
  */
 static void
-find_transfers (void *cls);
+lock_shard (void *cls);
+
+
+/**
+ * Continue with the credit history of the shard
+ * reserved as @a wa.
+ *
+ * @param[in,out] cls `struct WireAccount *` account with shard to continue processing
+ */
+static void
+continue_with_shard (void *cls);
 
 
 /**
@@ -387,23 +385,59 @@ handle_soft_error (struct WireAccount *wa)
                 (unsigned long long) wa->batch_size);
   }
   GNUNET_assert (NULL == task);
-  task = GNUNET_SCHEDULER_add_now (&find_transfers,
-                                   NULL);
+  /* Reset to beginning of transaction, and go again
+     from there. */
+  wa->latest_row_off = wa->batch_start;
+  task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+                                   wa);
 }
 
 
 /**
- * We are done with a shard, move on to the next one.
+ * Schedule the #lock_shard() operation for
+ * @a wa. If @a wa is NULL, start with #wa_head.
+ *
+ * @param wa account to schedule #lock_shard() for,
+ *        possibly NULL (!).
+ */
+static void
+schedule_transfers (struct WireAccount *wa)
+{
+  if (NULL == wa)
+  {
+    wa = wa_head;
+    GNUNET_assert (NULL != wa);
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Will try to lock next shard of %s in %s\n",
+              wa->job_name,
+              GNUNET_STRINGS_relative_time_to_string (
+                GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
+                GNUNET_YES));
+  GNUNET_assert (NULL == task);
+  task = GNUNET_SCHEDULER_add_at (wa->delayed_until,
+                                  &lock_shard,
+                                  wa);
+}
+
+
+/**
+ * We are done with the work that is possible on @a wa right now (and the
+ * transaction was committed, if there was one to commit). Move on to the next
+ * account.
  *
  * @param wa wire account for which we completed a shard
  */
 static void
-shard_completed (struct WireAccount *wa)
+account_completed (struct WireAccount *wa)
 {
-  /* transaction success, update #last_row_off */
-  wa->batch_start = wa->latest_row_off;
-  if (wa->batch_size < MAXIMUM_BATCH_SIZE)
+  GNUNET_assert (! wa->started_transaction);
+  if ( (wa->batch_start + wa->batch_size ==
+        wa->latest_row_off) &&
+       (wa->batch_size < MAXIMUM_BATCH_SIZE) )
   {
+    /* The current batch size worked without serialization
+       issues, and we are allowed to grow. Do so slowly. */
     int delta;
 
     delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
@@ -411,45 +445,45 @@ shard_completed (struct WireAccount *wa)
       delta = -delta;
     wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
                                  wa->batch_size + delta + 1);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Increasing batch size to %llu\n",
                 (unsigned long long) wa->batch_size);
   }
+
   if (wa->delay)
   {
+    /* This account was finished, block this one for the
+       #wirewatch_idle_sleep_interval and move on to the next one. */
     wa->delayed_until
       = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
-    wa_pos = wa_pos->next;
-    if (NULL == wa_pos)
-      wa_pos = wa_head;
-    GNUNET_assert (NULL != wa_pos);
+    wa = wa->next;
   }
-  GNUNET_assert (NULL == task);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Will look for more transfers in %s\n",
-              GNUNET_STRINGS_relative_time_to_string (
-                GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until),
-                GNUNET_YES));
-  task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
-                                  &find_transfers,
-                                  NULL);
+  schedule_transfers (wa);
 }
 
 
 /**
- * We are finished with the current shard. Update the database, marking the
- * shard as finished.
+ * Check if we are finished with the current shard.  If so, update the
+ * database, marking the shard as finished.
  *
  * @param wa wire account to commit for
- * @return true on success
+ * @return true if we were indeed done with the shard
  */
 static bool
-mark_shard_done (struct WireAccount *wa)
+check_shard_done (struct WireAccount *wa)
 {
   enum GNUNET_DB_QueryStatus qs;
 
   if (wa->shard_end > wa->latest_row_off)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Shard %s (%llu,%llu] at %llu\n",
+                wa->job_name,
+                (unsigned long long) wa->shard_start,
+                (unsigned long long) wa->shard_end,
+                (unsigned long long) wa->latest_row_off);
     return false; /* actually, not done! */
+  }
   /* shard is complete, mark this as well */
   qs = db_plugin->complete_shard (db_plugin->cls,
                                   wa->job_name,
@@ -468,28 +502,25 @@ mark_shard_done (struct WireAccount *wa)
     handle_soft_error (wa);
     return false;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
-    /* already existed, ok, let's just continue */
+    GNUNET_break (0);
+    /* Not expected, but let's just continue */
     break;
   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     /* normal case */
-    shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
-
+    wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Completed shard %s (%llu,%llu] after %s\n",
+                wa->job_name,
+                (unsigned long long) wa->shard_start,
+                (unsigned long long) wa->shard_end,
+                GNUNET_STRINGS_relative_time_to_string (wa->shard_delay,
+                                                        GNUNET_YES));
     break;
   }
   return true;
 }
 
 
-/**
- * Continue with the credit history of the shard
- * reserved as @a wa_pos.
- *
- * @param[in,out] wa_pos shard to continue processing
- */
-static void
-continue_with_shard (struct WireAccount *wa_pos);
-
-
 /**
  * We are finished with the current transaction, try
  * to commit and then schedule the next iteration.
@@ -502,8 +533,17 @@ do_commit (struct WireAccount *wa)
   enum GNUNET_DB_QueryStatus qs;
   bool shard_done;
 
+  shard_done = check_shard_done (wa);
   wa->started_transaction = false;
-  shard_done = mark_shard_done (wa);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Committing %s progress (%llu,%llu] at %llu\n (%s)",
+              wa->job_name,
+              (unsigned long long) wa->shard_start,
+              (unsigned long long) wa->shard_end,
+              (unsigned long long) wa->latest_row_off,
+              shard_done
+              ? "shard done"
+              : "shard incomplete");
   qs = db_plugin->commit (db_plugin->cls);
   switch (qs)
   {
@@ -521,7 +561,7 @@ do_commit (struct WireAccount *wa)
     break;
   }
   if (shard_done)
-    shard_completed (wa);
+    account_completed (wa);
   else
     continue_with_shard (wa);
 }
@@ -568,63 +608,67 @@ history_cb (void *cls,
     }
     if (wa->started_transaction)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "End of list. Committing progress!\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "End of list. Committing progress on %s of (%llu,%llu]!\n",
+                  wa->job_name,
+                  (unsigned long long) wa->batch_start,
+                  (unsigned long long) wa->latest_row_off);
       do_commit (wa);
+      return GNUNET_OK; /* will be ignored anyway */
     }
-    else
+    /* We did not even start a transaction. */
+    if ( (wa->delay) &&
+         (test_mode) &&
+         (NULL == wa->next) )
     {
-      if ( (wa->delay) &&
-           (test_mode) &&
-           (NULL == wa->next) )
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                    "Shutdown due to test mode!\n");
-        GNUNET_SCHEDULER_shutdown ();
-        return GNUNET_OK;
-      }
-      else
-      {
-        shard_completed (wa);
-      }
+      /* We exit on idle */
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Shutdown due to test mode!\n");
+      GNUNET_SCHEDULER_shutdown ();
+      return GNUNET_OK;
     }
+    account_completed (wa);
     return GNUNET_OK; /* will be ignored anyway */
   }
+
+  /* We did get 'details' from the bank. Do sanity checks before inserting. */
   if (serial_id < wa->latest_row_off)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Serial ID %llu not monotonic (got %llu before). Failing!\n",
                 (unsigned long long) serial_id,
                 (unsigned long long) wa->latest_row_off);
-    if (wa->started_transaction)
-    {
-      wa->started_transaction = false;
-      db_plugin->rollback (db_plugin->cls);
-    }
     GNUNET_SCHEDULER_shutdown ();
     wa->hh = NULL;
     return GNUNET_SYSERR;
   }
+  /* If we got 'limit' transactions back from the bank,
+     we should not introduce any delay before the next
+     call. */
   if (serial_id >= wa->max_row_off)
     wa->delay = false;
   if (serial_id > wa->shard_end)
   {
-    /* we are done with the current shard, commit and stop this iteration! */
+    /* we are *past* the current shard (likely because the serial_id of the
+       shard_end happens to not exist in the DB). So commit and stop this
+       iteration! */
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Serial ID %llu past shard end at %llu, ending iteration early!\n",
                 (unsigned long long) serial_id,
                 (unsigned long long) wa->shard_end);
-    wa->latest_row_off = serial_id;
+    wa->latest_row_off = serial_id - 1; /* excluding serial_id! */
+    wa->hh = NULL;
     if (wa->started_transaction)
     {
       do_commit (wa);
     }
     else
     {
-      if (mark_shard_done (wa))
-        shard_completed (wa);
+      if (check_shard_done (wa))
+        account_completed (wa);
+      else
+        continue_with_shard (wa);
     }
-    wa->hh = NULL;
     return GNUNET_SYSERR;
   }
   if (! wa->started_transaction)
@@ -640,7 +684,6 @@ history_cb (void *cls,
       wa->hh = NULL;
       return GNUNET_SYSERR;
     }
-    wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
     wa->started_transaction = true;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -675,6 +718,17 @@ history_cb (void *cls,
     wa->hh = NULL;
     return GNUNET_SYSERR;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    /* Either wirewatch was freshly started after the system was
+       shutdown and we're going over an incomplete shard again
+       after being restarted, or the shard lock period was too
+       short (number of workers set incorrectly?) and a 2nd
+       wirewatcher has been stealing our work while we are still
+       at it. */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Attempted to import transaction %llu (%s) twice. "
+                "This should happen rarely (if not, ask for support).\n",
+                (unsigned long long) serial_id,
+                wa->job_name);
     /* already existed, ok, let's just continue */
     break;
   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@@ -686,130 +740,121 @@ history_cb (void *cls,
 }
 
 
-/**
- * Query for incoming wire transfers.
- *
- * @param cls NULL
- */
 static void
-find_transfers (void *cls)
+continue_with_shard (void *cls)
 {
-  enum GNUNET_DB_QueryStatus qs;
+  struct WireAccount *wa = cls;
+  unsigned int limit;
 
-  (void) cls;
-  task = NULL;
-  if (GNUNET_SYSERR ==
-      db_plugin->preflight (db_plugin->cls))
+  limit = GNUNET_MIN (wa->batch_size,
+                      wa->shard_end - wa->latest_row_off);
+  wa->max_row_off = wa->latest_row_off + limit;
+  GNUNET_assert (NULL == wa->hh);
+  wa->hh = TALER_BANK_credit_history (ctx,
+                                      wa->ai->auth,
+                                      wa->latest_row_off,
+                                      limit,
+                                      test_mode
+                                      ? GNUNET_TIME_UNIT_ZERO
+                                      : LONGPOLL_TIMEOUT,
+                                      &history_cb,
+                                      wa);
+  if (NULL == wa->hh)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to obtain database connection!\n");
+                "Failed to start request for account history!\n");
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  wa_pos->delay = true;
-  wa_pos->current_batch_size = 0; /* reset counter */
-  if (wa_pos->shard_end <= wa_pos->batch_start)
-  {
-    uint64_t start;
-    uint64_t end;
-    struct GNUNET_TIME_Relative delay;
-    /* advance to next shard */
-
-    if (0 == max_workers)
-      delay = GNUNET_TIME_UNIT_ZERO;
-    else
-      delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
-        GNUNET_CRYPTO_QUALITY_WEAK,
-        4 * GNUNET_TIME_relative_max (
-          wirewatch_idle_sleep_interval,
-          GNUNET_TIME_relative_multiply (shard_delay,
-                                         max_workers)).rel_value_us);
-    qs = db_plugin->begin_shard (db_plugin->cls,
-                                 wa_pos->job_name,
-                                 delay,
-                                 shard_size,
-                                 &start,
-                                 &end);
-    switch (qs)
-    {
-    case GNUNET_DB_STATUS_HARD_ERROR:
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Failed to obtain starting point for montoring from database!\n");
-      global_ret = EXIT_FAILURE;
-      GNUNET_SCHEDULER_shutdown ();
-      return;
-    case GNUNET_DB_STATUS_SOFT_ERROR:
-      /* try again */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Serialization error tying to obtain shard, will try again in %s!\n",
-                  GNUNET_STRINGS_relative_time_to_string (
-                    wirewatch_idle_sleep_interval,
-                    GNUNET_YES));
-      task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
-                                           &find_transfers,
-                                           NULL);
-      return;
-    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
-      GNUNET_break (0);
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "No shard available, will try again in %s!\n",
-                  GNUNET_STRINGS_relative_time_to_string (
-                    wirewatch_idle_sleep_interval,
-                    GNUNET_YES));
-      task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
-                                           &find_transfers,
-                                           NULL);
-      return;
-    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
-      wa_pos->shard_start = start;
-      wa_pos->shard_end = end;
-      wa_pos->batch_start = start;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Starting with shard at [%llu,%llu) locked for %s\n",
-                  (unsigned long long) start,
-                  (unsigned long long) end,
-                  GNUNET_STRINGS_relative_time_to_string (delay,
-                                                          GNUNET_YES));
-      break;
-    }
-  }
-  wa_pos->latest_row_off = wa_pos->batch_start;
-  continue_with_shard (wa_pos);
 }
 
 
 static void
-continue_with_shard (struct WireAccount *wa_pos)
+lock_shard (void *cls)
 {
-  unsigned int limit;
+  struct WireAccount *wa = cls;
+  enum GNUNET_DB_QueryStatus qs;
+  struct GNUNET_TIME_Relative delay;
 
-  limit = GNUNET_MIN (wa_pos->batch_size,
-                      wa_pos->shard_end - wa_pos->latest_row_off);
-  GNUNET_assert (NULL == wa_pos->hh);
-  wa_pos->max_row_off = wa_pos->latest_row_off + limit - 1;
-  wa_pos->hh = TALER_BANK_credit_history (ctx,
-                                          wa_pos->ai->auth,
-                                          wa_pos->latest_row_off,
-                                          limit,
-                                          test_mode
-                                          ? GNUNET_TIME_UNIT_ZERO
-                                          : LONGPOLL_TIMEOUT,
-                                          &history_cb,
-                                          wa_pos);
-  if (NULL == wa_pos->hh)
+  task = NULL;
+  if (GNUNET_SYSERR ==
+      db_plugin->preflight (db_plugin->cls))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to start request for account history!\n");
-    if (wa_pos->started_transaction)
-    {
-      db_plugin->rollback (db_plugin->cls);
-      wa_pos->started_transaction = false;
-    }
+                "Failed to obtain database connection!\n");
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  /* How long we lock a shard depends on the number of
+     workers expected, and how long we usually took to
+     process a shard. */
+  if (0 == max_workers)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else
+    delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+      GNUNET_CRYPTO_QUALITY_WEAK,
+      4 * GNUNET_TIME_relative_max (
+        wirewatch_idle_sleep_interval,
+        GNUNET_TIME_relative_multiply (wa->shard_delay,
+                                       max_workers)).rel_value_us);
+  wa->shard_start_time = GNUNET_TIME_absolute_get ();
+  qs = db_plugin->begin_shard (db_plugin->cls,
+                               wa->job_name,
+                               delay,
+                               shard_size,
+                               &wa->shard_start,
+                               &wa->shard_end);
+  switch (qs)
+  {
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to obtain starting point for montoring from database!\n");
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
     return;
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    /* try again */
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Serialization error tying to obtain shard %s, will try again in %s!\n",
+                wa->job_name,
+                GNUNET_STRINGS_relative_time_to_string (
+                  wirewatch_idle_sleep_interval,
+                  GNUNET_YES));
+    wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+      wirewatch_idle_sleep_interval);
+    schedule_transfers (wa->next);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "No shard available, will try again for %s in %s!\n",
+                wa->job_name,
+                GNUNET_STRINGS_relative_time_to_string (
+                  wirewatch_idle_sleep_interval,
+                  GNUNET_YES));
+    wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+      wirewatch_idle_sleep_interval);
+    schedule_transfers (wa->next);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    /* continued below */
+    break;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Starting with shard %s at (%llu,%llu] locked for %s\n",
+              wa->job_name,
+              (unsigned long long) wa->shard_start,
+              (unsigned long long) wa->shard_end,
+              GNUNET_STRINGS_relative_time_to_string (delay,
+                                                      GNUNET_YES));
+  wa->delay = true; /* default is to delay, unless
+                       we find out that we're really busy */
+  wa->batch_start = wa->shard_start;
+  wa->latest_row_off = wa->batch_start;
+  continue_with_shard (wa);
 }
 
 
@@ -838,21 +883,19 @@ run (void *cls,
     global_ret = EXIT_NOTCONFIGURED;
     return;
   }
-  wa_pos = wa_head;
-  GNUNET_assert (NULL != wa_pos);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                  cls);
   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
                           &rc);
-  rc = GNUNET_CURL_gnunet_rc_create (ctx);
   if (NULL == ctx)
   {
     GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
-
-  task = GNUNET_SCHEDULER_add_now (&find_transfers,
-                                   NULL);
+  rc = GNUNET_CURL_gnunet_rc_create (ctx);
+  task = GNUNET_SCHEDULER_add_now (&lock_shard,
+                                   wa_head);
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]