gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 01/02: - fixed bug in tcp com challenge logic. added test case


From: gnunet
Subject: [gnunet] 01/02: - fixed bug in tcp com challenge logic. added test case for bidirectional test.
Date: Mon, 25 Jan 2021 15:58:03 +0100

This is an automated email from the git hooks/post-receive script.

t3sserakt pushed a commit to branch master
in repository gnunet.

commit ac71165822501c42b00980db2fb7f5e2144f3d20
Author: t3sserakt <t3ss@posteo.de>
AuthorDate: Mon Jan 25 15:47:10 2021 +0100

    - fixed bug in tcp com challenge logic. added test case for bidirectional 
test.
---
 src/transport/Makefile.am               |  18 +-
 src/transport/gnunet-communicator-tcp.c | 654 +++++++++++++++++---------------
 src/transport/gnunet-communicator-udp.c |   1 +
 src/transport/test_communicator_basic.c | 570 +++++++++++++++++-----------
 4 files changed, 705 insertions(+), 538 deletions(-)

diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index a2fc3811e..0251b001e 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -618,7 +618,8 @@ check_PROGRAMS += \
   test_communicator_basic-udp \
   test_communicator_rekey-tcp \
   test_communicator_rekey-udp \
-  test_communicator_backchannel-udp
+  test_communicator_backchannel-udp \
+  test_communicator_bidirect-tcp
 endif
 endif
 
@@ -696,7 +697,8 @@ TESTS += \
   test_communicator_basic-udp \
   test_communicator_rekey-tcp \
   test_communicator_rekey-udp \
-  test_communicator_backchannel-udp
+  test_communicator_backchannel-udp \
+  test_communicator_bidirect-tcp
 endif
 endif
 
@@ -857,6 +859,14 @@ test_communicator_backchannel_udp_LDADD = \
  $(top_builddir)/src/testing/libgnunettesting.la \
  $(top_builddir)/src/util/libgnunetutil.la \
  $(top_builddir)/src/statistics/libgnunetstatistics.la
+
+test_communicator_bidirect_tcp_SOURCES = \
+ test_communicator_basic.c
+test_communicator_bidirect_tcp_LDADD = \
+ libgnunettransporttesting2.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la
 endif
 
 test_plugin_unix_SOURCES = \
@@ -1567,4 +1577,6 @@ test_communicator_tcp_rekey_peer2.conf \
 test_communicator_udp_rekey_peer1.conf \
 test_communicator_udp_rekey_peer2.conf \
 test_communicator_udp_backchannel_peer1.conf \
-test_communicator_udp_backchannel_peer2.conf
+test_communicator_udp_backchannel_peer2.conf \
+test_communicator_tcp_bidirect_peer1.conf \
+test_communicator_tcp_bidirect_peer2.conf
diff --git a/src/transport/gnunet-communicator-tcp.c 
b/src/transport/gnunet-communicator-tcp.c
index 0c79fc1b4..ed82dba9f 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -589,6 +589,11 @@ struct Queue
    */
   struct ChallengeNonceP challenge;
 
+  /**
+   * Challenge value received. In case of inbound connection we have to 
remember the value, because we send the challenge back later after we received 
the GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK.
+   */
+  struct ChallengeNonceP challenge_received;
+
   /**
    * Iteration Context for retrieving the monotonic time send with key for 
rekeying.
    */
@@ -834,7 +839,7 @@ int addrs_lens;
  * Size of data received without KX challenge played back.
  */
 // TODO remove?
-// size_t unverified_size;
+size_t unverified_size;
 
 /**
  * Database for peer's HELLOs.
@@ -1188,23 +1193,6 @@ setup_cipher (const struct GNUNET_HashCode *dh,
                                     0));
 }
 
-
-/**
- * Setup cipher of @a queue for decryption.
- *
- * @param ephemeral ephemeral key we received from the other peer
- * @param queue[in,out] queue to initialize decryption cipher for
- */
-static void
-setup_in_cipher (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
-                 struct Queue *queue)
-{
-  struct GNUNET_HashCode dh;
-
-  GNUNET_CRYPTO_eddsa_ecdh (my_private_key, ephemeral, &dh);
-  setup_cipher (&dh, &my_identity, &queue->in_cipher, &queue->in_hmac);
-}
-
 /**
  * Callback called when peerstore store operation for rekey monotime value is 
finished.
  * @param cls Queue context the store operation was executed.
@@ -1278,6 +1266,23 @@ rekey_monotime_cb (void *cls,
                                                      queue);
 }
 
+/**
+ * Setup cipher of @a queue for decryption.
+ *
+ * @param ephemeral ephemeral key we received from the other peer
+ * @param queue[in,out] queue to initialize decryption cipher for
+ */
+static void
+setup_in_cipher (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
+                 struct Queue *queue)
+{
+  struct GNUNET_HashCode dh;
+
+  GNUNET_CRYPTO_eddsa_ecdh (my_private_key, ephemeral, &dh);
+  setup_cipher (&dh, &my_identity, &queue->in_cipher, &queue->in_hmac);
+}
+
+
 /**
  * Handle @a rekey message on @a queue. The message was already
  * HMAC'ed, but we should additionally still check the signature.
@@ -1415,6 +1420,220 @@ handshake_ack_monotime_cb (void *cls,
                                                              queue);
 }
 
+/**
+ * Sending challenge with TcpConfirmationAck back to sender of ephemeral key.
+ *
+ * @param tc The TCPConfirmation originally send.
+ * @param queue The queue context.
+ */
+static void
+send_challenge (struct ChallengeNonceP challenge, struct Queue *queue)
+{
+  struct TCPConfirmationAck tca;
+  struct TcpHandshakeAckSignature thas;
+
+  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
+                           "transport",
+                           "sending challenge\n");
+
+  tca.header.type = ntohs (
+    GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK);
+  tca.header.size = ntohs (sizeof(tca));
+  tca.challenge = challenge;
+  tca.sender = my_identity;
+  tca.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+  thas.purpose.purpose = htonl (
+    GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE_ACK);
+  thas.purpose.size = htonl (sizeof(thas));
+  thas.sender = my_identity;
+  thas.receiver = queue->target;
+  thas.monotonic_time = tca.monotonic_time;
+  thas.challenge = tca.challenge;
+  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                            &thas,
+                            &tca.sender_sig);
+  GNUNET_assert (0 ==
+                 gcry_cipher_encrypt (queue->out_cipher,
+                                      &queue->cwrite_buf[queue->cwrite_off],
+                                      sizeof(tca),
+                                      &tca,
+                                      sizeof(tca)));
+  queue->cwrite_off += sizeof(tca);
+  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
+                           "transport",
+                           "sending challenge done\n");
+}
+
+/**
+ * Setup cipher for outgoing data stream based on target and
+ * our ephemeral private key.
+ *
+ * @param queue queue to setup outgoing (encryption) cipher for
+ */
+static void
+setup_out_cipher (struct Queue *queue)
+{
+  struct GNUNET_HashCode dh;
+
+  GNUNET_CRYPTO_ecdh_eddsa (&queue->ephemeral, &queue->target.public_key, &dh);
+  /* we don't need the private key anymore, drop it! */
+  memset (&queue->ephemeral, 0, sizeof(queue->ephemeral));
+  setup_cipher (&dh, &queue->target, &queue->out_cipher, &queue->out_hmac);
+  queue->rekey_time = GNUNET_TIME_relative_to_absolute (rekey_interval);
+  queue->rekey_left_bytes =
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, REKEY_MAX_BYTES);
+}
+
+
+/**
+ * Inject a `struct TCPRekey` message into the queue's plaintext
+ * buffer.
+ *
+ * @param queue queue to perform rekeying on
+ */
+static void
+inject_rekey (struct Queue *queue)
+{
+  struct TCPRekey rekey;
+  struct TcpRekeySignature thp;
+
+  GNUNET_assert (0 == queue->pwrite_off);
+  memset (&rekey, 0, sizeof(rekey));
+  GNUNET_CRYPTO_ecdhe_key_create (&queue->ephemeral);
+  rekey.header.type = ntohs (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY);
+  rekey.header.size = ntohs (sizeof(rekey));
+  GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, &rekey.ephemeral);
+  rekey.monotonic_time =
+    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+  thp.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_REKEY);
+  thp.purpose.size = htonl (sizeof(thp));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "inject_rekey size %u\n",
+              thp.purpose.size);
+  thp.sender = my_identity;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "sender %s\n",
+              GNUNET_p2s (&thp.sender.public_key));
+  thp.receiver = queue->target;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "receiver %s\n",
+              GNUNET_p2s (&thp.receiver.public_key));
+  thp.ephemeral = rekey.ephemeral;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "ephemeral %s\n",
+              GNUNET_e2s (&thp.ephemeral));
+  thp.monotonic_time = rekey.monotonic_time;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "time %s\n",
+              GNUNET_STRINGS_absolute_time_to_string (
+                GNUNET_TIME_absolute_ntoh (thp.monotonic_time)));
+  GNUNET_CRYPTO_eddsa_sign (my_private_key,
+                            &thp,
+                            &rekey.sender_sig);
+  calculate_hmac (&queue->out_hmac, &rekey, sizeof(rekey), &rekey.hmac);
+  /* Encrypt rekey message with 'old' cipher */
+  GNUNET_assert (0 ==
+                 gcry_cipher_encrypt (queue->out_cipher,
+                                      &queue->cwrite_buf[queue->cwrite_off],
+                                      sizeof(rekey),
+                                      &rekey,
+                                      sizeof(rekey)));
+  queue->cwrite_off += sizeof(rekey);
+  /* Setup new cipher for successive messages */
+  gcry_cipher_close (queue->out_cipher);
+  setup_out_cipher (queue);
+}
+
+/**
+ * We have been notified that our socket is ready to write.
+ * Then reschedule this function to be called again once more is available.
+ *
+ * @param cls a `struct Queue`
+ */
+static void
+queue_write (void *cls)
+{
+  struct Queue *queue = cls;
+  ssize_t sent;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "In queue write\n");
+  queue->write_task = NULL;
+  if (0 != queue->cwrite_off)
+  {
+    sent = GNUNET_NETWORK_socket_send (queue->sock,
+                                       queue->cwrite_buf,
+                                       queue->cwrite_off);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sent %lu bytes to TCP queue\n", sent);
+    if ((-1 == sent) && (EAGAIN != errno) && (EINTR != errno))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+      queue_destroy (queue);
+      return;
+    }
+    if (sent > 0)
+    {
+      size_t usent = (size_t) sent;
+      queue->cwrite_off -= usent;
+      memmove (queue->cwrite_buf,
+               &queue->cwrite_buf[usent],
+               queue->cwrite_off);
+      reschedule_queue_timeout (queue);
+    }
+  }
+  /* can we encrypt more? (always encrypt full messages, needed
+     such that #mq_cancel() can work!) */
+  if ((0 < queue->rekey_left_bytes) &&
+      (queue->pwrite_off > 0) &&
+      (queue->cwrite_off + queue->pwrite_off <= BUF_SIZE))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Encrypting %lu bytes\n", queue->pwrite_off);
+    GNUNET_assert (0 ==
+                   gcry_cipher_encrypt (queue->out_cipher,
+                                        &queue->cwrite_buf[queue->cwrite_off],
+                                        queue->pwrite_off,
+                                        queue->pwrite_buf,
+                                        queue->pwrite_off));
+    if (queue->rekey_left_bytes > queue->pwrite_off)
+      queue->rekey_left_bytes -= queue->pwrite_off;
+    else
+      queue->rekey_left_bytes = 0;
+    queue->cwrite_off += queue->pwrite_off;
+    queue->pwrite_off = 0;
+  }
+  // if ((-1 != unverified_size)&& ((0 == queue->pwrite_off) &&
+  if (((0 == queue->pwrite_off) &&
+       ((0 == queue->rekey_left_bytes) ||
+        (0 ==
+         GNUNET_TIME_absolute_get_remaining (
+           queue->rekey_time).rel_value_us))))
+  {
+    inject_rekey (queue);
+  }
+  if ((0 == queue->pwrite_off) && (! queue->finishing) &&
+      (GNUNET_YES == queue->mq_awaits_continue))
+  {
+    queue->mq_awaits_continue = GNUNET_NO;
+    GNUNET_MQ_impl_send_continue (queue->mq);
+  }
+  /* did we just finish writing 'finish'? */
+  if ((0 == queue->cwrite_off) && (GNUNET_YES == queue->finishing))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Finishing queue\n");
+    queue_destroy (queue);
+    return;
+  }
+  /* do we care to write more? */
+  if ((0 < queue->cwrite_off) || (0 < queue->pwrite_off))
+    queue->write_task =
+      GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                      queue->sock,
+                                      &queue_write,
+                                      queue);
+}
+
 /**
  * Test if we have received a full message in plaintext.
  * If so, handle it.
@@ -1450,16 +1669,16 @@ try_handle_plaintext (struct Queue *queue)
     return 0; /* not even a header */
   }
 
-  /* if ((-1 != unverified_size) && (unverified_size > INITIAL_CORE_KX_SIZE)) 
*/
-  /* { */
-  /*   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, */
-  /*               "Already received data of size %lu bigger than KX size 
%lu!\n", */
-  /*               unverified_size, */
-  /*               INITIAL_CORE_KX_SIZE); */
-  /*   GNUNET_break_op (0); */
-  /*   queue_finish (queue); */
-  /*   return 0; */
-  /* } */
+  if ((-1 != unverified_size) && (unverified_size > INITIAL_CORE_KX_SIZE))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Already received data of size %lu bigger than KX size %lu!\n",
+                unverified_size,
+                INITIAL_CORE_KX_SIZE);
+    GNUNET_break_op (0);
+    queue_finish (queue);
+    return 0;
+  }
 
   type = ntohs (hdr->type);
   switch (type)
@@ -1520,43 +1739,53 @@ try_handle_plaintext (struct Queue *queue)
                                                                   queue);
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Handling plaintext, ack processed!");
-
-    // unverified_size = -1;
-
-    /* char *foreign_addr; */
-
-    /* switch (queue->address->sa_family) */
-    /* { */
-    /* case AF_INET: */
-    /*   GNUNET_asprintf (&foreign_addr, */
-    /*                    "%s-%s", */
-    /*                    COMMUNICATOR_ADDRESS_PREFIX, */
-    /*                    GNUNET_a2s (queue->address, queue->address_len)); */
-    /*   break; */
-
-    /* case AF_INET6: */
-    /*   GNUNET_asprintf (&foreign_addr, */
-    /*                    "%s-%s", */
-    /*                    COMMUNICATOR_ADDRESS_PREFIX, */
-    /*                    GNUNET_a2s (queue->address, queue->address_len)); */
-    /*   break; */
-
-    /* default: */
-    /*   GNUNET_assert (0); */
-    /* } */
-
-    /* queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch, */
-    /*                                                   &queue->target, */
-    /*                                                   foreign_addr, */
-    /*                                                   0 /\* no MTU *\/, */
-    /*                                                   
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, */
-    /*                                                   0, /\* Priority *\/ */
-    /*                                                   queue->nt, */
-    /*                                                   queue->cs, */
-    /*                                                   queue->mq); */
-
-    /* GNUNET_free (foreign_addr); */
+                "Handling plaintext, ack processed!\n");
+
+    if (GNUNET_TRANSPORT_CS_INBOUND ==     queue->cs)
+    {
+      send_challenge (queue->challenge_received, queue);
+      queue->write_task =
+        GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                        queue->sock,
+                                        &queue_write,
+                                        queue);
+    }
+
+    unverified_size = -1;
+
+    char *foreign_addr;
+
+    switch (queue->address->sa_family)
+    {
+    case AF_INET:
+      GNUNET_asprintf (&foreign_addr,
+                       "%s-%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       GNUNET_a2s (queue->address, queue->address_len));
+      break;
+
+    case AF_INET6:
+      GNUNET_asprintf (&foreign_addr,
+                       "%s-%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       GNUNET_a2s (queue->address, queue->address_len));
+      break;
+
+    default:
+      GNUNET_assert (0);
+    }
+
+    queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
+                                                      &queue->target,
+                                                      foreign_addr,
+                                                      0 /* no MTU */,
+                                                      
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+                                                      0, /* Priority */
+                                                      queue->nt,
+                                                      queue->cs,
+                                                      queue->mq);
+
+    GNUNET_free (foreign_addr);
 
     size = ntohs (hdr->size);
     break;
@@ -1633,8 +1862,8 @@ try_handle_plaintext (struct Queue *queue)
     return 0;
   }
   GNUNET_assert (0 != size);
-  /* if (-1 != unverified_size) */
-  /*   unverified_size += size; */
+  if (-1 != unverified_size)
+    unverified_size += size;
   return size;
 }
 
@@ -2043,178 +2272,6 @@ tcp_address_to_sockaddr (const char *bindto, socklen_t 
*sock_len)
   return in;
 }
 
-
-/**
- * Setup cipher for outgoing data stream based on target and
- * our ephemeral private key.
- *
- * @param queue queue to setup outgoing (encryption) cipher for
- */
-static void
-setup_out_cipher (struct Queue *queue)
-{
-  struct GNUNET_HashCode dh;
-
-  GNUNET_CRYPTO_ecdh_eddsa (&queue->ephemeral, &queue->target.public_key, &dh);
-  /* we don't need the private key anymore, drop it! */
-  memset (&queue->ephemeral, 0, sizeof(queue->ephemeral));
-  setup_cipher (&dh, &queue->target, &queue->out_cipher, &queue->out_hmac);
-  queue->rekey_time = GNUNET_TIME_relative_to_absolute (rekey_interval);
-  queue->rekey_left_bytes =
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, REKEY_MAX_BYTES);
-}
-
-
-/**
- * Inject a `struct TCPRekey` message into the queue's plaintext
- * buffer.
- *
- * @param queue queue to perform rekeying on
- */
-static void
-inject_rekey (struct Queue *queue)
-{
-  struct TCPRekey rekey;
-  struct TcpRekeySignature thp;
-
-  GNUNET_assert (0 == queue->pwrite_off);
-  memset (&rekey, 0, sizeof(rekey));
-  GNUNET_CRYPTO_ecdhe_key_create (&queue->ephemeral);
-  rekey.header.type = ntohs (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY);
-  rekey.header.size = ntohs (sizeof(rekey));
-  GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, &rekey.ephemeral);
-  rekey.monotonic_time =
-    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  thp.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_REKEY);
-  thp.purpose.size = htonl (sizeof(thp));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "inject_rekey size %u\n",
-              thp.purpose.size);
-  thp.sender = my_identity;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "sender %s\n",
-              GNUNET_p2s (&thp.sender.public_key));
-  thp.receiver = queue->target;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "receiver %s\n",
-              GNUNET_p2s (&thp.receiver.public_key));
-  thp.ephemeral = rekey.ephemeral;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "ephemeral %s\n",
-              GNUNET_e2s (&thp.ephemeral));
-  thp.monotonic_time = rekey.monotonic_time;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "time %s\n",
-              GNUNET_STRINGS_absolute_time_to_string (
-                GNUNET_TIME_absolute_ntoh (thp.monotonic_time)));
-  GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                            &thp,
-                            &rekey.sender_sig);
-  calculate_hmac (&queue->out_hmac, &rekey, sizeof(rekey), &rekey.hmac);
-  /* Encrypt rekey message with 'old' cipher */
-  GNUNET_assert (0 ==
-                 gcry_cipher_encrypt (queue->out_cipher,
-                                      &queue->cwrite_buf[queue->cwrite_off],
-                                      sizeof(rekey),
-                                      &rekey,
-                                      sizeof(rekey)));
-  queue->cwrite_off += sizeof(rekey);
-  /* Setup new cipher for successive messages */
-  gcry_cipher_close (queue->out_cipher);
-  setup_out_cipher (queue);
-}
-
-
-/**
- * We have been notified that our socket is ready to write.
- * Then reschedule this function to be called again once more is available.
- *
- * @param cls a `struct Queue`
- */
-static void
-queue_write (void *cls)
-{
-  struct Queue *queue = cls;
-  ssize_t sent;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "In queue write\n");
-  queue->write_task = NULL;
-  if (0 != queue->cwrite_off)
-  {
-    sent = GNUNET_NETWORK_socket_send (queue->sock,
-                                       queue->cwrite_buf,
-                                       queue->cwrite_off);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sent %lu bytes to TCP queue\n", sent);
-    if ((-1 == sent) && (EAGAIN != errno) && (EINTR != errno))
-    {
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
-      queue_destroy (queue);
-      return;
-    }
-    if (sent > 0)
-    {
-      size_t usent = (size_t) sent;
-      queue->cwrite_off -= usent;
-      memmove (queue->cwrite_buf,
-               &queue->cwrite_buf[usent],
-               queue->cwrite_off);
-      reschedule_queue_timeout (queue);
-    }
-  }
-  /* can we encrypt more? (always encrypt full messages, needed
-     such that #mq_cancel() can work!) */
-  if ((0 < queue->rekey_left_bytes) &&
-      (queue->pwrite_off > 0) &&
-      (queue->cwrite_off + queue->pwrite_off <= BUF_SIZE))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Encrypting %lu bytes\n", queue->pwrite_off);
-    GNUNET_assert (0 ==
-                   gcry_cipher_encrypt (queue->out_cipher,
-                                        &queue->cwrite_buf[queue->cwrite_off],
-                                        queue->pwrite_off,
-                                        queue->pwrite_buf,
-                                        queue->pwrite_off));
-    if (queue->rekey_left_bytes > queue->pwrite_off)
-      queue->rekey_left_bytes -= queue->pwrite_off;
-    else
-      queue->rekey_left_bytes = 0;
-    queue->cwrite_off += queue->pwrite_off;
-    queue->pwrite_off = 0;
-  }
-  // if ((-1 != unverified_size)&& ((0 == queue->pwrite_off) &&
-  if (((0 == queue->pwrite_off) &&
-       ((0 == queue->rekey_left_bytes) ||
-        (0 ==
-         GNUNET_TIME_absolute_get_remaining (
-           queue->rekey_time).rel_value_us))))
-  {
-    inject_rekey (queue);
-  }
-  if ((0 == queue->pwrite_off) && (! queue->finishing) &&
-      (GNUNET_YES == queue->mq_awaits_continue))
-  {
-    queue->mq_awaits_continue = GNUNET_NO;
-    GNUNET_MQ_impl_send_continue (queue->mq);
-  }
-  /* did we just finish writing 'finish'? */
-  if ((0 == queue->cwrite_off) && (GNUNET_YES == queue->finishing))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Finishing queue\n");
-    queue_destroy (queue);
-    return;
-  }
-  /* do we care to write more? */
-  if ((0 < queue->cwrite_off) || (0 < queue->pwrite_off))
-    queue->write_task =
-      GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                      queue->sock,
-                                      &queue_write,
-                                      queue);
-}
-
-
 /**
  * Signature of functions implementing the sending functionality of a
  * message queue.
@@ -2348,39 +2405,39 @@ boot_queue (struct Queue *queue)
                                              NULL,
                                              &mq_error,
                                              queue);
-  {
-    char *foreign_addr;
-
-    switch (queue->address->sa_family)
-    {
-    case AF_INET:
-      GNUNET_asprintf (&foreign_addr,
-                       "%s-%s",
-                       COMMUNICATOR_ADDRESS_PREFIX,
-                       GNUNET_a2s (queue->address, queue->address_len));
-      break;
-
-    case AF_INET6:
-      GNUNET_asprintf (&foreign_addr,
-                       "%s-%s",
-                       COMMUNICATOR_ADDRESS_PREFIX,
-                       GNUNET_a2s (queue->address, queue->address_len));
-      break;
-
-    default:
-      GNUNET_assert (0);
-    }
-    queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
-                                                      &queue->target,
-                                                      foreign_addr,
-                                                      0 /* no MTU */,
-                                                      
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
-                                                      0, /* Priority */
-                                                      queue->nt,
-                                                      queue->cs,
-                                                      queue->mq);
-    GNUNET_free (foreign_addr);
-  }
+  /* { */
+  /*   char *foreign_addr; */
+
+  /*   switch (queue->address->sa_family) */
+  /*   { */
+  /*   case AF_INET: */
+  /*     GNUNET_asprintf (&foreign_addr, */
+  /*                      "%s-%s", */
+  /*                      COMMUNICATOR_ADDRESS_PREFIX, */
+  /*                      GNUNET_a2s (queue->address, queue->address_len)); */
+  /*     break; */
+
+  /*   case AF_INET6: */
+  /*     GNUNET_asprintf (&foreign_addr, */
+  /*                      "%s-%s", */
+  /*                      COMMUNICATOR_ADDRESS_PREFIX, */
+  /*                      GNUNET_a2s (queue->address, queue->address_len)); */
+  /*     break; */
+
+  /*   default: */
+  /*     GNUNET_assert (0); */
+  /*   } */
+  /*   queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch, */
+  /*                                                     &queue->target, */
+  /*                                                     foreign_addr, */
+  /*                                                     0 /\* no MTU *\/, */
+  /*                                                     
GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, */
+  /*                                                     0, /\* Priority *\/ */
+  /*                                                     queue->nt, */
+  /*                                                     queue->cs, */
+  /*                                                     queue->mq); */
+  /*   GNUNET_free (foreign_addr); */
+  /* } */
 }
 
 
@@ -2594,48 +2651,6 @@ free_proto_queue (struct ProtoQueue *pq)
   GNUNET_free (pq);
 }
 
-/**
- * Sending challenge with TcpConfirmationAck back to sender of ephemeral key.
- *
- * @param tc The TCPConfirmation originally send.
- * @param queue The queue context.
- */
-static void
-send_challenge (struct ChallengeNonceP challenge, struct Queue *queue)
-{
-  struct TCPConfirmationAck tca;
-  struct TcpHandshakeAckSignature thas;
-
-  GNUNET_log_from_nocheck (GNUNET_ERROR_TYPE_DEBUG,
-                           "transport",
-                           "sending challenge\n");
-
-  tca.header.type = ntohs (
-    GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_CONFIRMATION_ACK);
-  tca.header.size = ntohs (sizeof(tca));
-  tca.challenge = challenge;
-  tca.sender = my_identity;
-  tca.monotonic_time =
-    GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  thas.purpose.purpose = htonl (
-    GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE_ACK);
-  thas.purpose.size = htonl (sizeof(thas));
-  thas.sender = my_identity;
-  thas.receiver = queue->target;
-  thas.monotonic_time = tca.monotonic_time;
-  thas.challenge = tca.challenge;
-  GNUNET_CRYPTO_eddsa_sign (my_private_key,
-                            &thas,
-                            &tca.sender_sig);
-  GNUNET_assert (0 ==
-                 gcry_cipher_encrypt (queue->out_cipher,
-                                      &queue->cwrite_buf[queue->cwrite_off],
-                                      sizeof(tca),
-                                      &tca,
-                                      sizeof(tca)));
-  queue->cwrite_off += sizeof(tca);
-}
-
 /**
  * Read from the socket of the proto queue until we have enough data
  * to upgrade to full queue.
@@ -2722,7 +2737,8 @@ proto_read_kx (void *cls)
                                     &queue_write,
                                     queue);
   // TODO To early! Move it somewhere else.
-  // send_challenge (tc, queue);
+  // send_challenge (tc.challenge, queue);
+  queue->challenge_received = tc.challenge;
 
   GNUNET_CONTAINER_DLL_remove (proto_head, proto_tail, pq);
   GNUNET_free (pq);
@@ -2853,6 +2869,12 @@ queue_read_kx (void *cls)
     return;
   }
   send_challenge (tc.challenge, queue);
+  queue->write_task =
+    GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    queue->sock,
+                                    &queue_write,
+                                    queue);
+
   /* update queue timeout */
   reschedule_queue_timeout (queue);
   /* prepare to continue with regular read task immediately */
@@ -2866,7 +2888,7 @@ queue_read_kx (void *cls)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "cread_off set to %lu bytes\n",
               queue->cread_off);
-  if (0 < queue->cread_off)
+  if (0 <= queue->cread_off)
     queue->read_task = GNUNET_SCHEDULER_add_now (&queue_read, queue);
 }
 
diff --git a/src/transport/gnunet-communicator-udp.c 
b/src/transport/gnunet-communicator-udp.c
index 018da8f0e..2e09bc9d2 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -1882,6 +1882,7 @@ consider_ss_ack (struct SharedSecret *ss, int initial)
 
     // kce_generate (ss, ++ss->sequence_allowed);
     // kce_generate (ss, ++ss->sequence_allowed);
+    // TODO This task must be per sender!
     kce_task = GNUNET_SCHEDULER_add_delayed (WORKING_QUEUE_INTERVALL,
                                              kce_generate_cb,
                                              ss);
diff --git a/src/transport/test_communicator_basic.c 
b/src/transport/test_communicator_basic.c
index 0250de474..ffc21e47a 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -42,7 +42,7 @@
 
 #define NUM_PEERS 2
 
-static struct GNUNET_SCHEDULER_Task *to_task;
+static struct GNUNET_SCHEDULER_Task *to_task[NUM_PEERS];
 
 static int queue_est = GNUNET_NO;
 
@@ -59,27 +59,29 @@ static struct GNUNET_STATISTICS_Handle *stats[NUM_PEERS];
 
 static char *cfg_peers_name[NUM_PEERS];
 
+static int finished[NUM_PEERS];
+
 static int ret;
 
 static int bidirect = GNUNET_NO;
 
 static size_t long_message_size;
 
-static struct GNUNET_TIME_Absolute start_short;
+static struct GNUNET_TIME_Absolute start_short[NUM_PEERS];
 
-static struct GNUNET_TIME_Absolute start_long;
+static struct GNUNET_TIME_Absolute start_long[NUM_PEERS];
 
-static struct GNUNET_TIME_Absolute timeout;
+static struct GNUNET_TIME_Absolute timeout[NUM_PEERS];
 
-static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
+// static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
 
 static char *communicator_name;
 
 static char *test_name;
 
-static struct GNUNET_STATISTICS_GetHandle *box_stats;
+static struct GNUNET_STATISTICS_GetHandle *box_stats[NUM_PEERS];
 
-static struct GNUNET_STATISTICS_GetHandle *rekey_stats;
+static struct GNUNET_STATISTICS_GetHandle *rekey_stats[NUM_PEERS];
 
 #define TEST_SECTION "test-setup"
 
@@ -97,7 +99,7 @@ static struct GNUNET_STATISTICS_GetHandle *rekey_stats;
 
 #define PEER_B 1
 
-static unsigned int iterations_left = TOTAL_ITERATIONS;
+static unsigned int iterations_left[NUM_PEERS];
 
 #define TIMEOUT_MULTIPLIER 1
 
@@ -118,11 +120,11 @@ enum TestPhase
   TP_SIZE_CHECK
 };
 
-static unsigned int phase_short;
+static unsigned int phase_short[NUM_PEERS];
 
-static unsigned int phase_long;
+static unsigned int phase_long[NUM_PEERS];
 
-static unsigned int phase_size;
+static unsigned int phase_size[NUM_PEERS];
 
 static long long unsigned int allowed_packet_loss_short;
 
@@ -140,26 +142,23 @@ static struct GNUNET_TIME_Relative delay_short;
 
 static struct GNUNET_TIME_Relative delay_long;
 
-static size_t num_sent_short = 0;
-
-static size_t num_sent_long = 0;
-
-static size_t num_sent_size = 0;
+static size_t num_sent_short[NUM_PEERS];
 
-static uint32_t ack = 0;
+static size_t num_sent_long[NUM_PEERS];
 
-static enum TestPhase phase;
+static size_t num_sent_size[NUM_PEERS];
 
-static size_t num_received_short = 0;
+static uint32_t ack[NUM_PEERS];
 
-static size_t num_received_long = 0;
+static enum TestPhase phase[NUM_PEERS];
 
-static size_t num_received_size = 0;
+static size_t num_received_short[NUM_PEERS];
 
-static uint64_t avg_latency = 0;
+static size_t num_received_long[NUM_PEERS];
 
-static struct GNUNET_TIME_Relative duration;
+static size_t num_received_size[NUM_PEERS];
 
+static uint64_t avg_latency[NUM_PEERS];
 
 static void
 communicator_available_cb (
@@ -284,43 +283,87 @@ make_payload (size_t payload_size)
   return payload;
 }
 
+static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
+get_tc_h (unsigned int peer_nr)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got peer %u\n",
+       peer_nr);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p peer 0\n",
+       tc_hs[0]);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p peer 1\n",
+       tc_hs[1]);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p get\n",
+       tc_hs[peer_nr]);
+
+  return tc_hs[peer_nr];
+}
+
+
+static unsigned int
+get_peer_nr_from_tc (struct
+                     GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle 
*tc_h)
+{
+  if (tc_h == get_tc_h (0))
+    return PEER_A;
+  else
+    return PEER_B;
+}
+
+static unsigned int
+get_peer_nr (void *cls, unsigned int get_the_other_one)
+{
+  if (0 == strcmp ((char*) cls, cfg_peers_name[0]))
+    return get_the_other_one ? PEER_B : PEER_A;
+  else
+    return get_the_other_one ? PEER_A : PEER_B;
+}
 
 static void
 latency_timeout (void *cls)
 {
 
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
   size_t num_sent = 0;
   size_t num_received = 0;
 
-  to_task = NULL;
-  if (GNUNET_TIME_absolute_get_remaining (timeout).rel_value_us > 0)
+  peer_nr = get_peer_nr_from_tc (tc_h);
+  to_task[peer_nr] = NULL;
+  if (GNUNET_TIME_absolute_get_remaining (timeout[peer_nr]).rel_value_us > 0)
   {
-    to_task = GNUNET_SCHEDULER_add_at (timeout,
-                                       &latency_timeout,
-                                       NULL);
+    to_task[peer_nr] = GNUNET_SCHEDULER_add_at (timeout[peer_nr],
+                                                &latency_timeout,
+                                                cls);
     return;
   }
-  switch (phase)
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_assert (0);
     break;
   case TP_BURST_SHORT:
-    num_sent = num_sent_short;
-    num_received = num_received_short;
+    num_sent = num_sent_short[peer_nr];
+    num_received = num_received_short[peer_nr];
     break;
   case TP_BURST_LONG:
-    num_sent = num_sent_long;
-    num_received = num_received_long;
+    num_sent = num_sent_long[peer_nr];
+    num_received = num_received_long[peer_nr];
     break;
   case TP_SIZE_CHECK:
-    num_sent = num_sent_size;
-    num_received = num_received_size;
+    num_sent = num_sent_size[peer_nr];
+    num_received = num_received_size[peer_nr];
     break;
   }
   LOG (GNUNET_ERROR_TYPE_ERROR,
        "Latency too high. Test failed. (Phase: %d. Sent: %lu, Received: 
%lu)\n",
-       phase, num_sent, num_received);
+       phase[peer_nr], num_sent, num_received);
   ret = 2;
   GNUNET_SCHEDULER_shutdown ();
 }
@@ -328,31 +371,36 @@ latency_timeout (void *cls)
 static void
 size_test (void *cls)
 {
+  unsigned int peer_nr;
   char *payload;
   size_t max_size = 64000;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
 
+  peer_nr = get_peer_nr_from_tc (tc_h);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "size_test_cb %u\n",
-       (unsigned int) num_sent_size);
-  GNUNET_assert (TP_SIZE_CHECK == phase);
+       (unsigned int) num_sent_size[peer_nr]);
+  GNUNET_assert (TP_SIZE_CHECK == phase[peer_nr]);
   if (LONG_MESSAGE_SIZE != long_message_size)
     max_size = long_message_size;
-  if (ack + 10 > max_size)
+  if (ack[peer_nr] + 10 > max_size)
     return; /* Leave some room for our protocol, so not 2^16 exactly */
-  ack += 10;
-  payload = make_payload (ack);
-  num_sent_size++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
-                                                        (ack < max_size)
+  ack[peer_nr] += 10;
+  payload = make_payload (ack[peer_nr]);
+  num_sent_size[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
+                                                        (ack[peer_nr] <
+                                                         max_size)
                                                         ? &size_test
                                                         : NULL,
-                                                        NULL,
+                                                        cls,
                                                         payload,
-                                                        ack);
+                                                        ack[peer_nr]);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
 }
 
 
@@ -362,24 +410,28 @@ long_test (void *cls);
 static void
 long_test_cb (void *cls)
 {
+  unsigned int peer_nr;
   char *payload;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "long_test_cb %u/%u\n",
-       (unsigned int) num_sent_long,
-       (unsigned int) num_received_long);
+       (unsigned int) num_sent_long[peer_nr],
+       (unsigned int) num_received_long[peer_nr]);
   payload = make_payload (long_message_size);
-  num_sent_long++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
+  num_sent_long[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
                                                         (burst_packets_long ==
-                                                         num_sent_long)
+                                                         
num_sent_long[peer_nr])
                                                         ? NULL
                                                         : &long_test,
-                                                        NULL,
+                                                        cls,
                                                         payload,
                                                         long_message_size);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
     GNUNET_TIME_relative_multiply (
       GNUNET_TIME_UNIT_SECONDS,
       TIMEOUT_MULTIPLIER));
@@ -391,7 +443,7 @@ long_test (void *cls)
 {
   GNUNET_SCHEDULER_add_delayed (delay_long,
                                 &long_test_cb,
-                                NULL);
+                                cls);
 }
 
 
@@ -402,26 +454,33 @@ short_test (void *cls);
 static void
 short_test_cb (void *cls)
 {
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
   char *payload;
 
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "short_test_cb %u/%u\n",
-       (unsigned int) num_sent_short,
-       (unsigned int) num_received_short);
+       "short_test_cb %u/%u for peer %u and handle %p\n",
+       (unsigned int) num_sent_short[peer_nr],
+       (unsigned int) num_received_short[peer_nr],
+       peer_nr,
+       tc_h);
   payload = make_payload (SHORT_MESSAGE_SIZE);
-  num_sent_short++;
-  GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
+  num_sent_short[peer_nr]++;
+  GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
                                                         (burst_packets_short ==
-                                                         num_sent_short)
+                                                         
num_sent_short[peer_nr])
                                                         ? NULL
                                                         : &short_test,
-                                                        NULL,
+                                                        cls,
                                                         payload,
                                                         SHORT_MESSAGE_SIZE);
   GNUNET_free (payload);
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
 }
 
 
@@ -430,7 +489,7 @@ short_test (void *cls)
 {
   GNUNET_SCHEDULER_add_delayed (delay_short,
                                 &short_test_cb,
-                                NULL);
+                                cls);
 }
 
 
@@ -462,9 +521,14 @@ short_test (void *cls)
 static void
 process_statistics_box_done (void *cls, int success)
 {
-  if (NULL != box_stats)
-    box_stats = NULL;
-  if (NULL == rekey_stats)
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (NULL != box_stats[peer_nr])
+    box_stats[peer_nr] = NULL;
+  if (NULL == rekey_stats[peer_nr])
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Finished\n");
@@ -476,9 +540,14 @@ process_statistics_box_done (void *cls, int success)
 static void
 process_statistics_rekey_done (void *cls, int success)
 {
-  if (NULL != rekey_stats)
-    rekey_stats = NULL;
-  if (NULL == box_stats)
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (NULL != rekey_stats[peer_nr])
+    rekey_stats[peer_nr] = NULL;
+  if (NULL == box_stats[peer_nr])
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Finished\n");
@@ -533,24 +602,37 @@ process_statistics (void *cls,
 }
 
 static void
-choose_phase ()
+choose_phase (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle 
*tc_h)
 {
-  if (GNUNET_YES == phase_short)
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr_from_tc (tc_h);
+
+  if (GNUNET_YES == phase_short[peer_nr])
   {
-    phase =  TP_BURST_SHORT;
-    start_short = GNUNET_TIME_absolute_get ();
-    short_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase short with peer %u and Handle %p\n",
+                peer_nr,
+                tc_h);
+    phase[peer_nr] =  TP_BURST_SHORT;
+    start_short[peer_nr] = GNUNET_TIME_absolute_get ();
+    short_test (tc_h);
   }
-  else if (GNUNET_YES == phase_long)
+  else if (GNUNET_YES == phase_long[peer_nr])
   {
-    phase =  TP_BURST_LONG;
-    start_long = GNUNET_TIME_absolute_get ();
-    long_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase long with peer %u\n",
+                peer_nr);
+    phase[peer_nr] =  TP_BURST_LONG;
+    start_long[peer_nr] = GNUNET_TIME_absolute_get ();
+    long_test (tc_h);
   }
-  else if (GNUNET_YES == phase_size)
+  else if (GNUNET_YES == phase_size[peer_nr])
   {
-    phase =  TP_SIZE_CHECK;
-    size_test (NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Choose phase size\n");
+    phase[peer_nr] =  TP_SIZE_CHECK;
+    size_test (tc_h);
   }
   else
   {
@@ -560,28 +642,33 @@ choose_phase ()
                                                           "backchannel",
                                                           test_name))) )
     {
-      if (NULL != box_stats)
-        GNUNET_STATISTICS_get_cancel (box_stats);
-      box_stats = GNUNET_STATISTICS_get (stats[1],
-                                         "C-UDP",
-                                         "# messages decrypted with BOX",
-                                         process_statistics_box_done,
-                                         &process_statistics,
-                                         NULL);
-      if (NULL != rekey_stats)
-        GNUNET_STATISTICS_get_cancel (rekey_stats);
-      rekey_stats = GNUNET_STATISTICS_get (stats[0],
-                                           "C-UDP",
-                                           "# rekeying successful",
-                                           process_statistics_rekey_done,
-                                           &process_statistics,
-                                           NULL);
+      if (NULL != box_stats[peer_nr])
+        GNUNET_STATISTICS_get_cancel (box_stats[peer_nr]);
+      box_stats[peer_nr] = GNUNET_STATISTICS_get (stats[1],
+                                                  "C-UDP",
+                                                  "# messages decrypted with 
BOX",
+                                                  process_statistics_box_done,
+                                                  &process_statistics,
+                                                  tc_h);
+      if (NULL != rekey_stats[peer_nr])
+        GNUNET_STATISTICS_get_cancel (rekey_stats[peer_nr]);
+      rekey_stats[peer_nr] = GNUNET_STATISTICS_get (stats[0],
+                                                    "C-UDP",
+                                                    "# rekeying successful",
+                                                    
process_statistics_rekey_done,
+                                                    &process_statistics,
+                                                    tc_h);
     }
     else
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Finished\n");
-      GNUNET_SCHEDULER_shutdown ();
+      if (((PEER_A == peer_nr) && finished[PEER_B]) || ((PEER_B == peer_nr) &&
+                                                        finished[PEER_A]))
+      {
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Finished\n");
+        GNUNET_SCHEDULER_shutdown ();
+      }
+      finished[peer_nr] = GNUNET_YES;
     }
   }
 }
@@ -604,37 +691,54 @@ add_queue_cb (void *cls,
               tc_queue,
               size_t mtu)
 {
-  if (TP_INIT != phase)
-    return;
-  if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
+
+  unsigned int peer_nr;
+
+  peer_nr = get_peer_nr (cls, GNUNET_NO);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Handle %p add %u %u\n",
+       tc_h,
+       peer_nr,
+       get_peer_nr_from_tc (tc_h));
+
+  if ((GNUNET_NO == bidirect)&&(0 != strcmp ((char*) cls, cfg_peers_name[0])))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue available at receiving peer\n");
     return; // TODO?
+  }
+  else if (TP_INIT != phase[peer_nr])
+    return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queue established, starting test...\n");
   // start_short = GNUNET_TIME_absolute_get ();
-  my_tc = tc_h;
+  // my_tc = tc_h;
   if (0 != mtu) /* Message header overhead */
     long_message_size = mtu - sizeof(struct GNUNET_TRANSPORT_SendMessageTo)
                         - sizeof(struct GNUNET_MessageHeader);
   else
     long_message_size = LONG_MESSAGE_SIZE;
   // phase = TP_BURST_SHORT;
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
-  GNUNET_assert (NULL == to_task);
-  to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
-                                            GNUNET_TIME_UNIT_SECONDS,
-                                            TIMEOUT_MULTIPLIER),
-                                          &latency_timeout,
-                                          NULL);
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
+  GNUNET_assert (NULL == to_task[peer_nr]);
+  to_task[peer_nr] = GNUNET_SCHEDULER_add_delayed (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER),
+    &latency_timeout,
+    tc_h);
   // prepare_test (NULL);
   // short_test (NULL);
-  choose_phase ();
+  choose_phase (tc_h);
 }
 
 
 static void
-update_avg_latency (const char*payload)
+update_avg_latency (const char *payload, unsigned int peer_nr)
 {
   struct GNUNET_TIME_AbsoluteNBO *ts_n;
   struct GNUNET_TIME_Absolute ts;
@@ -645,31 +749,33 @@ update_avg_latency (const char*payload)
   ts = GNUNET_TIME_absolute_ntoh (*ts_n);
   latency = GNUNET_TIME_absolute_get_duration (ts);
 
-  switch (phase)
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_assert (0);
     break;
   case TP_BURST_SHORT:
-    num_received = num_received_short;
+    num_received = num_received_short[peer_nr];
     break;
   case TP_BURST_LONG:
-    num_received = num_received_long;
+    num_received = num_received_long[peer_nr];
     break;
   case TP_SIZE_CHECK:
-    num_received = num_received_size;
+    num_received = num_received_size[peer_nr];
     break;
   }
   if (1 >= num_received)
-    avg_latency = latency.rel_value_us;
+    avg_latency[peer_nr] = latency.rel_value_us;
   else
-    avg_latency = ((avg_latency * (num_received - 1)) + latency.rel_value_us)
-                  / num_received;
+    avg_latency[peer_nr] = ((avg_latency[peer_nr] * (num_received - 1))
+                            + latency.rel_value_us)
+                           / num_received;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Latency of received packet: %s with avg latency %lu\n",
+       "Latency of received packet by peer %u: %s with avg latency %lu\n",
+       peer_nr,
        GNUNET_STRINGS_relative_time_to_string (latency,
                                                GNUNET_YES),
-       avg_latency);
+       avg_latency[peer_nr]);
 }
 
 
@@ -679,25 +785,31 @@ static void
 load_phase_config ()
 {
 
-  phase_short =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                       TEST_SECTION,
-                                                       "PHASE_SHORT");
-  if (GNUNET_SYSERR == phase_short)
-    phase_short = GNUNET_YES;
+  phase_short[0] =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                          TEST_SECTION,
+                                                          "PHASE_SHORT");
+  if (GNUNET_SYSERR == phase_short[0])
+    phase_short[0] = GNUNET_YES;
 
-  phase_long =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                      TEST_SECTION,
-                                                      "PHASE_LONG");
+  phase_short[1] = phase_short[0];
 
-  if (GNUNET_SYSERR == phase_long)
-    phase_long = GNUNET_YES;
+  phase_long[0] =  GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                         TEST_SECTION,
+                                                         "PHASE_LONG");
 
-  phase_size =   GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
-                                                       TEST_SECTION,
-                                                       "PHASE_SIZE");
+  if (GNUNET_SYSERR == phase_long[0])
+    phase_long[0] = GNUNET_YES;
 
-  if (GNUNET_SYSERR == phase_size)
-    phase_size = GNUNET_YES;
+  phase_long[1] = phase_long[0];
+
+  phase_size[0] =   GNUNET_CONFIGURATION_get_value_yesno (cfg_peers[0],
+                                                          TEST_SECTION,
+                                                          "PHASE_SIZE");
+
+  if (GNUNET_SYSERR == phase_size[0])
+    phase_size[0] = GNUNET_YES;
+
+  phase_size[1] = phase_size[0];
 }
 
 /**
@@ -716,18 +828,24 @@ incoming_message_cb (
   const char *payload,
   size_t payload_len)
 {
-  if (0 != strcmp ((char*) cls,
-                   cfg_peers_name[NUM_PEERS - 1]))
+  unsigned int peer_nr;
+  static struct GNUNET_TIME_Relative duration;
+
+  peer_nr = get_peer_nr (cls, GNUNET_YES);
+
+  if ((GNUNET_NO == bidirect)&&(0 != strcmp ((char*) cls,
+                                             cfg_peers_name[NUM_PEERS - 1])))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "unexpected receiver...\n");
     return;
   }
   /* Reset timeout */
-  timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
-                                                GNUNET_TIME_UNIT_SECONDS,
-                                                TIMEOUT_MULTIPLIER));
-  switch (phase)
+  timeout[peer_nr] = GNUNET_TIME_relative_to_absolute (
+    GNUNET_TIME_relative_multiply (
+      GNUNET_TIME_UNIT_SECONDS,
+      TIMEOUT_MULTIPLIER));
+  switch (phase[peer_nr])
   {
   case TP_INIT:
     GNUNET_break (0);
@@ -735,34 +853,37 @@ incoming_message_cb (
   case TP_BURST_SHORT:
     {
       GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
-      num_received_short++;
-      duration = GNUNET_TIME_absolute_get_duration (start_short);
-      update_avg_latency (payload);
-      if ((num_sent_short == burst_packets_short) && (num_received_short >
-                                                      burst_packets_short / 100
-                                                      *
-                                                      
allowed_packet_loss_short) )
+      num_received_short[peer_nr]++;
+      duration = GNUNET_TIME_absolute_get_duration (start_short[peer_nr]);
+      update_avg_latency (payload, peer_nr);
+      if ((num_sent_short[peer_nr] == burst_packets_short) &&
+          (num_received_short[peer_nr] >
+           burst_packets_short
+           / 100
+           *
+           allowed_packet_loss_short) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Short size packet test done.\n");
+             "Short size packet test for peer %u done.\n",
+             peer_nr);
         char *goodput = GNUNET_STRINGS_byte_size_fancy (
-          (SHORT_MESSAGE_SIZE * num_received_short * 1000 * 1000)
+          (SHORT_MESSAGE_SIZE * num_received_short[peer_nr] * 1000 * 1000)
           / duration.rel_value_us);
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n",
-             (unsigned long) num_received_short,
-             (unsigned long) num_sent_short,
+             (unsigned long) num_received_short[peer_nr],
+             (unsigned long) num_sent_short[peer_nr],
              (unsigned long long) duration.rel_value_us,
              goodput,
-             (unsigned long long) avg_latency);
+             (unsigned long long) avg_latency[peer_nr]);
         GNUNET_free (goodput);
         // start_long = GNUNET_TIME_absolute_get ();
         // phase = TP_BURST_LONG;
         // num_sent_short = 0;
-        avg_latency = 0;
+        avg_latency[peer_nr] = 0;
         // num_received = 0;
-        phase_short = GNUNET_NO;
-        choose_phase ();
+        phase_short[peer_nr] = GNUNET_NO;
+        choose_phase (get_tc_h (peer_nr));
         // long_test (NULL);
       }
       break;
@@ -775,37 +896,40 @@ incoming_message_cb (
              "Ignoring packet with wrong length\n");
         return;   // Ignore
       }
-      num_received_long++;
-      duration = GNUNET_TIME_absolute_get_duration (start_long);
-      update_avg_latency (payload);
-      if ((num_sent_long == burst_packets_long) && (num_received_long >
-                                                    burst_packets_long
-                                                    / 100
-                                                    * 
allowed_packet_loss_short) )
+      num_received_long[peer_nr]++;
+      duration = GNUNET_TIME_absolute_get_duration (start_long[peer_nr]);
+      update_avg_latency (payload, peer_nr);
+      if ((num_sent_long[peer_nr] == burst_packets_long) &&
+          (num_received_long[peer_nr] >
+           burst_packets_long
+           / 100
+           *
+           allowed_packet_loss_short) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Long size packet test done.\n");
+             "Long size packet test  for peer %u done.\n",
+             peer_nr);
         char *goodput = GNUNET_STRINGS_byte_size_fancy (
-          (long_message_size * num_received_long * 1000 * 1000)
+          (long_message_size * num_received_long[peer_nr] * 1000 * 1000)
           / duration.
           rel_value_us);
 
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets in %llu us (%s/s) -- avg latency: %llu us\n",
-             (unsigned long) num_received_long,
-             (unsigned long) num_sent_long,
+             (unsigned long) num_received_long[peer_nr],
+             (unsigned long) num_sent_long[peer_nr],
              (unsigned long long) duration.rel_value_us,
              goodput,
-             (unsigned long long) avg_latency);
+             (unsigned long long) avg_latency[peer_nr]);
         GNUNET_free (goodput);
-        ack = 0;
+        ack[peer_nr] = 0;
         // phase = TP_SIZE_CHECK;
         // num_received = 0;
         // num_sent_long = 0;
-        avg_latency = 0;
+        avg_latency[peer_nr] = 0;
         // size_test (NULL);
-        phase_long = GNUNET_NO;
-        choose_phase ();
+        phase_long[peer_nr] = GNUNET_NO;
+        choose_phase (get_tc_h (peer_nr));
       }
       break;
     }
@@ -813,39 +937,44 @@ incoming_message_cb (
     {
       size_t max_size = 64000;
 
-      GNUNET_assert (TP_SIZE_CHECK == phase);
+      GNUNET_assert (TP_SIZE_CHECK == phase[peer_nr]);
       if (LONG_MESSAGE_SIZE != long_message_size)
         max_size = long_message_size;
-      num_received_size++;
-      update_avg_latency (payload);
-      if (num_received_size >= (max_size) / 10)
+      num_received_size[peer_nr]++;
+      update_avg_latency (payload, peer_nr);
+      if ((GNUNET_YES == phase_size[peer_nr]) && (num_received_size[peer_nr] >=
+                                                  (max_size) / 10) )
       {
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
-             "Size packet test done.\n");
+             "Size packet test  for peer %u done.\n",
+             peer_nr);
         LOG (GNUNET_ERROR_TYPE_MESSAGE,
              "%lu/%lu packets -- avg latency: %llu us\n",
-             (unsigned long) num_received_size,
-             (unsigned long) num_sent_size,
-             (unsigned long long) avg_latency);
-        num_received_size = 0;
-        num_sent_size = 0;
-        avg_latency = 0;
-        iterations_left--;
-        if (0 != iterations_left)
+             (unsigned long) num_received_size[peer_nr],
+             (unsigned long) num_sent_size[peer_nr],
+             (unsigned long long) avg_latency[peer_nr]);
+        iterations_left[peer_nr]--;
+        phase_size[peer_nr] = GNUNET_NO;
+        if (0 != iterations_left[peer_nr])
         {
           // start_short = GNUNET_TIME_absolute_get ();
           // phase = TP_BURST_SHORT;
-          num_sent_short = 0;
-          num_sent_long = 0;
-          num_received_short = 0;
-          num_received_long = 0;
+          num_received_size[peer_nr] = 0;
+          num_sent_size[peer_nr] = 0;
+          avg_latency[peer_nr] = 0;
+          num_sent_short[peer_nr] = 0;
+          num_sent_long[peer_nr] = 0;
+          num_received_short[peer_nr] = 0;
+          num_received_long[peer_nr] = 0;
           // short_test (NULL);
-          load_phase_config ();
-          choose_phase ();
-          break;
+          if (((PEER_A == peer_nr) && finished[PEER_B]) || ((PEER_B ==
+                                                             peer_nr) &&
+                                                            finished[PEER_A]))
+          {
+            load_phase_config ();
+          }
         }
-        phase_size = GNUNET_NO;
-        choose_phase ();
+        choose_phase (get_tc_h (peer_nr));
       }
       break;
     }
@@ -859,23 +988,23 @@ do_shutdown (void *cls)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "shuting down test.\n");
 
-  if (NULL != box_stats)
-  {
-    GNUNET_STATISTICS_get_cancel (box_stats);
-    box_stats = NULL;
-  }
-  if (NULL != rekey_stats)
-  {
-    GNUNET_STATISTICS_get_cancel (rekey_stats);
-    rekey_stats = NULL;
-  }
-  if (NULL != to_task)
-  {
-    GNUNET_SCHEDULER_cancel (to_task);
-    to_task = NULL;
-  }
   for (unsigned int i = 0; i < NUM_PEERS; i++)
   {
+    if (NULL != box_stats[i])
+    {
+      GNUNET_STATISTICS_get_cancel (box_stats[i]);
+      box_stats[i] = NULL;
+    }
+    if (NULL != rekey_stats[i])
+    {
+      GNUNET_STATISTICS_get_cancel (rekey_stats[i]);
+      rekey_stats[i] = NULL;
+    }
+    if (NULL != to_task[i])
+    {
+      GNUNET_SCHEDULER_cancel (to_task[i]);
+      to_task[i] = NULL;
+    }
     GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (tc_hs[i]);
     GNUNET_STATISTICS_destroy (stats[i], GNUNET_NO);
   }
@@ -935,7 +1064,10 @@ main (int argc,
   char *test_mode;
   char *cfg_peer;
 
-  phase = TP_INIT;
+  iterations_left[0] = TOTAL_ITERATIONS;
+  iterations_left[1] = TOTAL_ITERATIONS;
+  phase[0] = TP_INIT;
+  phase[1] = TP_INIT;
   ret = 1;
   test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
   communicator_name = strchr (test_name, '-');

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]