gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: introduce locking to avoid ce


From: gnunet
Subject: [taler-wallet-core] branch master updated: introduce locking to avoid certain simultaneous requests to the exchange
Date: Tue, 18 Aug 2020 14:53:15 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new e2f7bc79 introduce locking to avoid certain simultaneous requests to 
the exchange
e2f7bc79 is described below

commit e2f7bc79cd4326f769f370f230041101a099d98c
Author: Florian Dold <florian.dold@gmail.com>
AuthorDate: Tue Aug 18 18:23:06 2020 +0530

    introduce locking to avoid certain simultaneous requests to the exchange
---
 packages/taler-wallet-core/src/operations/pay.ts   |  6 ++-
 .../taler-wallet-core/src/operations/refresh.ts    | 34 ++++++++------
 packages/taler-wallet-core/src/operations/state.ts | 53 +++++++++++++++++++++-
 3 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/packages/taler-wallet-core/src/operations/pay.ts 
b/packages/taler-wallet-core/src/operations/pay.ts
index c3dd6c6d..996e1c1e 100644
--- a/packages/taler-wallet-core/src/operations/pay.ts
+++ b/packages/taler-wallet-core/src/operations/pay.ts
@@ -57,7 +57,7 @@ import { Logger } from "../util/logging";
 import { parsePayUri } from "../util/taleruri";
 import { guardOperationException, OperationFailedError } from "./errors";
 import { createRefreshGroup, getTotalRefreshCost } from "./refresh";
-import { InternalWalletState } from "./state";
+import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
 import { getTimestampNow, timestampAddDuration } from "../util/time";
 import { strcmp, canonicalJson } from "../util/helpers";
 import { readSuccessResponseJsonOrThrow } from "../util/http";
@@ -796,7 +796,9 @@ export async function submitPay(
 
   logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2));
 
-  const resp = await ws.http.postJson(payUrl, reqBody);
+  const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
+    ws.http.postJson(payUrl, reqBody),
+  );
 
   const merchantResp = await readSuccessResponseJsonOrThrow(
     resp,
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts 
b/packages/taler-wallet-core/src/operations/refresh.ts
index 52325281..409ae58c 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -29,7 +29,7 @@ import {
 } from "../types/dbTypes";
 import { amountToPretty } from "../util/helpers";
 import { TransactionHandle } from "../util/query";
-import { InternalWalletState } from "./state";
+import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
 import { Logger } from "../util/logging";
 import { getWithdrawDenomList } from "./withdraw";
 import { updateExchangeFromUrl } from "./exchanges";
@@ -43,7 +43,7 @@ import { guardOperationException } from "./errors";
 import { NotificationType } from "../types/notifications";
 import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
 import { getTimestampNow } from "../util/time";
-import { readSuccessResponseJsonOrThrow } from "../util/http";
+import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http";
 import {
   codecForExchangeMeltResponse,
   codecForExchangeRevealResponse,
@@ -248,7 +248,14 @@ async function refreshMelt(
     value_with_fee: Amounts.stringify(refreshSession.amountRefreshInput),
   };
   logger.trace(`melt request for coin:`, meltReq);
-  const resp = await ws.http.postJson(reqUrl.href, meltReq);
+
+  const resp = await ws.runSequentialized(
+    [EXCHANGE_COINS_LOCK],
+    async () => {
+      return await ws.http.postJson(reqUrl.href, meltReq);
+    },
+  );
+
   const meltResponse = await readSuccessResponseJsonOrThrow(
     resp,
     codecForExchangeMeltResponse(),
@@ -339,7 +346,13 @@ async function refreshReveal(
     refreshSession.exchangeBaseUrl,
   );
 
-  const resp = await ws.http.postJson(reqUrl.href, req);
+  const resp = await ws.runSequentialized(
+    [EXCHANGE_COINS_LOCK],
+    async () => {
+      return await ws.http.postJson(reqUrl.href, req);
+    },
+  );
+
   const reveal = await readSuccessResponseJsonOrThrow(
     resp,
     codecForExchangeRevealResponse(),
@@ -446,6 +459,9 @@ async function incrementRefreshRetry(
   }
 }
 
+/**
+ * Actually process a refresh group that has been created.
+ */
 export async function processRefreshGroup(
   ws: InternalWalletState,
   refreshGroupId: string,
@@ -557,15 +573,7 @@ export async function createRefreshGroup(
 
   await tx.put(Stores.refreshGroups, refreshGroup);
 
-  const processAsync = async (): Promise<void> => {
-    try {
-      await processRefreshGroup(ws, refreshGroupId);
-    } catch (e) {
-      logger.trace(`Error during refresh: ${e}`);
-    }
-  };
-
-  processAsync();
+  logger.trace(`created refresh group ${refreshGroupId}`);
 
   return {
     refreshGroupId,
diff --git a/packages/taler-wallet-core/src/operations/state.ts 
b/packages/taler-wallet-core/src/operations/state.ts
index cfec85d0..582dd92d 100644
--- a/packages/taler-wallet-core/src/operations/state.ts
+++ b/packages/taler-wallet-core/src/operations/state.ts
@@ -22,11 +22,15 @@ import { Logger } from "../util/logging";
 import { PendingOperationsResponse } from "../types/pending";
 import { WalletNotification } from "../types/notifications";
 import { Database } from "../util/query";
+import { openPromise, OpenedPromise } from "../util/promiseUtils";
 
 type NotificationListener = (n: WalletNotification) => void;
 
 const logger = new Logger("state.ts");
 
+export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
+export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
+
 export class InternalWalletState {
   cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
   memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
@@ -41,6 +45,16 @@ export class InternalWalletState {
 
   listeners: NotificationListener[] = [];
 
+  /**
+   * Promises that are waiting for a particular resource.
+   */
+  private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
+
+  /**
+   * Resources that are currently locked.
+   */
+  private resourceLocks: Set<string> = new Set();
+
   constructor(
     public db: Database,
     public http: HttpRequestLibrary,
@@ -49,7 +63,7 @@ export class InternalWalletState {
     this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
   }
 
-  public notify(n: WalletNotification): void {
+  notify(n: WalletNotification): void {
     logger.trace("Notification", n);
     for (const l of this.listeners) {
       const nc = JSON.parse(JSON.stringify(n));
@@ -62,4 +76,41 @@ export class InternalWalletState {
   addNotificationListener(f: (n: WalletNotification) => void): void {
     this.listeners.push(f);
   }
+
+  /**
+   * Run an async function after acquiring a list of locks, identified
+   * by string tokens.
+   */
+  async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
+    // Make sure locks are always acquired in the same order
+    tokens = [... tokens].sort();
+
+    for (const token of tokens) {
+      if (this.resourceLocks.has(token)) {
+        const p = openPromise<void>();
+        let waitList = this.resourceWaiters[token];
+        if (!waitList) {
+          waitList = this.resourceWaiters[token] = [];
+        }
+        waitList.push(p);
+        await p.promise;
+      }
+      this.resourceLocks.add(token);
+    }
+
+    try {
+      logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
+      const result = await f();
+      logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
+      return result;
+    } finally {
+      for (const token of tokens) {
+        this.resourceLocks.delete(token);
+        let waiter = (this.resourceWaiters[token] ?? []).shift();
+        if (waiter) {
+          waiter.resolve();
+        }
+      }
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]