gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: split of set union from set service (pre


From: gnunet
Subject: [gnunet] branch master updated: split of set union from set service (preliminary)
Date: Sun, 16 Aug 2020 20:53:02 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new be0475f2a split of set union from set service (preliminary)
be0475f2a is described below

commit be0475f2a583d203465d3091ff933806a5ace613
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Aug 16 20:46:39 2020 +0200

    split of set union from set service (preliminary)
---
 configure.ac                                    |    2 +
 po/POTFILES.in                                  |   19 +
 src/include/gnunet_protocols.h                  |  132 +
 src/setu/Makefile.am                            |  102 +
 src/setu/gnunet-service-setu.c                  | 3482 +++++++++++++++++++++++
 src/setu/gnunet-service-setu.h                  |  393 +++
 src/setu/gnunet-service-setu_protocol.h         |  226 ++
 src/setu/gnunet-service-setu_strata_estimator.c |  303 ++
 src/setu/gnunet-service-setu_strata_estimator.h |  169 ++
 src/setu/gnunet-setu-ibf-profiler.c             |  308 ++
 src/setu/gnunet-setu-profiler.c                 |  499 ++++
 src/setu/ibf.c                                  |  409 +++
 src/setu/ibf.h                                  |  255 ++
 src/setu/ibf_sim.c                              |  142 +
 src/setu/plugin_block_setu_test.c               |  123 +
 src/setu/setu.h                                 |  304 ++
 src/setu/setu_api.c                             |  872 ++++++
 src/setu/test_setu_api.c                        |  360 +++
 18 files changed, 8100 insertions(+)

diff --git a/configure.ac b/configure.ac
index e492242a6..bb205220c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1937,6 +1937,8 @@ src/scalarproduct/Makefile
 src/scalarproduct/scalarproduct.conf
 src/set/Makefile
 src/set/set.conf
+src/setu/Makefile
+src/setu/setu.conf
 src/sq/Makefile
 src/statistics/Makefile
 src/statistics/statistics.conf
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 1f5cc81c3..c5503c343 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -333,6 +333,25 @@ src/set/ibf.c
 src/set/ibf_sim.c
 src/set/plugin_block_set_test.c
 src/set/set_api.c
+src/seti/gnunet-service-set.c
+src/seti/gnunet-service-set_intersection.c
+src/seti/gnunet-service-set_union.c
+src/seti/gnunet-service-set_union_strata_estimator.c
+src/seti/gnunet-set-ibf-profiler.c
+src/seti/gnunet-set-profiler.c
+src/seti/ibf.c
+src/seti/ibf_sim.c
+src/seti/plugin_block_set_test.c
+src/seti/set_api.c
+src/setu/gnunet-service-set_union.c
+src/setu/gnunet-service-set_union_strata_estimator.c
+src/setu/gnunet-service-setu.c
+src/setu/gnunet-setu-ibf-profiler.c
+src/setu/gnunet-setu-profiler.c
+src/setu/ibf.c
+src/setu/ibf_sim.c
+src/setu/plugin_block_setu_test.c
+src/setu/setu_api.c
 src/sq/sq.c
 src/sq/sq_exec.c
 src/sq/sq_prepare.c
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 5af58664f..c3fcde0b9 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1658,10 +1658,142 @@ extern "C" {
 #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547
 
 
+/*******************************************************************************
+ * SETU message types
+ 
******************************************************************************/
+
+
+/**
+ * Cancel a set operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_CANCEL 550
+
+/**
+ * Add element to set
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_ADD 551
+
+/**
+ * Create a new local set
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_CREATE 552
+
+/**
+ * Handle result message from operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_RESULT 553
+
+/**
+ * Evaluate a set operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE 554
+
+/**
+ * Listen for operation requests
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_LISTEN 555
+
+/**
+ * Reject a set request.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_REJECT 556
+
+/**
+ * Accept an incoming set request
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT 557
+
+/**
+ * Notify the client of an incoming request from a remote peer
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_REQUEST 558
+
+
+/**
+ * Demand the whole element from the other
+ * peer, given only the hash code.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565
+
+/**
+ * Demand the whole element from the other
+ * peer, given only the hash code.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566
+
+/**
+ * Tell the other peer to send us a list of
+ * hashes that match an IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567
+
+/**
+ * Tell the other peer which hashes match a
+ * given IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568
+
+/**
+ * Request a set union operation from a remote peer.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581
+
+/**
+ * Strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582
+
+/**
+ * Invertible bloom filter.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583
+
+/**
+ * Actual set elements.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584
+
+/**
+ * Requests for the elements with the given hashes.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585
+
+/**
+ * Set operation is done.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586
+
+/**
+ * Compressed strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590
+
+/**
+ * Request all missing elements from the other peer,
+ * based on their sets and the elements we previously sent
+ * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597
+
+/**
+ * Send a set element, not as response to a demand but because
+ * we're sending the full set.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598
+
+/**
+ * Request all missing elements from the other peer,
+ * based on their sets and the elements we previously sent
+ * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599
+
+
 
/*******************************************************************************
  * SET message types
  
******************************************************************************/
 
+
 /**
  * Demand the whole element from the other
  * peer, given only the hash code.
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
new file mode 100644
index 000000000..b37ceba51
--- /dev/null
+++ b/src/setu/Makefile.am
@@ -0,0 +1,102 @@
+# This Makefile.am is in the public domain
+AM_CPPFLAGS = -I$(top_srcdir)/src/include
+
+pkgcfgdir= $(pkgdatadir)/config.d/
+
+libexecdir= $(pkglibdir)/libexec/
+
+plugindir = $(libdir)/gnunet
+
+pkgcfg_DATA = \
+  setu.conf
+
+if USE_COVERAGE
+  AM_CFLAGS = -fprofile-arcs -ftest-coverage
+endif
+
+if HAVE_TESTING
+bin_PROGRAMS = \
+ gnunet-setu-profiler
+
+noinst_PROGRAMS = \
+ gnunet-setu-ibf-profiler
+endif
+
+libexec_PROGRAMS = \
+ gnunet-service-setu
+
+lib_LTLIBRARIES = \
+  libgnunetsetu.la
+
+gnunet_setu_profiler_SOURCES = \
+ gnunet-setu-profiler.c
+gnunet_setu_profiler_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la \
+  libgnunetsetu.la \
+  $(top_builddir)/src/testing/libgnunettesting.la \
+  $(GN_LIBINTL)
+
+
+gnunet_setu_ibf_profiler_SOURCES = \
+ gnunet-setu-ibf-profiler.c \
+ ibf.c
+gnunet_setu_ibf_profiler_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(GN_LIBINTL)
+
+gnunet_service_setu_SOURCES = \
+ gnunet-service-setu.c gnunet-service-setu.h \
+ ibf.c ibf.h \
+ gnunet-service-setu_strata_estimator.c gnunet-service-setu_strata_estimator.h 
\
+ gnunet-service-setu_protocol.h
+gnunet_service_setu_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la \
+  $(top_builddir)/src/core/libgnunetcore.la \
+  $(top_builddir)/src/cadet/libgnunetcadet.la \
+  $(top_builddir)/src/block/libgnunetblock.la \
+  libgnunetsetu.la \
+  $(GN_LIBINTL)
+
+libgnunetsetu_la_SOURCES = \
+  setu_api.c setu.h
+libgnunetsetu_la_LIBADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(LTLIBINTL)
+libgnunetsetu_la_LDFLAGS = \
+  $(GN_LIB_LDFLAGS)
+
+if HAVE_TESTING
+check_PROGRAMS = \
+ test_setu_api
+endif
+
+if ENABLE_TEST_RUN
+AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export 
PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset 
XDG_CONFIG_HOME;
+TESTS = $(check_PROGRAMS)
+endif
+
+test_setu_api_SOURCES = \
+ test_setu_api.c
+test_setu_api_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/testing/libgnunettesting.la \
+  libgnunetsetu.la
+
+plugin_LTLIBRARIES = \
+  libgnunet_plugin_block_setu_test.la
+
+libgnunet_plugin_block_setu_test_la_SOURCES = \
+  plugin_block_setu_test.c
+libgnunet_plugin_block_setu_test_la_LIBADD = \
+  $(top_builddir)/src/block/libgnunetblock.la \
+  $(top_builddir)/src/block/libgnunetblockgroup.la \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(LTLIBINTL)
+libgnunet_plugin_block_setu_test_la_LDFLAGS = \
+ $(GN_PLUGIN_LDFLAGS)
+
+
+EXTRA_DIST = \
+  test_setu.conf
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..88edc622f
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3482 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2013-2017, 2020 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file setu/gnunet-service-setu.c
+ * @brief set union operation
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet-service-setu.h"
+#include "ibf.h"
+#include "gnunet-service-setu_strata_estimator.h"
+#include "gnunet-service-setu_protocol.h"
+#include "gnunet_statistics_service.h"
+#include <gcrypt.h>
+
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
+
+/**
+ * How long do we hold on to an incoming channel if there is
+ * no local listener before giving up?
+ */
+#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
+
+/**
+ * Number of IBFs in a strata estimator.
+ */
+#define SE_STRATA_COUNT 32
+
+/**
+ * Size of the IBFs in the strata estimator.
+ */
+#define SE_IBF_SIZE 80
+
+/**
+ * The hash num parameter for the difference digests and strata estimators.
+ */
+#define SE_IBF_HASH_NUM 4
+
+/**
+ * Number of buckets that can be transmitted in one message.
+ */
+#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
+
+/**
+ * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
+ * Choose this value so that computing the IBF is still cheaper
+ * than transmitting all values.
+ */
+#define MAX_IBF_ORDER (20)
+
+/**
+ * Number of buckets used in the ibf per estimated
+ * difference.
+ */
+#define IBF_ALPHA 4
+
+
+/**
+ * Current phase we are in for a union operation.
+ */
+enum UnionOperationPhase
+{
+  /**
+   * We sent the request message, and expect a strata estimator.
+   */
+  PHASE_EXPECT_SE,
+
+  /**
+   * We sent the strata estimator, and expect an IBF. This phase is entered 
once
+   * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
+   *
+   * XXX: could use better wording.
+   * XXX: repurposed to also expect a "request full set" message, should be 
renamed
+   *
+   * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
+   */
+  PHASE_EXPECT_IBF,
+
+  /**
+   * Continuation for multi part IBFs.
+   */
+  PHASE_EXPECT_IBF_CONT,
+
+  /**
+   * We are decoding an IBF.
+   */
+  PHASE_INVENTORY_ACTIVE,
+
+  /**
+   * The other peer is decoding the IBF we just sent.
+   */
+  PHASE_INVENTORY_PASSIVE,
+
+  /**
+   * The protocol is almost finished, but we still have to flush our message
+   * queue and/or expect some elements.
+   */
+  PHASE_FINISH_CLOSING,
+
+  /**
+   * In the penultimate phase,
+   * we wait until all our demands
+   * are satisfied.  Then we send a done
+   * message, and wait for another done message.
+   */
+  PHASE_FINISH_WAITING,
+
+  /**
+   * In the ultimate phase, we wait until
+   * our demands are satisfied and then
+   * quit (sending another DONE message).
+   */
+  PHASE_DONE,
+
+  /**
+   * After sending the full set, wait for responses with the elements
+   * that the local peer is missing.
+   */
+  PHASE_FULL_SENDING,
+};
+
+
+/**
+ * State of an evaluate operation with another peer.
+ */
+struct OperationState
+{
+  /**
+   * Copy of the set's strata estimator at the time of
+   * creation of this operation.
+   */
+  struct StrataEstimator *se;
+
+  /**
+   * The IBF we currently receive.
+   */
+  struct InvertibleBloomFilter *remote_ibf;
+
+  /**
+   * The IBF with the local set's element.
+   */
+  struct InvertibleBloomFilter *local_ibf;
+
+  /**
+   * Maps unsalted IBF-Keys to elements.
+   * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+   * Colliding IBF-Keys are linked.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
+
+  /**
+   * Current state of the operation.
+   */
+  enum UnionOperationPhase phase;
+
+  /**
+   * Did we send the client that we are done?
+   */
+  int client_done_sent;
+
+  /**
+   * Number of ibf buckets already received into the @a remote_ibf.
+   */
+  unsigned int ibf_buckets_received;
+
+  /**
+   * Hashes for elements that we have demanded from the other peer.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
+
+  /**
+   * Salt that we're using for sending IBFs
+   */
+  uint32_t salt_send;
+
+  /**
+   * Salt for the IBF we've received and that we're currently decoding.
+   */
+  uint32_t salt_receive;
+
+  /**
+   * Number of elements we received from the other peer
+   * that were not in the local set yet.
+   */
+  uint32_t received_fresh;
+
+  /**
+   * Total number of elements received from the other peer.
+   */
+  uint32_t received_total;
+
+  /**
+   * Initial size of our set, just before
+   * the operation started.
+   */
+  uint64_t initial_size;
+};
+
+
+/**
+ * The key entry is used to associate an ibf key with an element.
+ */
+struct KeyEntry
+{
+  /**
+   * IBF key for the entry, derived from the current salt.
+   */
+  struct IBF_Key ibf_key;
+
+  /**
+   * The actual element associated with the key.
+   *
+   * Only owned by the union operation if element->operation
+   * is #GNUNET_YES.
+   */
+  struct ElementEntry *element;
+
+  /**
+   * Did we receive this element?
+   * Even if element->is_foreign is false, we might
+   * have received the element, so this indicates that
+   * the other peer has it.
+   */
+  int received;
+};
+
+
+/**
+ * Used as a closure for sending elements
+ * with a specific IBF key.
+ */
+struct SendElementClosure
+{
+  /**
+   * The IBF key whose matching elements should be
+   * sent.
+   */
+  struct IBF_Key ibf_key;
+
+  /**
+   * Operation for which the elements
+   * should be sent.
+   */
+  struct Operation *op;
+};
+
+
+/**
+ * Extra state required for efficient set union.
+ */
+struct SetState
+{
+  /**
+   * The strata estimator is only generated once for
+   * each set.
+   * The IBF keys are derived from the element hashes with
+   * salt=0.
+   */
+  struct StrataEstimator *se;
+};
+
+
+/**
+ * A listener is inhabited by a client, and waits for evaluation
+ * requests from remote peers.
+ */
+struct Listener
+{
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *next;
+
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *prev;
+
+  /**
+   * Head of DLL of operations this listener is responsible for.
+   * Once the client has accepted/declined the operation, the
+   * operation is moved to the respective set's operation DLLS.
+   */
+  struct Operation *op_head;
+
+  /**
+   * Tail of DLL of operations this listener is responsible for.
+   * Once the client has accepted/declined the operation, the
+   * operation is moved to the respective set's operation DLLS.
+   */
+  struct Operation *op_tail;
+
+  /**
+   * Client that owns the listener.
+   * Only one client may own a listener.
+   */
+  struct ClientState *cs;
+
+  /**
+   * The port we are listening on with CADET.
+   */
+  struct GNUNET_CADET_Port *open_port;
+
+  /**
+   * Application ID for the operation, used to distinguish
+   * multiple operations of the same type with the same peer.
+   */
+  struct GNUNET_HashCode app_id;
+
+};
+
+
+/**
+ * Handle to the cadet service, used to listen for and connect to
+ * remote peers.
+ */
+static struct GNUNET_CADET_Handle *cadet;
+
+/**
+ * Statistics handle.
+ */
+struct GNUNET_STATISTICS_Handle *_GSS_statistics;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
+static struct Listener *listener_head;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
+static struct Listener *listener_tail;
+
+/**
+ * Number of active clients.
+ */
+static unsigned int num_clients;
+
+/**
+ * Are we in shutdown? if #GNUNET_YES and the number of clients
+ * drops to zero, disconnect from CADET.
+ */
+static int in_shutdown;
+
+/**
+ * Counter for allocating unique IDs for clients, used to identify
+ * incoming operation requests from remote peers, that the client can
+ * choose to accept or refuse.  0 must not be used (reserved for
+ * uninitialized).
+ */
+static uint32_t suggest_id;
+
+
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+destroy_key_to_element_iter (void *cls,
+                             uint32_t key,
+                             void *value)
+{
+  struct KeyEntry *k = value;
+
+  GNUNET_assert (NULL != k);
+  if (GNUNET_YES == k->element->remote)
+  {
+    GNUNET_free (k->element);
+    k->element = NULL;
+  }
+  GNUNET_free (k);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Destroy the union operation.  Only things specific to the union
+ * operation are destroyed.
+ *
+ * @param op union operation to destroy
+ */
+static void
+union_op_cancel (struct Operation *op)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op\n");
+  /* check if the op was canceled twice */
+  GNUNET_assert (NULL != op->state);
+  if (NULL != op->state->remote_ibf)
+  {
+    ibf_destroy (op->state->remote_ibf);
+    op->state->remote_ibf = NULL;
+  }
+  if (NULL != op->state->demanded_hashes)
+  {
+    GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
+    op->state->demanded_hashes = NULL;
+  }
+  if (NULL != op->state->local_ibf)
+  {
+    ibf_destroy (op->state->local_ibf);
+    op->state->local_ibf = NULL;
+  }
+  if (NULL != op->state->se)
+  {
+    strata_estimator_destroy (op->state->se);
+    op->state->se = NULL;
+  }
+  if (NULL != op->state->key_to_element)
+  {
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &destroy_key_to_element_iter,
+                                             NULL);
+    GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
+    op->state->key_to_element = NULL;
+  }
+  GNUNET_free (op->state);
+  op->state = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op done\n");
+}
+
+
+/**
+ * Inform the client that the union operation has failed,
+ * and proceed to destroy the evaluate operation.
+ *
+ * @param op the union operation to fail
+ */
+static void
+fail_union_operation (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SETU_ResultMessage *msg;
+
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "union operation failed\n");
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+  msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+  msg->request_id = htonl (op->client_request_id);
+  msg->element_type = htons (0);
+  GNUNET_MQ_send (op->set->cs->mq,
+                  ev);
+  _GSS_operation_destroy (op);
+}
+
+
+/**
+ * Derive the IBF key from a hash code and
+ * a salt.
+ *
+ * @param src the hash code
+ * @return the derived IBF key
+ */
+static struct IBF_Key
+get_ibf_key (const struct GNUNET_HashCode *src)
+{
+  struct IBF_Key key;
+  uint16_t salt = 0;
+
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CRYPTO_kdf (&key, sizeof(key),
+                                    src, sizeof *src,
+                                    &salt, sizeof(salt),
+                                    NULL, 0));
+  return key;
+}
+
+
+/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+  /**
+   * FIXME.
+   */
+  struct GNUNET_HashCode hash;
+
+  /**
+   * FIXME.
+   */
+  struct KeyEntry *k;
+};
+
+
+/**
+ * Iterator over the mapping from IBF keys to element entries.  Checks if we
+ * have an element with a given GNUNET_HashCode.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should search further,
+ *         #GNUNET_NO if we've found the element.
+ */
+static int
+op_get_element_iterator (void *cls,
+                         uint32_t key,
+                         void *value)
+{
+  struct GetElementContext *ctx = cls;
+  struct KeyEntry *k = value;
+
+  GNUNET_assert (NULL != k);
+  if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
+                                   &ctx->hash))
+  {
+    ctx->k = k;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Determine whether the given element is already in the operation's element
+ * set.
+ *
+ * @param op operation that should be tested for 'element_hash'
+ * @param element_hash hash of the element to look for
+ * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
+ */
+static struct KeyEntry *
+op_get_element (struct Operation *op,
+                const struct GNUNET_HashCode *element_hash)
+{
+  int ret;
+  struct IBF_Key ibf_key;
+  struct GetElementContext ctx = { { { 0 } }, 0 };
+
+  ctx.hash = *element_hash;
+
+  ibf_key = get_ibf_key (element_hash);
+  ret = GNUNET_CONTAINER_multihashmap32_get_multiple 
(op->state->key_to_element,
+                                                      (uint32_t) 
ibf_key.key_val,
+                                                      op_get_element_iterator,
+                                                      &ctx);
+
+  /* was the iteration aborted because we found the element? */
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_assert (NULL != ctx.k);
+    return ctx.k;
+  }
+  return NULL;
+}
+
+
+/**
+ * Insert an element into the union operation's
+ * key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
+ *
+ * XXX: clarify ownership, doesn't sound right.
+ *
+ * @param op the union operation
+ * @param ee the element entry
+ * @parem received was this element received from the remote peer?
+ */
+static void
+op_register_element (struct Operation *op,
+                     struct ElementEntry *ee,
+                     int received)
+{
+  struct IBF_Key ibf_key;
+  struct KeyEntry *k;
+
+  ibf_key = get_ibf_key (&ee->element_hash);
+  k = GNUNET_new (struct KeyEntry);
+  k->element = ee;
+  k->ibf_key = ibf_key;
+  k->received = received;
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap32_put 
(op->state->key_to_element,
+                                                      (uint32_t) 
ibf_key.key_val,
+                                                      k,
+                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+}
+
+
+/**
+ * FIXME.
+ */
+static void
+salt_key (const struct IBF_Key *k_in,
+          uint32_t salt,
+          struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+
+  /* rotate ibf key */
+  x = (x >> s) | (x << (64 - s));
+  k_out->key_val = x;
+}
+
+
+/**
+ * FIXME.
+ */
+static void
+unsalt_key (const struct IBF_Key *k_in,
+            uint32_t salt,
+            struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+
+  x = (x << s) | (x >> (64 - s));
+  k_out->key_val = x;
+}
+
+
+/**
+ * Insert a key into an ibf.
+ *
+ * @param cls the ibf
+ * @param key unused
+ * @param value the key entry to get the key from
+ */
+static int
+prepare_ibf_iterator (void *cls,
+                      uint32_t key,
+                      void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct IBF_Key salted_key;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "[OP %x] inserting %lx (hash %s) into ibf\n",
+       (void *) op,
+       (unsigned long) ke->ibf_key.key_val,
+       GNUNET_h2s (&ke->element->element_hash));
+  salt_key (&ke->ibf_key,
+            op->state->salt_send,
+            &salted_key);
+  ibf_insert (op->state->local_ibf, salted_key);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Iterator for initializing the
+ * key-to-element mapping of a union operation
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+init_key_to_element_iterator (void *cls,
+                              const struct GNUNET_HashCode *key,
+                              void *value)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee = value;
+
+  /* make sure that the element belongs to the set at the time
+   * of creating the operation */
+  if (GNUNET_NO ==
+      _GSS_is_element_of_operation (ee,
+                                    op))
+    return GNUNET_YES;
+  GNUNET_assert (GNUNET_NO == ee->remote);
+  op_register_element (op,
+                       ee,
+                       GNUNET_NO);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+  unsigned int len;
+
+  GNUNET_assert (NULL == op->state->key_to_element);
+  len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
+  op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+                                         &init_key_to_element_iterator,
+                                         op);
+}
+
+
+/**
+ * Create an ibf with the operation's elements
+ * of the specified size
+ *
+ * @param op the union operation
+ * @param size size of the ibf to create
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+prepare_ibf (struct Operation *op,
+             uint32_t size)
+{
+  GNUNET_assert (NULL != op->state->key_to_element);
+
+  if (NULL != op->state->local_ibf)
+    ibf_destroy (op->state->local_ibf);
+  op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+  if (NULL == op->state->local_ibf)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to allocate local IBF\n");
+    return GNUNET_SYSERR;
+  }
+  GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                           &prepare_ibf_iterator,
+                                           op);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Send an ibf of appropriate size.
+ *
+ * Fragments the IBF into multiple messages if necessary.
+ *
+ * @param op the union operation
+ * @param ibf_order order of the ibf to send, size=2^order
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+send_ibf (struct Operation *op,
+          uint16_t ibf_order)
+{
+  unsigned int buckets_sent = 0;
+  struct InvertibleBloomFilter *ibf;
+
+  if (GNUNET_OK !=
+      prepare_ibf (op, 1 << ibf_order))
+  {
+    /* allocation failed */
+    return GNUNET_SYSERR;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sending ibf of size %u\n",
+       1 << ibf_order);
+
+  {
+    char name[64] = { 0 };
+    snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
+    GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
+  }
+
+  ibf = op->state->local_ibf;
+
+  while (buckets_sent < (1 << ibf_order))
+  {
+    unsigned int buckets_in_message;
+    struct GNUNET_MQ_Envelope *ev;
+    struct IBFMessage *msg;
+
+    buckets_in_message = (1 << ibf_order) - buckets_sent;
+    /* limit to maximum */
+    if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
+      buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
+
+    ev = GNUNET_MQ_msg_extra (msg,
+                              buckets_in_message * IBF_BUCKET_SIZE,
+                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
+    msg->reserved1 = 0;
+    msg->reserved2 = 0;
+    msg->order = ibf_order;
+    msg->offset = htonl (buckets_sent);
+    msg->salt = htonl (op->state->salt_send);
+    ibf_write_slice (ibf, buckets_sent,
+                     buckets_in_message, &msg[1]);
+    buckets_sent += buckets_in_message;
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "ibf chunk size %u, %u/%u sent\n",
+         buckets_in_message,
+         buckets_sent,
+         1 << ibf_order);
+    GNUNET_MQ_send (op->mq, ev);
+  }
+
+  /* The other peer must decode the IBF, so
+   * we're passive. */
+  op->state->phase = PHASE_INVENTORY_PASSIVE;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Compute the necessary order of an ibf
+ * from the size of the symmetric set difference.
+ *
+ * @param diff the difference
+ * @return the required size of the ibf
+ */
+static unsigned int
+get_order_from_difference (unsigned int diff)
+{
+  unsigned int ibf_order;
+
+  ibf_order = 2;
+  while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
+          ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
+         (ibf_order < MAX_IBF_ORDER))
+    ibf_order++;
+  // add one for correction
+  return ibf_order + 1;
+}
+
+
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_full_element_iterator (void *cls,
+                            const struct GNUNET_HashCode *key,
+                            void *value)
+{
+  struct Operation *op = cls;
+  struct GNUNET_SETU_ElementMessage *emsg;
+  struct ElementEntry *ee = value;
+  struct GNUNET_SETU_Element *el = &ee->element;
+  struct GNUNET_MQ_Envelope *ev;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending element %s\n",
+       GNUNET_h2s (key));
+  ev = GNUNET_MQ_msg_extra (emsg,
+                            el->size,
+                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  emsg->element_type = htons (el->element_type);
+  GNUNET_memcpy (&emsg[1],
+                 el->data,
+                 el->size);
+  GNUNET_MQ_send (op->mq,
+                  ev);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Switch to full set transmission for @a op.
+ *
+ * @param op operation to switch to full set transmission.
+ */
+static void
+send_full_set (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  op->state->phase = PHASE_FULL_SENDING;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Dedicing to transmit the full set\n");
+  /* FIXME: use a more memory-friendly way of doing this with an
+     iterator, just as we do in the non-full case! */
+  (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+                                                &send_full_element_iterator,
+                                                op);
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+  GNUNET_MQ_send (op->mq,
+                  ev);
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static int
+check_union_p2p_strata_estimator (void *cls,
+                                  const struct StrataEstimatorMessage *msg)
+{
+  struct Operation *op = cls;
+  int is_compressed;
+  size_t len;
+
+  if (op->state->phase != PHASE_EXPECT_SE)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
+                     msg->header.type));
+  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
+  if ((GNUNET_NO == is_compressed) &&
+      (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static void
+handle_union_p2p_strata_estimator (void *cls,
+                                   const struct StrataEstimatorMessage *msg)
+{
+  struct Operation *op = cls;
+  struct StrataEstimator *remote_se;
+  unsigned int diff;
+  uint64_t other_size;
+  size_t len;
+  int is_compressed;
+
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
+                     msg->header.type));
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# bytes of SE received",
+                            ntohs (msg->header.size),
+                            GNUNET_NO);
+  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
+  other_size = GNUNET_ntohll (msg->set_size);
+  remote_se = strata_estimator_create (SE_STRATA_COUNT,
+                                       SE_IBF_SIZE,
+                                       SE_IBF_HASH_NUM);
+  if (NULL == remote_se)
+  {
+    /* insufficient resources, fail */
+    fail_union_operation (op);
+    return;
+  }
+  if (GNUNET_OK !=
+      strata_estimator_read (&msg[1],
+                             len,
+                             is_compressed,
+                             remote_se))
+  {
+    /* decompression failed */
+    strata_estimator_destroy (remote_se);
+    fail_union_operation (op);
+    return;
+  }
+  GNUNET_assert (NULL != op->state->se);
+  diff = strata_estimator_difference (remote_se,
+                                      op->state->se);
+
+  if (diff > 200)
+    diff = diff * 3 / 2;
+
+  strata_estimator_destroy (remote_se);
+  strata_estimator_destroy (op->state->se);
+  op->state->se = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "got se diff=%d, using ibf size %d\n",
+       diff,
+       1U << get_order_from_difference (diff));
+
+  {
+    char *set_debug;
+
+    set_debug = getenv ("GNUNET_SETU_BENCHMARK");
+    if ((NULL != set_debug) &&
+        (0 == strcmp (set_debug, "1")))
+    {
+      FILE *f = fopen ("set.log", "a");
+      fprintf (f, "%llu\n", (unsigned long long) diff);
+      fclose (f);
+    }
+  }
+
+  if ((GNUNET_YES == op->byzantine) &&
+      (other_size < op->byzantine_lower_bound))
+  {
+    GNUNET_break (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  if ((GNUNET_YES == op->force_full) ||
+      (diff > op->state->initial_size / 4) ||
+      (0 == other_size))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
+         diff,
+         op->state->initial_size);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of full sends",
+                              1,
+                              GNUNET_NO);
+    if ((op->state->initial_size <= other_size) ||
+        (0 == other_size))
+    {
+      send_full_set (op);
+    }
+    else
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Telling other peer that we expect its full set\n");
+      op->state->phase = PHASE_EXPECT_IBF;
+      ev = GNUNET_MQ_msg_header (
+        GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+      GNUNET_MQ_send (op->mq,
+                      ev);
+    }
+  }
+  else
+  {
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of ibf sends",
+                              1,
+                              GNUNET_NO);
+    if (GNUNET_OK !=
+        send_ibf (op,
+                  get_order_from_difference (diff)))
+    {
+      /* Internal error, best we can do is shut the connection */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to send IBF, closing connection\n");
+      fail_union_operation (op);
+      return;
+    }
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Iterator to send elements to a remote peer
+ *
+ * @param cls closure with the element key and the union operation
+ * @param key ignored
+ * @param value the key entry
+ */
+static int
+send_offers_iterator (void *cls,
+                      uint32_t key,
+                      void *value)
+{
+  struct SendElementClosure *sec = cls;
+  struct Operation *op = sec->op;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_MessageHeader *mh;
+
+  /* Detect 32-bit key collision for the 64-bit IBF keys. */
+  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
+    return GNUNET_YES;
+
+  ev = GNUNET_MQ_msg_header_extra (mh,
+                                   sizeof(struct GNUNET_HashCode),
+                                   GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
+
+  GNUNET_assert (NULL != ev);
+  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "[OP %x] sending element offer (%s) to peer\n",
+       (void *) op,
+       GNUNET_h2s (&ke->element->element_hash));
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the 
given IBF key.
+ *
+ * @param op union operation
+ * @param ibf_key IBF key of interest
+ */
+static void
+send_offers_for_key (struct Operation *op,
+                     struct IBF_Key ibf_key)
+{
+  struct SendElementClosure send_cls;
+
+  send_cls.ibf_key = ibf_key;
+  send_cls.op = op;
+  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
+    op->state->key_to_element,
+    (uint32_t) ibf_key.
+    key_val,
+    &send_offers_iterator,
+    &send_cls);
+}
+
+
+/**
+ * Decode which elements are missing on each side, and
+ * send the appropriate offers and inquiries.
+ *
+ * @param op union operation
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+decode_and_send (struct Operation *op)
+{
+  struct IBF_Key key;
+  struct IBF_Key last_key;
+  int side;
+  unsigned int num_decoded;
+  struct InvertibleBloomFilter *diff_ibf;
+
+  GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
+
+  if (GNUNET_OK !=
+      prepare_ibf (op,
+                   op->state->remote_ibf->size))
+  {
+    GNUNET_break (0);
+    /* allocation failed */
+    return GNUNET_SYSERR;
+  }
+  diff_ibf = ibf_dup (op->state->local_ibf);
+  ibf_subtract (diff_ibf,
+                op->state->remote_ibf);
+
+  ibf_destroy (op->state->remote_ibf);
+  op->state->remote_ibf = NULL;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "decoding IBF (size=%u)\n",
+       diff_ibf->size);
+
+  num_decoded = 0;
+  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable 
*/
+
+  while (1)
+  {
+    int res;
+    int cycle_detected = GNUNET_NO;
+
+    last_key = key;
+
+    res = ibf_decode (diff_ibf, &side, &key);
+    if (res == GNUNET_OK)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "decoded ibf key %lx\n",
+           (unsigned long) key.key_val);
+      num_decoded += 1;
+      if ((num_decoded > diff_ibf->size) ||
+          ((num_decoded > 1) &&
+           (last_key.key_val == key.key_val)))
+      {
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "detected cyclic ibf (decoded %u/%u)\n",
+             num_decoded,
+             diff_ibf->size);
+        cycle_detected = GNUNET_YES;
+      }
+    }
+    if ((GNUNET_SYSERR == res) ||
+        (GNUNET_YES == cycle_detected))
+    {
+      int next_order;
+      next_order = 0;
+      while (1 << next_order < diff_ibf->size)
+        next_order++;
+      next_order++;
+      if (next_order <= MAX_IBF_ORDER)
+      {
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "decoding failed, sending larger ibf (size %u)\n",
+             1 << next_order);
+        GNUNET_STATISTICS_update (_GSS_statistics,
+                                  "# of IBF retries",
+                                  1,
+                                  GNUNET_NO);
+        op->state->salt_send++;
+        if (GNUNET_OK !=
+            send_ibf (op, next_order))
+        {
+          /* Internal error, best we can do is shut the connection */
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                      "Failed to send IBF, closing connection\n");
+          fail_union_operation (op);
+          ibf_destroy (diff_ibf);
+          return GNUNET_SYSERR;
+        }
+      }
+      else
+      {
+        GNUNET_STATISTICS_update (_GSS_statistics,
+                                  "# of failed union operations (too large)",
+                                  1,
+                                  GNUNET_NO);
+        // XXX: Send the whole set, element-by-element
+        LOG (GNUNET_ERROR_TYPE_ERROR,
+             "set union failed: reached ibf limit\n");
+        fail_union_operation (op);
+        ibf_destroy (diff_ibf);
+        return GNUNET_SYSERR;
+      }
+      break;
+    }
+    if (GNUNET_NO == res)
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "transmitted all values, sending DONE\n");
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+      /* We now wait until we get a DONE message back
+       * and then wait for our MQ to be flushed and all our
+       * demands be delivered. */
+      break;
+    }
+    if (1 == side)
+    {
+      struct IBF_Key unsalted_key;
+
+      unsalt_key (&key,
+                  op->state->salt_receive,
+                  &unsalted_key);
+      send_offers_for_key (op,
+                           unsalted_key);
+    }
+    else if (-1 == side)
+    {
+      struct GNUNET_MQ_Envelope *ev;
+      struct InquiryMessage *msg;
+
+      /* It may be nice to merge multiple requests, but with CADET's corking 
it is not worth
+       * the effort additional complexity. */
+      ev = GNUNET_MQ_msg_extra (msg,
+                                sizeof(struct IBF_Key),
+                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
+      msg->salt = htonl (op->state->salt_receive);
+      GNUNET_memcpy (&msg[1],
+                     &key,
+                     sizeof(struct IBF_Key));
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "sending element inquiry for IBF key %lx\n",
+           (unsigned long) key.key_val);
+      GNUNET_MQ_send (op->mq, ev);
+    }
+    else
+    {
+      GNUNET_assert (0);
+    }
+  }
+  ibf_destroy (diff_ibf);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Check an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+static int
+check_union_p2p_ibf (void *cls,
+                     const struct IBFMessage *msg)
+{
+  struct Operation *op = cls;
+  unsigned int buckets_in_message;
+
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
+                       / IBF_BUCKET_SIZE;
+  if (0 == buckets_in_message)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
+      * IBF_BUCKET_SIZE)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+  {
+    if (ntohl (msg->offset) != op->state->ibf_buckets_received)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+    if (1 << msg->order != op->state->remote_ibf->size)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+    if (ntohl (msg->salt) != op->state->salt_receive)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  }
+  else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+           (op->state->phase != PHASE_EXPECT_IBF))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+static void
+handle_union_p2p_ibf (void *cls,
+                      const struct IBFMessage *msg)
+{
+  struct Operation *op = cls;
+  unsigned int buckets_in_message;
+
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
+                       / IBF_BUCKET_SIZE;
+  if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+      (op->state->phase == PHASE_EXPECT_IBF))
+  {
+    op->state->phase = PHASE_EXPECT_IBF_CONT;
+    GNUNET_assert (NULL == op->state->remote_ibf);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new ibf of size %u\n",
+         1 << msg->order);
+    op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
+    op->state->salt_receive = ntohl (msg->salt);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Receiving new IBF with salt %u\n",
+         op->state->salt_receive);
+    if (NULL == op->state->remote_ibf)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to parse remote IBF, closing connection\n");
+      fail_union_operation (op);
+      return;
+    }
+    op->state->ibf_buckets_received = 0;
+    if (0 != ntohl (msg->offset))
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+  }
+  else
+  {
+    GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received more of IBF\n");
+  }
+  GNUNET_assert (NULL != op->state->remote_ibf);
+
+  ibf_read_slice (&msg[1],
+                  op->state->ibf_buckets_received,
+                  buckets_in_message,
+                  op->state->remote_ibf);
+  op->state->ibf_buckets_received += buckets_in_message;
+
+  if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "received full ibf\n");
+    op->state->phase = PHASE_INVENTORY_ACTIVE;
+    if (GNUNET_OK !=
+        decode_and_send (op))
+    {
+      /* Internal error, best we can do is shut down */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to decode IBF, closing connection\n");
+      fail_union_operation (op);
+      return;
+    }
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Send a result message to the client indicating
+ * that there is a new element.
+ *
+ * @param op union operation
+ * @param element element to send
+ * @param status status to send with the new element
+ */
+static void
+send_client_element (struct Operation *op,
+                     const struct GNUNET_SETU_Element *element,
+                     int status)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SETU_ResultMessage *rm;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sending element (size %u) to client\n",
+       element->size);
+  GNUNET_assert (0 != op->client_request_id);
+  ev = GNUNET_MQ_msg_extra (rm,
+                            element->size,
+                            GNUNET_MESSAGE_TYPE_SET_RESULT);
+  if (NULL == ev)
+  {
+    GNUNET_MQ_discard (ev);
+    GNUNET_break (0);
+    return;
+  }
+  rm->result_status = htons (status);
+  rm->request_id = htonl (op->client_request_id);
+  rm->element_type = htons (element->element_type);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
+                                      op->state->key_to_element));
+  GNUNET_memcpy (&rm[1],
+                 element->data,
+                 element->size);
+  GNUNET_MQ_send (op->set->cs->mq,
+                  ev);
+}
+
+
+/**
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
+ *
+ * @param cls operation to destroy
+ */
+static void
+send_client_done (void *cls)
+{
+  struct Operation *op = cls;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SETU_ResultMessage *rm;
+
+  if (GNUNET_YES == op->state->client_done_sent)
+  {
+    return;
+  }
+
+  if (PHASE_DONE != op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Union operation failed\n");
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# Union operations failed",
+                              1,
+                              GNUNET_NO);
+    ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+    rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+    rm->request_id = htonl (op->client_request_id);
+    rm->element_type = htons (0);
+    GNUNET_MQ_send (op->set->cs->mq,
+                    ev);
+    return;
+  }
+
+  op->state->client_done_sent = GNUNET_YES;
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# Union operations succeeded",
+                            1,
+                            GNUNET_NO);
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Signalling client that union operation is done\n");
+  ev = GNUNET_MQ_msg (rm,
+                      GNUNET_MESSAGE_TYPE_SET_RESULT);
+  rm->request_id = htonl (op->client_request_id);
+  rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
+  rm->element_type = htons (0);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
+                                      op->state->key_to_element));
+  GNUNET_MQ_send (op->set->cs->mq,
+                  ev);
+}
+
+
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
+static void
+maybe_finish (struct Operation *op)
+{
+  unsigned int num_demanded;
+
+  num_demanded = GNUNET_CONTAINER_multihashmap_size (
+    op->state->demanded_hashes);
+
+  if (PHASE_FINISH_WAITING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_WAITING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      op->state->phase = PHASE_DONE;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+      GNUNET_MQ_send (op->mq,
+                      ev);
+      /* We now wait until the other peer sends P2P_OVER
+       * after it got all elements from us. */
+    }
+  }
+  if (PHASE_FINISH_CLOSING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_CLOSING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
+    {
+      op->state->phase = PHASE_DONE;
+      send_client_done (op);
+      _GSS_operation_destroy2 (op);
+    }
+  }
+}
+
+
+/**
+ * Check an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static int
+check_union_p2p_elements (void *cls,
+                          const struct GNUNET_SETU_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+
+  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static void
+handle_union_p2p_elements (void *cls,
+                           const struct GNUNET_SETU_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
+
+  element_size = ntohs (emsg->header.size) - sizeof(struct
+                                                    
GNUNET_SETU_ElementMessage);
+  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1],
+                 &emsg[1],
+                 element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SETU_element_hash (&ee->element,
+                            &ee->element_hash);
+  if (GNUNET_NO ==
+      GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
+                                            &ee->element_hash,
+                                            NULL))
+  {
+    /* We got something we didn't demand, since it's not in our map. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total++;
+
+  ke = op_get_element (op, &ee->element_hash);
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh++;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    send_client_element (op,
+                         &ee->element,
+                         GNUNET_SETU_STATUS_ADD_LOCAL);
+  }
+
+  if ((op->state->received_total > 8) &&
+      (op->state->received_fresh < op->state->received_total / 3))
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  GNUNET_CADET_receive_done (op->channel);
+  maybe_finish (op);
+}
+
+
+/**
+ * Check a full element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static int
+check_union_p2p_full_element (void *cls,
+                              const struct GNUNET_SETU_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+
+  (void) op;
+  // FIXME: check that we expect full elements here?
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static void
+handle_union_p2p_full_element (void *cls,
+                               const struct GNUNET_SETU_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
+
+  element_size = ntohs (emsg->header.size)
+                 - sizeof(struct GNUNET_SETU_ElementMessage);
+  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SETU_element_hash (&ee->element, &ee->element_hash);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (full diff, size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total++;
+
+  ke = op_get_element (op, &ee->element_hash);
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh++;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    send_client_element (op,
+                         &ee->element,
+                         GNUNET_SETU_STATUS_ADD_LOCAL);
+  }
+
+  if ((GNUNET_YES == op->byzantine) &&
+      (op->state->received_total > 384 + op->state->received_fresh * 4) &&
+      (op->state->received_fresh < op->state->received_total / 6))
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+         (unsigned long long) op->state->received_fresh,
+         (unsigned long long) op->state->received_total);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static int
+check_union_p2p_inquiry (void *cls,
+                         const struct InquiryMessage *msg)
+{
+  struct Operation *op = cls;
+  unsigned int num_keys;
+
+  if (op->state->phase != PHASE_INVENTORY_PASSIVE)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+             / sizeof(struct IBF_Key);
+  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+      != num_keys * sizeof(struct IBF_Key))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static void
+handle_union_p2p_inquiry (void *cls,
+                          const struct InquiryMessage *msg)
+{
+  struct Operation *op = cls;
+  const struct IBF_Key *ibf_key;
+  unsigned int num_keys;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received union inquiry\n");
+  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+             / sizeof(struct IBF_Key);
+  ibf_key = (const struct IBF_Key *) &msg[1];
+  while (0 != num_keys--)
+  {
+    struct IBF_Key unsalted_key;
+
+    unsalt_key (ibf_key,
+                ntohl (msg->salt),
+                &unsalted_key);
+    send_offers_for_key (op,
+                         unsalted_key);
+    ibf_key++;
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+send_missing_full_elements_iter (void *cls,
+                                 uint32_t key,
+                                 void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SETU_ElementMessage *emsg;
+  struct ElementEntry *ee = ke->element;
+
+  if (GNUNET_YES == ke->received)
+    return GNUNET_YES;
+  ev = GNUNET_MQ_msg_extra (emsg,
+                            ee->element.size,
+                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  GNUNET_memcpy (&emsg[1],
+                 ee->element.data,
+                 ee->element.size);
+  emsg->element_type = htons (ee->element.element_type);
+  GNUNET_MQ_send (op->mq,
+                  ev);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Handle a request for full set transmission.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_request_full (void *cls,
+                               const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received request for full set transmission\n");
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  // FIXME: we need to check that our set is larger than the
+  // byzantine_lower_bound by some threshold
+  send_full_set (op);
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_full_done (void *cls,
+                            const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  switch (op->state->phase)
+  {
+  case PHASE_EXPECT_IBF:
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, sending elements that other peer is missing\n");
+
+      /* send all the elements that did not come from the remote peer */
+      GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                               
&send_missing_full_elements_iter,
+                                               op);
+
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+      GNUNET_MQ_send (op->mq,
+                      ev);
+      op->state->phase = PHASE_DONE;
+      /* we now wait until the other peer sends us the OVER message*/
+    }
+    break;
+
+  case PHASE_FULL_SENDING:
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, finishing\n");
+      /* We sent the full set, and got the response for that.  We're done. */
+      op->state->phase = PHASE_DONE;
+      GNUNET_CADET_receive_done (op->channel);
+      send_client_done (op);
+      _GSS_operation_destroy2 (op);
+      return;
+    }
+    break;
+
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Handle full done phase is %u\n",
+                (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+static int
+check_union_p2p_demand (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  unsigned int num_hashes;
+
+  (void) op;
+  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+               / sizeof(struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+      != num_hashes * sizeof(struct GNUNET_HashCode))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_demand (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct GNUNET_SETU_ElementMessage *emsg;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+  struct GNUNET_MQ_Envelope *ev;
+
+  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+               / sizeof(struct GNUNET_HashCode);
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+                                            hash);
+    if (NULL == ee)
+    {
+      /* Demand for non-existing element. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+    {
+      /* Probably confused lazily copied sets. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
+                              GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+    GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+    emsg->reserved = htons (0);
+    emsg->element_type = htons (ee->element.element_type);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
+         (void *) op,
+         (unsigned int) ee->element.size,
+         GNUNET_h2s (&ee->element_hash));
+    GNUNET_MQ_send (op->mq, ev);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# exchanged elements",
+                              1,
+                              GNUNET_NO);
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check offer (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+static int
+check_union_p2p_offer (void *cls,
+                       const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  unsigned int num_hashes;
+
+  /* look up elements and send them */
+  if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+      (op->state->phase != PHASE_INVENTORY_ACTIVE))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+               / sizeof(struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
+      num_hashes * sizeof(struct GNUNET_HashCode))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_offer (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+
+  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+               / sizeof(struct GNUNET_HashCode);
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    struct ElementEntry *ee;
+    struct GNUNET_MessageHeader *demands;
+    struct GNUNET_MQ_Envelope *ev;
+
+    ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+                                            hash);
+    if (NULL != ee)
+      if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
+        continue;
+
+    if (GNUNET_YES ==
+        GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
+                                                hash))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Skipped sending duplicate demand\n");
+      continue;
+    }
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (
+                     op->state->demanded_hashes,
+                     hash,
+                     NULL,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Requesting element (hash %s)\n",
+         (void *) op, GNUNET_h2s (hash));
+    ev = GNUNET_MQ_msg_header_extra (demands,
+                                     sizeof(struct GNUNET_HashCode),
+                                     GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
+    GNUNET_memcpy (&demands[1],
+                   hash,
+                   sizeof(struct GNUNET_HashCode));
+    GNUNET_MQ_send (op->mq, ev);
+  }
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Handle a done message from a remote peer
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_done (void *cls,
+                       const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  switch (op->state->phase)
+  {
+  case PHASE_INVENTORY_PASSIVE:
+    /* We got all requests, but still have to send our elements in response. */
+    op->state->phase = PHASE_FINISH_WAITING;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "got DONE (as passive partner), waiting for our demands to be 
satisfied\n");
+    /* The active peer is done sending offers
+     * and inquiries.  This means that all
+     * our responses to that (demands and offers)
+     * must be in flight (queued or in mesh).
+     *
+     * We should notify the active peer once
+     * all our demands are satisfied, so that the active
+     * peer can quit if we gave it everything.
+     */GNUNET_CADET_receive_done (op->channel);
+    maybe_finish (op);
+    return;
+
+  case PHASE_INVENTORY_ACTIVE:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "got DONE (as active partner), waiting to finish\n");
+    /* All demands of the other peer are satisfied,
+     * and we processed all offers, thus we know
+     * exactly what our demands must be.
+     *
+     * We'll close the channel
+     * to the other peer once our demands are met.
+     */op->state->phase = PHASE_FINISH_CLOSING;
+    GNUNET_CADET_receive_done (op->channel);
+    maybe_finish (op);
+    return;
+
+  default:
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+
+/**
+ * Handle a over message from a remote peer
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_over (void *cls,
+                       const struct GNUNET_MessageHeader *mh)
+{
+  send_client_done (cls);
+}
+
+
+/**
+ * Initiate operation to evaluate a set union with a remote peer.
+ *
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ *        to convince it to accept, may be NULL
+ */
+static struct OperationState *
+union_evaluate (struct Operation *op,
+                const struct GNUNET_MessageHeader *opaque_context)
+{
+  struct OperationState *state;
+  struct GNUNET_MQ_Envelope *ev;
+  struct OperationRequestMessage *msg;
+
+  ev = GNUNET_MQ_msg_nested_mh (msg,
+                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                                opaque_context);
+  if (NULL == ev)
+  {
+    /* the context message is too large */
+    GNUNET_break (0);
+    return NULL;
+  }
+  state = GNUNET_new (struct OperationState);
+  state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
+                                                                 GNUNET_NO);
+  /* copy the current generation's strata estimator for this operation */
+  state->se = strata_estimator_dup (op->set->state->se);
+  /* we started the operation, thus we have to send the operation request */
+  state->phase = PHASE_EXPECT_SE;
+  state->salt_receive = state->salt_send = 42; // FIXME?????
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Initiating union operation evaluation\n");
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of initiated union operations",
+                            1,
+                            GNUNET_NO);
+  GNUNET_MQ_send (op->mq,
+                  ev);
+
+  if (NULL != opaque_context)
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "sent op request with context message\n");
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "sent op request without context message\n");
+
+  op->state = state;
+  initialize_key_to_element (op);
+  state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
+    state->key_to_element);
+  return state;
+}
+
+
+/**
+ * Get the incoming socket associated with the given id.
+ *
+ * @param listener the listener to look in
+ * @param id id to look for
+ * @return the incoming socket associated with the id,
+ *         or NULL if there is none
+ */
+static struct Operation *
+get_incoming (uint32_t id)
+{
+  for (struct Listener *listener = listener_head;
+       NULL != listener;
+       listener = listener->next)
+  {
+    for (struct Operation *op = listener->op_head;
+         NULL != op;
+         op = op->next)
+      if (op->suggest_id == id)
+        return op;
+  }
+  return NULL;
+}
+
+
+/**
+ * Destroy an incoming request from a remote peer
+ *
+ * @param op remote request to destroy
+ */
+static void
+incoming_destroy (struct Operation *op)
+{
+  struct Listener *listener;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Destroying incoming operation %p\n",
+              op);
+  if (NULL != (listener = op->listener))
+  {
+    GNUNET_CONTAINER_DLL_remove (listener->op_head,
+                                 listener->op_tail,
+                                 op);
+    op->listener = NULL;
+  }
+  if (NULL != op->timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (op->timeout_task);
+    op->timeout_task = NULL;
+  }
+  _GSS_operation_destroy2 (op);
+}
+
+
+/**
+ * Is element @a ee part of the set used by @a op?
+ *
+ * @param ee element to test
+ * @param op operation the defines the set and its generation
+ * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
+ */
+int
+_GSS_is_element_of_operation (struct ElementEntry *ee,
+                              struct Operation *op)
+{
+  return ee->generation >= op->generation_created;
+}
+
+
+/**
+ * Destroy the given operation.  Used for any operation where both
+ * peers were known and that thus actually had a vt and channel.  Must
+ * not be used for operations where 'listener' is still set and we do
+ * not know the other peer.
+ *
+ * Call the implementation-specific cancel function of the operation.
+ * Disconnects from the remote peer.  Does not disconnect the client,
+ * as there may be multiple operations per set.
+ *
+ * @param op operation to destroy
+ */
+void
+_GSS_operation_destroy (struct Operation *op)
+{
+  struct Set *set = op->set;
+  struct GNUNET_CADET_Channel *channel;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Destroying operation %p\n", op);
+  GNUNET_assert (NULL == op->listener);
+  if (NULL != op->state)
+  {
+    union_op_cancel (op);
+    op->state = NULL;
+  }
+  if (NULL != set)
+  {
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 op);
+    op->set = NULL;
+  }
+  if (NULL != op->context_msg)
+  {
+    GNUNET_free (op->context_msg);
+    op->context_msg = NULL;
+  }
+  if (NULL != (channel = op->channel))
+  {
+    /* This will free op; called conditionally as this helper function
+       is also called from within the channel disconnect handler. */
+    op->channel = NULL;
+    GNUNET_CADET_channel_destroy (channel);
+  }
+  /* We rely on the channel end handler to free 'op'. When 'op->channel' was 
NULL,
+   * there was a channel end handler that will free 'op' on the call stack. */
+}
+
+
+/**
+ * Callback called when a client connects to the service.
+ *
+ * @param cls closure for the service
+ * @param c the new client that connected to the service
+ * @param mq the message queue used to send messages to the client
+ * @return @a `struct ClientState`
+ */
+static void *
+client_connect_cb (void *cls,
+                   struct GNUNET_SERVICE_Client *c,
+                   struct GNUNET_MQ_Handle *mq)
+{
+  struct ClientState *cs;
+
+  num_clients++;
+  cs = GNUNET_new (struct ClientState);
+  cs->client = c;
+  cs->mq = mq;
+  return cs;
+}
+
+
+/**
+ * Iterator over hash map entries to free element entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value a `struct ElementEntry *` to be free'd
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+destroy_elements_iterator (void *cls,
+                           const struct GNUNET_HashCode *key,
+                           void *value)
+{
+  struct ElementEntry *ee = value;
+
+  GNUNET_free (ee);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Clean up after a client has disconnected
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ * @param internal_cls the `struct ClientState`
+ */
+static void
+client_disconnect_cb (void *cls,
+                      struct GNUNET_SERVICE_Client *client,
+                      void *internal_cls)
+{
+  struct ClientState *cs = internal_cls;
+  struct Operation *op;
+  struct Listener *listener;
+  struct Set *set;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Client disconnected, cleaning up\n");
+  if (NULL != (set = cs->set))
+  {
+    struct SetContent *content = set->content;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Destroying client's set\n");
+    /* Destroy pending set operations */
+    while (NULL != set->ops_head)
+      _GSS_operation_destroy (set->ops_head);
+
+    /* Destroy operation-specific state */
+    GNUNET_assert (NULL != set->state);
+    if (NULL != set->state->se)
+    {
+      strata_estimator_destroy (set->state->se);
+      set->state->se = NULL;
+    }
+    GNUNET_free (set->state);
+
+    /* free set content (or at least decrement RC) */
+    set->content = NULL;
+    GNUNET_assert (0 != content->refcount);
+    content->refcount--;
+    if (0 == content->refcount)
+    {
+      GNUNET_assert (NULL != content->elements);
+      GNUNET_CONTAINER_multihashmap_iterate (content->elements,
+                                             &destroy_elements_iterator,
+                                             NULL);
+      GNUNET_CONTAINER_multihashmap_destroy (content->elements);
+      content->elements = NULL;
+      GNUNET_free (content);
+    }
+    GNUNET_free (set);
+  }
+
+  if (NULL != (listener = cs->listener))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Destroying client's listener\n");
+    GNUNET_CADET_close_port (listener->open_port);
+    listener->open_port = NULL;
+    while (NULL != (op = listener->op_head))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Destroying incoming operation `%u' from peer `%s'\n",
+                  (unsigned int) op->client_request_id,
+                  GNUNET_i2s (&op->peer));
+      incoming_destroy (op);
+    }
+    GNUNET_CONTAINER_DLL_remove (listener_head,
+                                 listener_tail,
+                                 listener);
+    GNUNET_free (listener);
+  }
+  GNUNET_free (cs);
+  num_clients--;
+  if ( (GNUNET_YES == in_shutdown) &&
+       (0 == num_clients) )
+  {
+    if (NULL != cadet)
+    {
+      GNUNET_CADET_disconnect (cadet);
+      cadet = NULL;
+    }
+  }
+}
+
+
+/**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ *         #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+                    const struct OperationRequestMessage *msg)
+{
+  struct Operation *op = cls;
+  struct Listener *listener = op->listener;
+  const struct GNUNET_MessageHeader *nested_context;
+
+  /* double operation request */
+  if (0 != op->suggest_id)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  /* This should be equivalent to the previous condition, but can't hurt to 
check twice */
+  if (NULL == listener)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  nested_context = GNUNET_MQ_extract_nested_mh (msg);
+  if ((NULL != nested_context) &&
+      (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a request for a set operation from another peer.  Checks if we
+ * have a listener waiting for such a request (and in that case initiates
+ * asking the listener about accepting the connection). If no listener
+ * is waiting, we queue the operation request in hope that a listener
+ * shows up soon (before timeout).
+ *
+ * This msg is expected as the first and only msg handled through the
+ * non-operation bound virtual table, acceptance of this operation replaces
+ * our virtual table and subsequent msgs would be routed differently (as
+ * we then know what type of operation this is).
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ *         #GNUNET_SYSERR to destroy the channel
+ */
+static void
+handle_incoming_msg (void *cls,
+                     const struct OperationRequestMessage *msg)
+{
+  struct Operation *op = cls;
+  struct Listener *listener = op->listener;
+  const struct GNUNET_MessageHeader *nested_context;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_SETU_RequestMessage *cmsg;
+
+  nested_context = GNUNET_MQ_extract_nested_mh (msg);
+  /* Make a copy of the nested_context (application-specific context
+     information that is opaque to set) so we can pass it to the
+     listener later on */
+  if (NULL != nested_context)
+    op->context_msg = GNUNET_copy_message (nested_context);
+  op->remote_element_count = ntohl (msg->element_count);
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Received P2P operation request (port %s) for active listener\n",
+    GNUNET_h2s (&op->listener->app_id));
+  GNUNET_assert (0 == op->suggest_id);
+  if (0 == suggest_id)
+    suggest_id++;
+  op->suggest_id = suggest_id++;
+  GNUNET_assert (NULL != op->timeout_task);
+  GNUNET_SCHEDULER_cancel (op->timeout_task);
+  op->timeout_task = NULL;
+  env = GNUNET_MQ_msg_nested_mh (cmsg,
+                                 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
+                                 op->context_msg);
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Suggesting incoming request with accept id %u to listener %p of client 
%p\n",
+    op->suggest_id,
+    listener,
+    listener->cs);
+  cmsg->accept_id = htonl (op->suggest_id);
+  cmsg->peer_id = op->peer;
+  GNUNET_MQ_send (listener->cs->mq,
+                  env);
+  /* NOTE: GNUNET_CADET_receive_done() will be called in
+   #handle_client_accept() */
+}
+
+
+/**
+ * Called when a client wants to create a new set.  This is typically
+ * the first request from a client, and includes the type of set
+ * operation to be performed.
+ *
+ * @param cls client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_create_set (void *cls,
+                          const struct GNUNET_SETU_CreateMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Set *set;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Client created new set for union operation\n");
+  if (NULL != cs->set)
+  {
+    /* There can only be one set per client */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  set = GNUNET_new (struct Set);
+  {
+    struct SetState *set_state;
+
+    set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, 
merge structs!
+    set_state->se = strata_estimator_create (SE_STRATA_COUNT,
+                                             SE_IBF_SIZE, SE_IBF_HASH_NUM);
+    if (NULL == set_state->se)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to allocate strata estimator\n");
+      GNUNET_free (set);
+      GNUNET_SERVICE_client_drop (cs->client);
+      return;
+    }
+    set->state = set_state;
+  }
+  set->content = GNUNET_new (struct SetContent);
+  set->content->refcount = 1;
+  set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
+                                                                 GNUNET_YES);
+  set->cs = cs;
+  cs->set = set;
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Timeout happens iff:
+ *  - we suggested an operation to our listener,
+ *    but did not receive a response in time
+ *  - we got the channel from a peer but no 
#GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ *
+ * @param cls channel context
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls)
+{
+  struct Operation *op = cls;
+
+  op->timeout_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Remote peer's incoming request timed out\n");
+  incoming_destroy (op);
+}
+
+
+/**
+ * Method called whenever another peer has added us to a channel the
+ * other peer initiated.  Only called (once) upon reception of data
+ * from a channel we listen on.
+ *
+ * The channel context represents the operation itself and gets added
+ * to a DLL, from where it gets looked up when our local listener
+ * client responds to a proposed/suggested operation or connects and
+ * associates with this operation.
+ *
+ * @param cls closure
+ * @param channel new handle to the channel
+ * @param source peer that started the channel
+ * @return initial channel context for the channel
+ *         returns NULL on error
+ */
+static void *
+channel_new_cb (void *cls,
+                struct GNUNET_CADET_Channel *channel,
+                const struct GNUNET_PeerIdentity *source)
+{
+  struct Listener *listener = cls;
+  struct Operation *op;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "New incoming channel\n");
+  op = GNUNET_new (struct Operation);
+  op->listener = listener;
+  op->peer = *source;
+  op->channel = channel;
+  op->mq = GNUNET_CADET_get_mq (op->channel);
+  op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+                                       UINT32_MAX);
+  op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
+                                                   &incoming_timeout_cb,
+                                                   op);
+  GNUNET_CONTAINER_DLL_insert (listener->op_head,
+                               listener->op_tail,
+                               op);
+  return op;
+}
+
+
+/**
+ * Function called whenever a channel is destroyed.  Should clean up
+ * any associated state.  It must NOT call
+ * GNUNET_CADET_channel_destroy() on the channel.
+ *
+ * The peer_disconnect function is part of a a virtual table set initially 
either
+ * when a peer creates a new channel with us, or once we create
+ * a new channel ourselves (evaluate).
+ *
+ * Once we know the exact type of operation (union/intersection), the vt is
+ * replaced with an operation specific instance (_GSS_[op]_vt).
+ *
+ * @param channel_ctx place where local state associated
+ *                   with the channel is stored
+ * @param channel connection to the other end (henceforth invalid)
+ */
+static void
+channel_end_cb (void *channel_ctx,
+                const struct GNUNET_CADET_Channel *channel)
+{
+  struct Operation *op = channel_ctx;
+
+  op->channel = NULL;
+  _GSS_operation_destroy2 (op);
+}
+
+
+/**
+ * This function probably should not exist
+ * and be replaced by inlining more specific
+ * logic in the various places where it is called.
+ */
+void
+_GSS_operation_destroy2 (struct Operation *op)
+{
+  struct GNUNET_CADET_Channel *channel;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "channel_end_cb called\n");
+  if (NULL != (channel = op->channel))
+  {
+    /* This will free op; called conditionally as this helper function
+       is also called from within the channel disconnect handler. */
+    op->channel = NULL;
+    GNUNET_CADET_channel_destroy (channel);
+  }
+  if (NULL != op->listener)
+  {
+    incoming_destroy (op);
+    return;
+  }
+  if (NULL != op->set)
+    send_client_done (op);
+  _GSS_operation_destroy (op);
+  GNUNET_free (op);
+}
+
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ *                    this value will be negative..
+ */
+static void
+channel_window_cb (void *cls,
+                   const struct GNUNET_CADET_Channel *channel,
+                   int window_size)
+{
+  /* FIXME: not implemented, we could do flow control here... */
+}
+
+
+/**
+ * Called when a client wants to create a new listener.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_listen (void *cls,
+                      const struct GNUNET_SETU_ListenMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+    GNUNET_MQ_hd_var_size (incoming_msg,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
+                           struct OperationRequestMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
+                           struct IBFMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
+                           struct GNUNET_SETU_ElementMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
+                           struct InquiryMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_over,
+                             GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
+                           struct StrataEstimatorMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
+                           struct StrataEstimatorMessage,
+                           NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
+                           GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
+                           struct GNUNET_SETU_ElementMessage,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  struct Listener *listener;
+
+  if (NULL != cs->listener)
+  {
+    /* max. one active listener per client! */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  listener = GNUNET_new (struct Listener);
+  listener->cs = cs;
+  cs->listener = listener;
+  listener->app_id = msg->app_id;
+  GNUNET_CONTAINER_DLL_insert (listener_head,
+                               listener_tail,
+                               listener);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "New listener created (port %s)\n",
+              GNUNET_h2s (&listener->app_id));
+  listener->open_port = GNUNET_CADET_open_port (cadet,
+                                                &msg->app_id,
+                                                &channel_new_cb,
+                                                listener,
+                                                &channel_window_cb,
+                                                &channel_end_cb,
+                                                cadet_handlers);
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called when the listening client rejects an operation
+ * request by another peer.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_reject (void *cls,
+                      const struct GNUNET_SETU_RejectMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Operation *op;
+
+  op = get_incoming (ntohl (msg->accept_reject_id));
+  if (NULL == op)
+  {
+    /* no matching incoming operation for this reject;
+       could be that the other peer already disconnected... */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Client rejected unknown operation %u\n",
+                (unsigned int) ntohl (msg->accept_reject_id));
+    GNUNET_SERVICE_client_continue (cs->client);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer request (app %s) rejected by client\n",
+              GNUNET_h2s (&cs->listener->app_id));
+  _GSS_operation_destroy2 (op);
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called when a client wants to add or remove an element to a set it inhabits.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static int
+check_client_set_add (void *cls,
+                      const struct GNUNET_SETU_ElementMessage *msg)
+{
+  /* NOTE: Technically, we should probably check with the
+     block library whether the element we are given is well-formed */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Called when a client wants to add or remove an element to a set it inhabits.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_set_add (void *cls,
+                       const struct GNUNET_SETU_ElementMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Set *set;
+  struct GNUNET_SETU_Element el;
+  struct ElementEntry *ee;
+  struct GNUNET_HashCode hash;
+
+  if (NULL == (set = cs->set))
+  {
+    /* client without a set requested an operation */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  GNUNET_SERVICE_client_continue (cs->client);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
+  el.size = ntohs (msg->header.size) - sizeof(*msg);
+  el.data = &msg[1];
+  el.element_type = ntohs (msg->element_type);
+  GNUNET_SETU_element_hash (&el,
+                            &hash);
+  ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+                                          &hash);
+  if (NULL == ee)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserts element %s of size %u\n",
+                GNUNET_h2s (&hash),
+                el.size);
+    ee = GNUNET_malloc (el.size + sizeof(*ee));
+    ee->element.size = el.size;
+    GNUNET_memcpy (&ee[1], el.data, el.size);
+    ee->element.data = &ee[1];
+    ee->element.element_type = el.element_type;
+    ee->remote = GNUNET_NO;
+    ee->generation = set->current_generation;
+    ee->element_hash = hash;
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multihashmap_put (
+                    set->content->elements,
+                    &ee->element_hash,
+                    ee,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client inserted element %s of size %u twice (ignored)\n",
+                GNUNET_h2s (&hash),
+                el.size);
+    /* same element inserted twice */
+    return;
+  }
+  strata_estimator_insert (set->state->se,
+                           get_ibf_key (&ee->element_hash));
+}
+
+
+/**
+ * Advance the current generation of a set,
+ * adding exclusion ranges if necessary.
+ *
+ * @param set the set where we want to advance the generation
+ */
+static void
+advance_generation (struct Set *set)
+{
+  set->content->latest_generation++;
+  set->current_generation++;
+}
+
+
+/**
+ * Called when a client wants to initiate a set operation with another
+ * peer.  Initiates the CADET connection to the listener and sends the
+ * request.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_client_evaluate (void *cls,
+                       const struct GNUNET_SETU_EvaluateMessage *msg)
+{
+  /* FIXME: suboptimal, even if the context below could be NULL,
+     there are malformed messages this does not check for... */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Called when a client wants to initiate a set operation with another
+ * peer.  Initiates the CADET connection to the listener and sends the
+ * request.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_evaluate (void *cls,
+                        const struct GNUNET_SETU_EvaluateMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Operation *op = GNUNET_new (struct Operation);
+  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+    GNUNET_MQ_hd_var_size (incoming_msg,
+                           GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                           struct OperationRequestMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
+                           struct IBFMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
+                           GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
+                           struct GNUNET_SETU_ElementMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
+                           struct InquiryMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_over,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
+                           struct StrataEstimatorMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
+                           struct StrataEstimatorMessage,
+                           op),
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_SETU_ElementMessage,
+                           op),
+    GNUNET_MQ_handler_end ()
+  };
+  struct Set *set;
+  const struct GNUNET_MessageHeader *context;
+
+  if (NULL == (set = cs->set))
+  {
+    GNUNET_break (0);
+    GNUNET_free (op);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+                                       UINT32_MAX);
+  op->peer = msg->target_peer;
+  op->client_request_id = ntohl (msg->request_id);
+  op->byzantine = msg->byzantine;
+  op->byzantine_lower_bound = msg->byzantine_lower_bound;
+  op->force_full = msg->force_full;
+  op->force_delta = msg->force_delta;
+  context = GNUNET_MQ_extract_nested_mh (msg);
+
+  /* Advance generation values, so that
+     mutations won't interfer with the running operation. */
+  op->set = set;
+  op->generation_created = set->current_generation;
+  advance_generation (set);
+  GNUNET_CONTAINER_DLL_insert (set->ops_head,
+                               set->ops_tail,
+                               op);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating new CADET channel to port %s for set union\n",
+              GNUNET_h2s (&msg->app_id));
+  op->channel = GNUNET_CADET_channel_create (cadet,
+                                             op,
+                                             &msg->target_peer,
+                                             &msg->app_id,
+                                             &channel_window_cb,
+                                             &channel_end_cb,
+                                             cadet_handlers);
+  op->mq = GNUNET_CADET_get_mq (op->channel);
+  op->state = union_evaluate (op, context);
+  if (NULL == op->state)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Handle a request from the client to cancel a running set operation.
+ *
+ * @param cls the client
+ * @param msg the message
+ */
+static void
+handle_client_cancel (void *cls,
+                      const struct GNUNET_SETU_CancelMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Set *set;
+  struct Operation *op;
+  int found;
+
+  if (NULL == (set = cs->set))
+  {
+    /* client without a set requested an operation */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  found = GNUNET_NO;
+  for (op = set->ops_head; NULL != op; op = op->next)
+  {
+    if (op->client_request_id == ntohl (msg->request_id))
+    {
+      found = GNUNET_YES;
+      break;
+    }
+  }
+  if (GNUNET_NO == found)
+  {
+    /* It may happen that the operation was already destroyed due to
+     * the other peer disconnecting.  The client may not know about this
+     * yet and try to cancel the (just barely non-existent) operation.
+     * So this is not a hard error.
+     *///
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Client canceled non-existent op %u\n",
+                (uint32_t) ntohl (msg->request_id));
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client requested cancel for op %u\n",
+                (uint32_t) ntohl (msg->request_id));
+    _GSS_operation_destroy (op);
+  }
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Handle a request from the client to accept a set operation that
+ * came from a remote peer.  We forward the accept to the associated
+ * operation for handling
+ *
+ * @param cls the client
+ * @param msg the message
+ */
+static void
+handle_client_accept (void *cls,
+                      const struct GNUNET_SETU_AcceptMessage *msg)
+{
+  struct ClientState *cs = cls;
+  struct Set *set;
+  struct Operation *op;
+  struct GNUNET_SETU_ResultMessage *result_message;
+  struct GNUNET_MQ_Envelope *ev;
+  struct Listener *listener;
+
+  if (NULL == (set = cs->set))
+  {
+    /* client without a set requested to accept */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (cs->client);
+    return;
+  }
+  op = get_incoming (ntohl (msg->accept_reject_id));
+  if (NULL == op)
+  {
+    /* It is not an error if the set op does not exist -- it may
+    * have been destroyed when the partner peer disconnected. */
+    GNUNET_log (
+      GNUNET_ERROR_TYPE_INFO,
+      "Client %p accepted request %u of listener %p that is no longer 
active\n",
+      cs,
+      ntohl (msg->accept_reject_id),
+      cs->listener);
+    ev = GNUNET_MQ_msg (result_message,
+                        GNUNET_MESSAGE_TYPE_SET_RESULT);
+    result_message->request_id = msg->request_id;
+    result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+    GNUNET_MQ_send (set->cs->mq, ev);
+    GNUNET_SERVICE_client_continue (cs->client);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Client accepting request %u\n",
+              (uint32_t) ntohl (msg->accept_reject_id));
+  listener = op->listener;
+  op->listener = NULL;
+  GNUNET_CONTAINER_DLL_remove (listener->op_head,
+                               listener->op_tail,
+                               op);
+  op->set = set;
+  GNUNET_CONTAINER_DLL_insert (set->ops_head,
+                               set->ops_tail,
+                               op);
+  op->client_request_id = ntohl (msg->request_id);
+  op->byzantine = msg->byzantine;
+  op->byzantine_lower_bound = msg->byzantine_lower_bound;
+  op->force_full = msg->force_full;
+  op->force_delta = msg->force_delta;
+
+  /* Advance generation values, so that future mutations do not
+     interfer with the running operation. */
+  op->generation_created = set->current_generation;
+  advance_generation (set);
+  GNUNET_assert (NULL == op->state);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "accepting set union operation\n");
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of accepted union operations",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
+  {
+    struct OperationState *state;
+    const struct StrataEstimator *se;
+    struct GNUNET_MQ_Envelope *ev;
+    struct StrataEstimatorMessage *strata_msg;
+    char *buf;
+    size_t len;
+    uint16_t type;
+
+    state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to 
avoid malloc!
+    state->se = strata_estimator_dup (op->set->state->se);
+    state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
+                                                                   GNUNET_NO);
+    state->salt_receive = state->salt_send = 42; // FIXME?????
+    op->state = state;
+    initialize_key_to_element (op);
+    state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
+      state->key_to_element);
+
+    /* kick off the operation */
+    se = state->se;
+    buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
+    len = strata_estimator_write (se,
+                                  buf);
+    if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
+      type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
+    else
+      type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
+    ev = GNUNET_MQ_msg_extra (strata_msg,
+                              len,
+                              type);
+    GNUNET_memcpy (&strata_msg[1],
+                   buf,
+                   len);
+    GNUNET_free (buf);
+    strata_msg->set_size
+      = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
+                         op->set->content->elements));
+    GNUNET_MQ_send (op->mq,
+                    ev);
+    state->phase = PHASE_EXPECT_IBF;
+
+    op->state = state;
+  }
+  /* Now allow CADET to continue, as we did not do this in
+   #handle_incoming_msg (as we wanted to first see if the
+     local client would accept the request). */
+  GNUNET_CADET_receive_done (op->channel);
+  GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure, NULL
+ */
+static void
+shutdown_task (void *cls)
+{
+  /* Delay actual shutdown to allow service to disconnect clients */
+  in_shutdown = GNUNET_YES;
+  if (0 == num_clients)
+  {
+    if (NULL != cadet)
+    {
+      GNUNET_CADET_disconnect (cadet);
+      cadet = NULL;
+    }
+  }
+  GNUNET_STATISTICS_destroy (_GSS_statistics,
+                             GNUNET_YES);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "handled shutdown request\n");
+}
+
+
+/**
+ * Function called by the service's run
+ * method to run service-specific setup code.
+ *
+ * @param cls closure
+ * @param cfg configuration to use
+ * @param service the initialized service
+ */
+static void
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
+     struct GNUNET_SERVICE_Handle *service)
+{
+  /* FIXME: need to modify SERVICE (!) API to allow
+     us to run a shutdown task *after* clients were
+     forcefully disconnected! */
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                 NULL);
+  _GSS_statistics = GNUNET_STATISTICS_create ("setu", cfg);
+  cadet = GNUNET_CADET_connect (cfg);
+  if (NULL == cadet)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _ ("Could not connect to CADET service\n"));
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+}
+
+
+/**
+ * Define "main" method using service macro.
+ */
+GNUNET_SERVICE_MAIN (
+  "set",
+  GNUNET_SERVICE_OPTION_NONE,
+  &run,
+  &client_connect_cb,
+  &client_disconnect_cb,
+  NULL,
+  GNUNET_MQ_hd_fixed_size (client_accept,
+                           GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
+                           struct GNUNET_SETU_AcceptMessage,
+                           NULL),
+  GNUNET_MQ_hd_var_size (client_set_add,
+                         GNUNET_MESSAGE_TYPE_SETU_ADD,
+                         struct GNUNET_SETU_ElementMessage,
+                         NULL),
+  GNUNET_MQ_hd_fixed_size (client_create_set,
+                           GNUNET_MESSAGE_TYPE_SETU_CREATE,
+                           struct GNUNET_SETU_CreateMessage,
+                           NULL),
+  GNUNET_MQ_hd_var_size (client_evaluate,
+                         GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
+                         struct GNUNET_SETU_EvaluateMessage,
+                         NULL),
+  GNUNET_MQ_hd_fixed_size (client_listen,
+                           GNUNET_MESSAGE_TYPE_SETU_LISTEN,
+                           struct GNUNET_SETU_ListenMessage,
+                           NULL),
+  GNUNET_MQ_hd_fixed_size (client_reject,
+                           GNUNET_MESSAGE_TYPE_SETU_REJECT,
+                           struct GNUNET_SETU_RejectMessage,
+                           NULL),
+  GNUNET_MQ_hd_fixed_size (client_cancel,
+                           GNUNET_MESSAGE_TYPE_SETU_CANCEL,
+                           struct GNUNET_SETU_CancelMessage,
+                           NULL),
+  GNUNET_MQ_handler_end ());
+
+
+/* end of gnunet-service-setu.c */
diff --git a/src/setu/gnunet-service-setu.h b/src/setu/gnunet-service-setu.h
new file mode 100644
index 000000000..eb6b7a8e5
--- /dev/null
+++ b/src/setu/gnunet-service-setu.h
@@ -0,0 +1,393 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2013-2017 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/gnunet-service-setu.h
+ * @brief common components for the implementation the different set operations
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_SETU_H_PRIVATE
+#define GNUNET_SERVICE_SETU_H_PRIVATE
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet_core_service.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_setu_service.h"
+#include "setu.h"
+
+
+/**
+ * Implementation-specific set state.  Used as opaque pointer, and
+ * specified further in the respective implementation.
+ */
+struct SetState;
+
+/**
+ * Implementation-specific set operation.  Used as opaque pointer, and
+ * specified further in the respective implementation.
+ */
+struct OperationState;
+
+/**
+ * A set that supports a specific operation with other peers.
+ */
+struct Set;
+
+/**
+ * Information about an element element in the set.  All elements are
+ * stored in a hash-table from their hash-code to their 'struct
+ * Element', so that the remove and add operations are reasonably
+ * fast.
+ */
+struct ElementEntry;
+
+/**
+ * Operation context used to execute a set operation.
+ */
+struct Operation;
+
+
+/**
+ * Information about an element element in the set.  All elements are
+ * stored in a hash-table from their hash-code to their `struct
+ * Element`, so that the remove and add operations are reasonably
+ * fast.
+ */
+struct ElementEntry
+{
+  /**
+   * The actual element. The data for the element
+   * should be allocated at the end of this struct.
+   */
+  struct GNUNET_SETU_Element element;
+
+  /**
+   * Hash of the element.  For set union: Will be used to derive the
+   * different IBF keys for different salts.
+   */
+  struct GNUNET_HashCode element_hash;
+
+  /**
+   * First generation that includes this element.
+   */
+  unsigned int generation;
+
+  /**
+   * #GNUNET_YES if the element is a remote element, and does not belong
+   * to the operation's set.
+   */
+  int remote;
+};
+
+
+/**
+ * A listener is inhabited by a client, and waits for evaluation
+ * requests from remote peers.
+ */
+struct Listener;
+
+
+/**
+ * State we keep per client.
+ */
+struct ClientState
+{
+  /**
+   * Set, if associated with the client, otherwise NULL.
+   */
+  struct Set *set;
+
+  /**
+   * Listener, if associated with the client, otherwise NULL.
+   */
+  struct Listener *listener;
+
+  /**
+   * Client handle.
+   */
+  struct GNUNET_SERVICE_Client *client;
+
+  /**
+   * Message queue.
+   */
+  struct GNUNET_MQ_Handle *mq;
+};
+
+
+/**
+ * Operation context used to execute a set operation.
+ */
+struct Operation
+{
+  /**
+   * Kept in a DLL of the listener, if @e listener is non-NULL.
+   */
+  struct Operation *next;
+
+  /**
+   * Kept in a DLL of the listener, if @e listener is non-NULL.
+   */
+  struct Operation *prev;
+
+  /**
+   * Channel to the peer.
+   */
+  struct GNUNET_CADET_Channel *channel;
+
+  /**
+   * Port this operation runs on.
+   */
+  struct Listener *listener;
+
+  /**
+   * Message queue for the channel.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Context message, may be NULL.
+   */
+  struct GNUNET_MessageHeader *context_msg;
+
+  /**
+   * Set associated with the operation, NULL until the spec has been
+   * associated with a set.
+   */
+  struct Set *set;
+
+  /**
+   * Operation-specific operation state.  Note that the exact
+   * type depends on this being a union or intersection operation
+   * (and thus on @e vt).
+   */
+  struct OperationState *state;
+
+  /**
+   * The identity of the requesting peer.  Needs to
+   * be stored here as the op spec might not have been created yet.
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * Timeout task, if the incoming peer has not been accepted
+   * after the timeout, it will be disconnected.
+   */
+  struct GNUNET_SCHEDULER_Task *timeout_task;
+
+  /**
+   * Salt to use for the operation.
+   */
+  uint32_t salt;
+
+  /**
+   * Remote peers element count
+   */
+  uint32_t remote_element_count;
+
+  /**
+   * ID used to identify an operation between service and client
+   */
+  uint32_t client_request_id;
+
+  /**
+   * Always use delta operation instead of sending full sets,
+   * even it it's less efficient.
+   */
+  int force_delta;
+
+  /**
+   * Always send full sets, even if delta operations would
+   * be more efficient.
+   */
+  int force_full;
+
+  /**
+   * #GNUNET_YES to fail operations where Byzantine faults
+   * are suspected
+   */
+  int byzantine;
+
+  /**
+   * Lower bound for the set size, used only when
+   * byzantine mode is enabled.
+   */
+  int byzantine_lower_bound;
+
+  /**
+   * Unique request id for the request from a remote peer, sent to the
+   * client, which will accept or reject the request.  Set to '0' iff
+   * the request has not been suggested yet.
+   */
+  uint32_t suggest_id;
+
+  /**
+   * Generation in which the operation handle
+   * was created.
+   */
+  unsigned int generation_created;
+};
+
+
+/**
+ * SetContent stores the actual set elements, which may be shared by
+ * multiple generations derived from one set.
+ */
+struct SetContent
+{
+  /**
+   * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *elements;
+
+  /**
+   * Number of references to the content.
+   */
+  unsigned int refcount;
+
+  /**
+   * FIXME: document!
+   */
+  unsigned int latest_generation;
+
+  /**
+   * Number of concurrently active iterators.
+   */
+  int iterator_count;
+};
+
+
+struct GenerationRange
+{
+  /**
+   * First generation that is excluded.
+   */
+  unsigned int start;
+
+  /**
+   * Generation after the last excluded generation.
+   */
+  unsigned int end;
+};
+
+
+/**
+ * A set that supports a specific operation with other peers.
+ */
+struct Set
+{
+  /**
+   * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
+   */
+  struct Set *next;
+
+  /**
+   * Sets are held in a doubly linked list.
+   */
+  struct Set *prev;
+
+  /**
+   * Client that owns the set.  Only one client may own a set,
+   * and there can only be one set per client.
+   */
+  struct ClientState *cs;
+
+  /**
+   * Content, possibly shared by multiple sets,
+   * and thus reference counted.
+   */
+  struct SetContent *content;
+
+  /**
+   * Implementation-specific state.
+   */
+  struct SetState *state;
+
+  /**
+   * Evaluate operations are held in a linked list.
+   */
+  struct Operation *ops_head;
+
+  /**
+   * Evaluate operations are held in a linked list.
+   */
+  struct Operation *ops_tail;
+
+  /**
+   * Current generation, that is, number of previously executed
+   * operations and lazy copies on the underlying set content.
+   */
+  unsigned int current_generation;
+
+};
+
+
+extern struct GNUNET_STATISTICS_Handle *_GSS_statistics;
+
+
+/**
+ * Destroy the given operation.   Used for any operation where both
+ * peers were known and that thus actually had a vt and channel.  Must
+ * not be used for operations where 'listener' is still set and we do
+ * not know the other peer.
+ *
+ * Call the implementation-specific cancel function of the operation.
+ * Disconnects from the remote peer.  Does not disconnect the client,
+ * as there may be multiple operations per set.
+ *
+ * @param op operation to destroy
+ */
+void
+_GSS_operation_destroy (struct Operation *op);
+
+
+/**
+ * This function probably should not exist
+ * and be replaced by inlining more specific
+ * logic in the various places where it is called.
+ */
+void
+_GSS_operation_destroy2 (struct Operation *op);
+
+
+/**
+ * Get the table with implementing functions for set union.
+ *
+ * @return the operation specific VTable
+ */
+const struct SetVT *
+_GSS_union_vt (void);
+
+
+/**
+ * Is element @a ee part of the set used by @a op?
+ *
+ * @param ee element to test
+ * @param op operation the defines the set and its generation
+ * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
+ */
+int
+_GSS_is_element_of_operation (struct ElementEntry *ee,
+                              struct Operation *op);
+
+
+#endif
diff --git a/src/setu/gnunet-service-setu_protocol.h 
b/src/setu/gnunet-service-setu_protocol.h
new file mode 100644
index 000000000..a2803ee47
--- /dev/null
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -0,0 +1,226 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2013, 2014 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @author Florian Dold
+ * @author Christian Grothoff
+ * @file set/gnunet-service-set_protocol.h
+ * @brief Peer-to-Peer messages for gnunet set
+ */
+#ifndef SET_PROTOCOL_H
+#define SET_PROTOCOL_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct OperationRequestMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Operation to request, values from `enum GNUNET_SET_OperationType`
+   */
+  uint32_t operation GNUNET_PACKED;
+
+  /**
+   * For Intersection: my element count
+   */
+  uint32_t element_count GNUNET_PACKED;
+
+  /**
+   * Application-specific identifier of the request.
+   */
+  struct GNUNET_HashCode app_idX;
+
+  /* rest: optional message */
+};
+
+
+/**
+ * Message containing buckets of an invertible bloom filter.
+ *
+ * If an IBF has too many buckets for an IBF message,
+ * it is split into multiple messages.
+ */
+struct IBFMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Order of the whole ibf, where
+   * num_buckets = 2^order
+   */
+  uint8_t order;
+
+  /**
+   * Padding, must be 0.
+   */
+  uint8_t reserved1;
+
+  /**
+   * Padding, must be 0.
+   */
+  uint16_t reserved2 GNUNET_PACKED;
+
+  /**
+   * Offset of the strata in the rest of the message
+   */
+  uint32_t offset GNUNET_PACKED;
+
+  /**
+   * Salt used when hashing elements for this IBF.
+   */
+  uint32_t salt GNUNET_PACKED;
+
+  /* rest: buckets */
+};
+
+
+struct InquiryMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Salt used when hashing elements for this inquiry.
+   */
+  uint32_t salt GNUNET_PACKED;
+
+  /**
+   * Reserved, set to 0.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /* rest: inquiry IBF keys */
+};
+
+
+/**
+ * During intersection, the first (and possibly second) message
+ * send it the number of elements in the set, to allow the peers
+ * to decide who should start with the Bloom filter.
+ */
+struct IntersectionElementInfoMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * mutator used with this bloomfilter.
+   */
+  uint32_t sender_element_count GNUNET_PACKED;
+};
+
+
+/**
+ * Bloom filter messages exchanged for set intersection calculation.
+ */
+struct BFMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Number of elements the sender still has in the set.
+   */
+  uint32_t sender_element_count GNUNET_PACKED;
+
+  /**
+   * XOR of all hashes over all elements remaining in the set.
+   * Used to determine termination.
+   */
+  struct GNUNET_HashCode element_xor_hash;
+
+  /**
+   * Mutator used with this bloomfilter.
+   */
+  uint32_t sender_mutator GNUNET_PACKED;
+
+  /**
+   * Total length of the bloomfilter data.
+   */
+  uint32_t bloomfilter_total_length GNUNET_PACKED;
+
+  /**
+   * Number of bits (k-value) used in encoding the bloomfilter.
+   */
+  uint32_t bits_per_element GNUNET_PACKED;
+
+  /**
+   * rest: the sender's bloomfilter
+   */
+};
+
+
+/**
+ * Last message, send to confirm the final set.  Contains the element
+ * count as it is possible that the peer determined that we were done
+ * by getting the empty set, which in that case also needs to be
+ * communicated.
+ */
+struct IntersectionDoneMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Final number of elements in intersection.
+   */
+  uint32_t final_element_count GNUNET_PACKED;
+
+  /**
+   * XOR of all hashes over all elements remaining in the set.
+   */
+  struct GNUNET_HashCode element_xor_hash;
+};
+
+
+/**
+ * Strata estimator together with the peer's overall set size.
+ */
+struct StrataEstimatorMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
+   */
+  struct GNUNET_MessageHeader header;
+
+  uint64_t set_size;
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c 
b/src/setu/gnunet-service-setu_strata_estimator.c
new file mode 100644
index 000000000..0fa6a6f17
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -0,0 +1,303 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2012 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/gnunet-service-setu_strata_estimator.c
+ * @brief invertible bloom filter
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "ibf.h"
+#include "gnunet-service-setu_strata_estimator.h"
+
+
+/**
+ * Should we try compressing the strata estimator? This will
+ * break compatibility with the 0.10.1-network.
+ */
+#define FAIL_10_1_COMPATIBILTIY 1
+
+
+/**
+ * Write the given strata estimator to the buffer.
+ *
+ * @param se strata estimator to serialize
+ * @param[out] buf buffer to write to, must be of appropriate size
+ * @return number of bytes written to @a buf
+ */
+size_t
+strata_estimator_write (const struct StrataEstimator *se,
+                        void *buf)
+{
+  char *sbuf = buf;
+  unsigned int i;
+  size_t osize;
+
+  GNUNET_assert (NULL != se);
+  for (i = 0; i < se->strata_count; i++)
+  {
+    ibf_write_slice (se->strata[i],
+                     0,
+                     se->ibf_size,
+                     &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]);
+  }
+  osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+#if FAIL_10_1_COMPATIBILTIY
+  {
+    char *cbuf;
+    size_t nsize;
+
+    if (GNUNET_YES ==
+        GNUNET_try_compression (buf,
+                                osize,
+                                &cbuf,
+                                &nsize))
+    {
+      GNUNET_memcpy (buf, cbuf, nsize);
+      osize = nsize;
+      GNUNET_free (cbuf);
+    }
+  }
+#endif
+  return osize;
+}
+
+
+/**
+ * Read strata from the buffer into the given strata
+ * estimator.  The strata estimator must already be allocated.
+ *
+ * @param buf buffer to read from
+ * @param buf_len number of bytes in @a buf
+ * @param is_compressed is the data compressed?
+ * @param[out] se strata estimator to write to
+ * @return #GNUNET_OK on success
+ */
+int
+strata_estimator_read (const void *buf,
+                       size_t buf_len,
+                       int is_compressed,
+                       struct StrataEstimator *se)
+{
+  unsigned int i;
+  size_t osize;
+  char *dbuf;
+
+  dbuf = NULL;
+  if (GNUNET_YES == is_compressed)
+  {
+    osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+    dbuf = GNUNET_decompress (buf,
+                              buf_len,
+                              osize);
+    if (NULL == dbuf)
+    {
+      GNUNET_break_op (0);    /* bad compressed input data */
+      return GNUNET_SYSERR;
+    }
+    buf = dbuf;
+    buf_len = osize;
+  }
+
+  if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
+  {
+    GNUNET_break (0);  /* very odd error */
+    GNUNET_free (dbuf);
+    return GNUNET_SYSERR;
+  }
+
+  for (i = 0; i < se->strata_count; i++)
+  {
+    ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
+    buf += se->ibf_size * IBF_BUCKET_SIZE;
+  }
+  GNUNET_free (dbuf);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Add a key to the strata estimator.
+ *
+ * @param se strata estimator to add the key to
+ * @param key key to add
+ */
+void
+strata_estimator_insert (struct StrataEstimator *se,
+                         struct IBF_Key key)
+{
+  uint64_t v;
+  unsigned int i;
+
+  v = key.key_val;
+  /* count trailing '1'-bits of v */
+  for (i = 0; v & 1; v >>= 1, i++)
+    /* empty */;
+  ibf_insert (se->strata[i], key);
+}
+
+
+/**
+ * Remove a key from the strata estimator.
+ *
+ * @param se strata estimator to remove the key from
+ * @param key key to remove
+ */
+void
+strata_estimator_remove (struct StrataEstimator *se,
+                         struct IBF_Key key)
+{
+  uint64_t v;
+  unsigned int i;
+
+  v = key.key_val;
+  /* count trailing '1'-bits of v */
+  for (i = 0; v & 1; v >>= 1, i++)
+    /* empty */;
+  ibf_remove (se->strata[i], key);
+}
+
+
+/**
+ * Create a new strata estimator with the given parameters.
+ *
+ * @param strata_count number of stratas, that is, number of ibfs in the 
estimator
+ * @param ibf_size size of each ibf stratum
+ * @param ibf_hashnum hashnum parameter of each ibf
+ * @return a freshly allocated, empty strata estimator, NULL on error
+ */
+struct StrataEstimator *
+strata_estimator_create (unsigned int strata_count,
+                         uint32_t ibf_size,
+                         uint8_t ibf_hashnum)
+{
+  struct StrataEstimator *se;
+  unsigned int i;
+  unsigned int j;
+
+  se = GNUNET_new (struct StrataEstimator);
+  se->strata_count = strata_count;
+  se->ibf_size = ibf_size;
+  se->strata = GNUNET_new_array (strata_count,
+                                 struct InvertibleBloomFilter *);
+  for (i = 0; i < strata_count; i++)
+  {
+    se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
+    if (NULL == se->strata[i])
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to allocate memory for strata estimator\n");
+      for (j = 0; j < i; j++)
+        ibf_destroy (se->strata[i]);
+      GNUNET_free (se);
+      return NULL;
+    }
+  }
+  return se;
+}
+
+
+/**
+ * Estimate set difference with two strata estimators,
+ * i.e. arrays of IBFs.
+ * Does not not modify its arguments.
+ *
+ * @param se1 first strata estimator
+ * @param se2 second strata estimator
+ * @return the estimated difference
+ */
+unsigned int
+strata_estimator_difference (const struct StrataEstimator *se1,
+                             const struct StrataEstimator *se2)
+{
+  unsigned int count;
+
+  GNUNET_assert (se1->strata_count == se2->strata_count);
+  count = 0;
+  for (int i = se1->strata_count - 1; i >= 0; i--)
+  {
+    struct InvertibleBloomFilter *diff;
+    /* number of keys decoded from the ibf */
+
+    /* FIXME: implement this without always allocating new IBFs */
+    diff = ibf_dup (se1->strata[i]);
+    ibf_subtract (diff, se2->strata[i]);
+    for (int ibf_count = 0; GNUNET_YES; ibf_count++)
+    {
+      int more;
+
+      more = ibf_decode (diff, NULL, NULL);
+      if (GNUNET_NO == more)
+      {
+        count += ibf_count;
+        break;
+      }
+      /* Estimate if decoding fails or would not terminate */
+      if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
+      {
+        ibf_destroy (diff);
+        return count * (1 << (i + 1));
+      }
+    }
+    ibf_destroy (diff);
+  }
+  return count;
+}
+
+
+/**
+ * Make a copy of a strata estimator.
+ *
+ * @param se the strata estimator to copy
+ * @return the copy
+ */
+struct StrataEstimator *
+strata_estimator_dup (struct StrataEstimator *se)
+{
+  struct StrataEstimator *c;
+  unsigned int i;
+
+  c = GNUNET_new (struct StrataEstimator);
+  c->strata_count = se->strata_count;
+  c->ibf_size = se->ibf_size;
+  c->strata = GNUNET_new_array (se->strata_count,
+                                struct InvertibleBloomFilter *);
+  for (i = 0; i < se->strata_count; i++)
+    c->strata[i] = ibf_dup (se->strata[i]);
+  return c;
+}
+
+
+/**
+ * Destroy a strata estimator, free all of its resources.
+ *
+ * @param se strata estimator to destroy.
+ */
+void
+strata_estimator_destroy (struct StrataEstimator *se)
+{
+  unsigned int i;
+
+  for (i = 0; i < se->strata_count; i++)
+    ibf_destroy (se->strata[i]);
+  GNUNET_free (se->strata);
+  GNUNET_free (se);
+}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h 
b/src/setu/gnunet-service-setu_strata_estimator.h
new file mode 100644
index 000000000..afdbcdbbf
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -0,0 +1,169 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2012 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/gnunet-service-setu_strata_estimator.h
+ * @brief estimator of set difference
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
+#define GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0                           /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+
+/**
+ * A handle to a strata estimator.
+ */
+struct StrataEstimator
+{
+  /**
+   * The IBFs of this strata estimator.
+   */
+  struct InvertibleBloomFilter **strata;
+
+  /**
+   * Size of the IBF array in @e strata
+   */
+  unsigned int strata_count;
+
+  /**
+   * Size of each IBF stratum (in bytes)
+   */
+  unsigned int ibf_size;
+};
+
+
+/**
+ * Write the given strata estimator to the buffer.
+ *
+ * @param se strata estimator to serialize
+ * @param[out] buf buffer to write to, must be of appropriate size
+ * @return number of bytes written to @a buf
+ */
+size_t
+strata_estimator_write (const struct StrataEstimator *se,
+                        void *buf);
+
+
+/**
+ * Read strata from the buffer into the given strata
+ * estimator.  The strata estimator must already be allocated.
+ *
+ * @param buf buffer to read from
+ * @param buf_len number of bytes in @a buf
+ * @param is_compressed is the data compressed?
+ * @param[out] se strata estimator to write to
+ * @return #GNUNET_OK on success
+ */
+int
+strata_estimator_read (const void *buf,
+                       size_t buf_len,
+                       int is_compressed,
+                       struct StrataEstimator *se);
+
+
+/**
+ * Create a new strata estimator with the given parameters.
+ *
+ * @param strata_count number of stratas, that is, number of ibfs in the 
estimator
+ * @param ibf_size size of each ibf stratum
+ * @param ibf_hashnum hashnum parameter of each ibf
+ * @return a freshly allocated, empty strata estimator, NULL on error
+ */
+struct StrataEstimator *
+strata_estimator_create (unsigned int strata_count,
+                         uint32_t ibf_size,
+                         uint8_t ibf_hashnum);
+
+
+/**
+ * Get an estimation of the symmetric difference of the elements
+ * contained in both strata estimators.
+ *
+ * @param se1 first strata estimator
+ * @param se2 second strata estimator
+ * @return abs(|se1| - |se2|)
+ */
+unsigned int
+strata_estimator_difference (const struct StrataEstimator *se1,
+                             const struct StrataEstimator *se2);
+
+
+/**
+ * Add a key to the strata estimator.
+ *
+ * @param se strata estimator to add the key to
+ * @param key key to add
+ */
+void
+strata_estimator_insert (struct StrataEstimator *se,
+                         struct IBF_Key key);
+
+
+/**
+ * Remove a key from the strata estimator.
+ *
+ * @param se strata estimator to remove the key from
+ * @param key key to remove
+ */
+void
+strata_estimator_remove (struct StrataEstimator *se,
+                         struct IBF_Key key);
+
+
+/**
+ * Destroy a strata estimator, free all of its resources.
+ *
+ * @param se strata estimator to destroy.
+ */
+void
+strata_estimator_destroy (struct StrataEstimator *se);
+
+
+/**
+ * Make a copy of a strata estimator.
+ *
+ * @param se the strata estimator to copy
+ * @return the copy
+ */
+struct StrataEstimator *
+strata_estimator_dup (struct StrataEstimator *se);
+
+
+#if 0                           /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/setu/gnunet-setu-ibf-profiler.c 
b/src/setu/gnunet-setu-ibf-profiler.c
new file mode 100644
index 000000000..944b63d30
--- /dev/null
+++ b/src/setu/gnunet-setu-ibf-profiler.c
@@ -0,0 +1,308 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/gnunet-set-ibf-profiler.c
+ * @brief tool for profiling the invertible bloom filter implementation
+ * @author Florian Dold
+ */
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+
+#include "ibf.h"
+
+static unsigned int asize = 10;
+static unsigned int bsize = 10;
+static unsigned int csize = 10;
+static unsigned int hash_num = 4;
+static unsigned int ibf_size = 80;
+
+/* FIXME: add parameter for this */
+static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
+
+static struct GNUNET_CONTAINER_MultiHashMap *set_a;
+static struct GNUNET_CONTAINER_MultiHashMap *set_b;
+/* common elements in a and b */
+static struct GNUNET_CONTAINER_MultiHashMap *set_c;
+
+static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
+
+static struct InvertibleBloomFilter *ibf_a;
+static struct InvertibleBloomFilter *ibf_b;
+
+
+static void
+register_hashcode (struct GNUNET_HashCode *hash)
+{
+  struct GNUNET_HashCode replicated;
+  struct IBF_Key key;
+
+  key = ibf_key_from_hashcode (hash);
+  ibf_hashcode_from_key (key, &replicated);
+  (void) GNUNET_CONTAINER_multihashmap_put (
+    key_to_hashcode,
+    &replicated,
+    GNUNET_memdup (hash, sizeof *hash),
+    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+static void
+iter_hashcodes (struct IBF_Key key,
+                GNUNET_CONTAINER_MulitHashMapIteratorCallback iter,
+                void *cls)
+{
+  struct GNUNET_HashCode replicated;
+
+  ibf_hashcode_from_key (key, &replicated);
+  GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode,
+                                              &replicated,
+                                              iter,
+                                              cls);
+}
+
+
+static int
+insert_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+  struct InvertibleBloomFilter *ibf = cls;
+
+  ibf_insert (ibf, ibf_key_from_hashcode (key));
+  return GNUNET_YES;
+}
+
+
+static int
+remove_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
+
+  /* if remove fails, there just was a collision with another key */
+  (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
+  return GNUNET_YES;
+}
+
+
+static void
+run (void *cls,
+     char *const *args,
+     const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_HashCode id;
+  struct IBF_Key ibf_key;
+  int i;
+  int side;
+  int res;
+  struct GNUNET_TIME_Absolute start_time;
+  struct GNUNET_TIME_Relative delta_time;
+
+  set_a =
+    GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
+                                          GNUNET_NO);
+  set_b =
+    GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
+                                          GNUNET_NO);
+  set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
+                                                GNUNET_NO);
+
+  key_to_hashcode =
+    GNUNET_CONTAINER_multihashmap_create (((asize + bsize + csize == 0)
+                                           ? 1
+                                           : (asize + bsize + csize)),
+                                          GNUNET_NO);
+
+  printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
+          hash_num,
+          ibf_size,
+          asize,
+          bsize,
+          csize);
+
+  i = 0;
+  while (i < asize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    GNUNET_break (GNUNET_OK ==
+                  GNUNET_CONTAINER_multihashmap_put (
+                    set_a,
+                    &id,
+                    NULL,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    register_hashcode (&id);
+    i++;
+  }
+  i = 0;
+  while (i < bsize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+      continue;
+    GNUNET_break (GNUNET_OK ==
+                  GNUNET_CONTAINER_multihashmap_put (
+                    set_b,
+                    &id,
+                    NULL,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    register_hashcode (&id);
+    i++;
+  }
+  i = 0;
+  while (i < csize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
+      continue;
+    GNUNET_break (GNUNET_OK ==
+                  GNUNET_CONTAINER_multihashmap_put (
+                    set_c,
+                    &id,
+                    NULL,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    register_hashcode (&id);
+    i++;
+  }
+
+  ibf_a = ibf_create (ibf_size, hash_num);
+  ibf_b = ibf_create (ibf_size, hash_num);
+  if ((NULL == ibf_a) || (NULL == ibf_b))
+  {
+    /* insufficient memory */
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+
+
+  printf ("generated sets\n");
+
+  start_time = GNUNET_TIME_absolute_get ();
+
+  GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
+  GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
+  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
+  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
+
+  delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+
+  printf ("encoded in: %s\n",
+          GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+
+  ibf_subtract (ibf_a, ibf_b);
+
+
+  start_time = GNUNET_TIME_absolute_get ();
+
+  for (i = 0; i <= asize + bsize; i++)
+  {
+    res = ibf_decode (ibf_a, &side, &ibf_key);
+    if (GNUNET_SYSERR == res)
+    {
+      printf ("decode failed, %u/%u elements left\n",
+              GNUNET_CONTAINER_multihashmap_size (set_a)
+              + GNUNET_CONTAINER_multihashmap_size (set_b),
+              asize + bsize);
+      return;
+    }
+    if (GNUNET_NO == res)
+    {
+      if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
+          (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
+      {
+        delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+        printf ("decoded successfully in: %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delta_time, 
GNUNET_NO));
+      }
+      else
+      {
+        printf ("decode missed elements (should never happen)\n");
+      }
+      return;
+    }
+
+    if (side == 1)
+      iter_hashcodes (ibf_key, remove_iterator, set_a);
+    if (side == -1)
+      iter_hashcodes (ibf_key, remove_iterator, set_b);
+  }
+  printf ("cyclic IBF, %u/%u elements left\n",
+          GNUNET_CONTAINER_multihashmap_size (set_a)
+          + GNUNET_CONTAINER_multihashmap_size (set_b),
+          asize + bsize);
+}
+
+
+int
+main (int argc, char **argv)
+{
+  struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_uint ('A',
+                               "asize",
+                               NULL,
+                               gettext_noop ("number of element in set A-B"),
+                               &asize),
+
+    GNUNET_GETOPT_option_uint ('B',
+                               "bsize",
+                               NULL,
+                               gettext_noop ("number of element in set B-A"),
+                               &bsize),
+
+    GNUNET_GETOPT_option_uint ('C',
+                               "csize",
+                               NULL,
+                               gettext_noop (
+                                 "number of common elements in A and B"),
+                               &csize),
+
+    GNUNET_GETOPT_option_uint ('k',
+                               "hash-num",
+                               NULL,
+                               gettext_noop ("hash num"),
+                               &hash_num),
+
+    GNUNET_GETOPT_option_uint ('s',
+                               "ibf-size",
+                               NULL,
+                               gettext_noop ("ibf size"),
+                               &ibf_size),
+
+    GNUNET_GETOPT_OPTION_END
+  };
+
+  GNUNET_PROGRAM_run2 (argc,
+                       argv,
+                       "gnunet-consensus-ibf",
+                       "help",
+                       options,
+                       &run,
+                       NULL,
+                       GNUNET_YES);
+  return 0;
+}
diff --git a/src/setu/gnunet-setu-profiler.c b/src/setu/gnunet-setu-profiler.c
new file mode 100644
index 000000000..8d6a2dc8c
--- /dev/null
+++ b/src/setu/gnunet-setu-profiler.c
@@ -0,0 +1,499 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2013 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file setu/gnunet-setu-profiler.c
+ * @brief profiling tool for set
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet_setu_service.h"
+#include "gnunet_testbed_service.h"
+
+
+static int ret;
+
+static unsigned int num_a = 5;
+static unsigned int num_b = 5;
+static unsigned int num_c = 20;
+
+static char *op_str = "union";
+
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+struct SetInfo
+{
+  char *id;
+  struct GNUNET_SETU_Handle *set;
+  struct GNUNET_SETU_OperationHandle *oh;
+  struct GNUNET_CONTAINER_MultiHashMap *sent;
+  struct GNUNET_CONTAINER_MultiHashMap *received;
+  int done;
+} info1, info2;
+
+static struct GNUNET_CONTAINER_MultiHashMap *common_sent;
+
+static struct GNUNET_HashCode app_id;
+
+static struct GNUNET_PeerIdentity local_peer;
+
+static struct GNUNET_SETU_ListenHandle *set_listener;
+
+static int byzantine;
+static unsigned int force_delta;
+static unsigned int force_full;
+static unsigned int element_size = 32;
+
+/**
+ * Handle to the statistics service.
+ */
+static struct GNUNET_STATISTICS_Handle *statistics;
+
+/**
+ * The profiler will write statistics
+ * for all peers to the file with this name.
+ */
+static char *statistics_filename;
+
+/**
+ * The profiler will write statistics
+ * for all peers to this file.
+ */
+static FILE *statistics_file;
+
+
+static int
+map_remove_iterator (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     void *value)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *m = cls;
+  int ret;
+
+  GNUNET_assert (NULL != key);
+
+  ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key);
+  if (GNUNET_OK != ret)
+    printf ("spurious element\n");
+  return GNUNET_YES;
+}
+
+
+/**
+ * Callback function to process statistic values.
+ *
+ * @param cls closure
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if 
not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+static int
+statistics_result (void *cls,
+                   const char *subsystem,
+                   const char *name,
+                   uint64_t value,
+                   int is_persistent)
+{
+  if (NULL != statistics_file)
+  {
+    fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned
+                                                                 long) value);
+  }
+  return GNUNET_OK;
+}
+
+
+static void
+statistics_done (void *cls,
+                 int success)
+{
+  GNUNET_assert (GNUNET_YES == success);
+  if (NULL != statistics_file)
+    fclose (statistics_file);
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+check_all_done (void)
+{
+  if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO))
+    return;
+
+  GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
+                                         info2.sent);
+  GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
+                                         info1.sent);
+
+  printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
+            info1.sent));
+  printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
+            info2.sent));
+
+  if (NULL == statistics_filename)
+  {
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+
+  statistics_file = fopen (statistics_filename, "w");
+  GNUNET_STATISTICS_get (statistics, NULL, NULL,
+                         &statistics_done,
+                         &statistics_result, NULL);
+}
+
+
+static void
+set_result_cb (void *cls,
+               const struct GNUNET_SETU_Element *element,
+               uint64_t current_size,
+               enum GNUNET_SETU_Status status)
+{
+  struct SetInfo *info = cls;
+
+  GNUNET_assert (GNUNET_NO == info->done);
+  switch (status)
+  {
+  case GNUNET_SETU_STATUS_DONE:
+    info->done = GNUNET_YES;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
+    check_all_done ();
+    info->oh = NULL;
+    return;
+
+  case GNUNET_SETU_STATUS_FAILURE:
+    info->oh = NULL;
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+
+  case GNUNET_SETU_STATUS_ADD_LOCAL:
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
+    break;
+  default:
+    GNUNET_assert (0);
+  }
+
+  if (element->size != element_size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "wrong element size: %u, expected %u\n",
+                element->size,
+                (unsigned int) sizeof(struct GNUNET_HashCode));
+    GNUNET_assert (0);
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
+              info->id, GNUNET_h2s (element->data));
+  GNUNET_assert (NULL != element->data);
+  struct GNUNET_HashCode data_hash;
+  GNUNET_CRYPTO_hash (element->data, element_size, &data_hash);
+  GNUNET_CONTAINER_multihashmap_put (info->received,
+                                     &data_hash, NULL,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_listen_cb (void *cls,
+               const struct GNUNET_PeerIdentity *other_peer,
+               const struct GNUNET_MessageHeader *context_msg,
+               struct GNUNET_SETU_Request *request)
+{
+  /* max. 2 options plus terminator */
+  struct GNUNET_SETU_Option opts[3] = { { 0 } };
+  unsigned int n_opts = 0;
+
+  if (NULL == request)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "listener failed\n");
+    return;
+  }
+  GNUNET_assert (NULL == info2.oh);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "set listen cb called\n");
+  if (byzantine)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_BYZANTINE };
+  }
+  GNUNET_assert (! (force_full && force_delta));
+  if (force_full)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_FORCE_FULL };
+  }
+  if (force_delta)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_FORCE_DELTA };
+  }
+
+  opts[n_opts].type = 0;
+  info2.oh = GNUNET_SETU_accept (request,
+                                 opts,
+                                 set_result_cb, &info2);
+  GNUNET_SETU_commit (info2.oh, info2.set);
+}
+
+
+static int
+set_insert_iterator (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     void *value)
+{
+  struct GNUNET_SETU_Handle *set = cls;
+  struct GNUNET_SETU_Element el;
+
+  el.element_type = 0;
+  el.data = value;
+  el.size = element_size;
+  GNUNET_SETU_add_element (set, &el, NULL, NULL);
+  return GNUNET_YES;
+}
+
+
+static void
+handle_shutdown (void *cls)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Shutting down set profiler\n");
+  if (NULL != set_listener)
+  {
+    GNUNET_SETU_listen_cancel (set_listener);
+    set_listener = NULL;
+  }
+  if (NULL != info1.oh)
+  {
+    GNUNET_SETU_operation_cancel (info1.oh);
+    info1.oh = NULL;
+  }
+  if (NULL != info2.oh)
+  {
+    GNUNET_SETU_operation_cancel (info2.oh);
+    info2.oh = NULL;
+  }
+  if (NULL != info1.set)
+  {
+    GNUNET_SETU_destroy (info1.set);
+    info1.set = NULL;
+  }
+  if (NULL != info2.set)
+  {
+    GNUNET_SETU_destroy (info2.set);
+    info2.set = NULL;
+  }
+  GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
+}
+
+
+static void
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
+     struct GNUNET_TESTING_Peer *peer)
+{
+  unsigned int i;
+  struct GNUNET_HashCode hash;
+  /* max. 2 options plus terminator */
+  struct GNUNET_SETU_Option opts[3] = { { 0 } };
+  unsigned int n_opts = 0;
+
+  config = cfg;
+
+  GNUNET_assert (element_size > 0);
+
+  if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+    ret = 0;
+    return;
+  }
+
+  statistics = GNUNET_STATISTICS_create ("set-profiler", cfg);
+
+  GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL);
+
+  info1.id = "a";
+  info2.id = "b";
+
+  info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
+  info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
+  common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO);
+
+  info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
+  info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
+
+  for (i = 0; i < num_a; i++)
+  {
+    char *data = GNUNET_malloc (element_size);
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, 
element_size);
+    GNUNET_CRYPTO_hash (data, element_size, &hash);
+    GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  }
+
+  for (i = 0; i < num_b; i++)
+  {
+    char *data = GNUNET_malloc (element_size);
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, 
element_size);
+    GNUNET_CRYPTO_hash (data, element_size, &hash);
+    GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  }
+
+  for (i = 0; i < num_c; i++)
+  {
+    char *data = GNUNET_malloc (element_size);
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, 
element_size);
+    GNUNET_CRYPTO_hash (data, element_size, &hash);
+    GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  }
+
+  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
+
+  /* FIXME: also implement intersection etc. */
+  info1.set = GNUNET_SETU_create (config);
+  info2.set = GNUNET_SETU_create (config);
+
+  GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator,
+                                         info1.set);
+  GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator,
+                                         info2.set);
+  GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
+                                         info1.set);
+  GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
+                                         info2.set);
+
+  set_listener = GNUNET_SETU_listen (config,
+                                     &app_id,
+                                     &set_listen_cb,
+                                     NULL);
+
+
+  if (byzantine)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_BYZANTINE };
+  }
+  GNUNET_assert (! (force_full && force_delta));
+  if (force_full)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_FORCE_FULL };
+  }
+  if (force_delta)
+  {
+    opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+                                                     
GNUNET_SETU_OPTION_FORCE_DELTA };
+  }
+
+  opts[n_opts].type = 0;
+
+  info1.oh = GNUNET_SETU_prepare (&local_peer, &app_id, NULL,
+                                  opts,
+                                  set_result_cb, &info1);
+  GNUNET_SETU_commit (info1.oh, info1.set);
+  GNUNET_SETU_destroy (info1.set);
+  info1.set = NULL;
+}
+
+
+static void
+pre_run (void *cls, char *const *args, const char *cfgfile,
+         const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  if (0 != GNUNET_TESTING_peer_run ("set-profiler",
+                                    cfgfile,
+                                    &run, NULL))
+    ret = 2;
+}
+
+
+int
+main (int argc, char **argv)
+{
+  struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_uint ('A',
+                               "num-first",
+                               NULL,
+                               gettext_noop ("number of values"),
+                               &num_a),
+
+    GNUNET_GETOPT_option_uint ('B',
+                               "num-second",
+                               NULL,
+                               gettext_noop ("number of values"),
+                               &num_b),
+
+    GNUNET_GETOPT_option_flag ('b',
+                               "byzantine",
+                               gettext_noop ("use byzantine mode"),
+                               &byzantine),
+
+    GNUNET_GETOPT_option_uint ('f',
+                               "force-full",
+                               NULL,
+                               gettext_noop ("force sending full set"),
+                               &force_full),
+
+    GNUNET_GETOPT_option_uint ('d',
+                               "force-delta",
+                               NULL,
+                               gettext_noop ("number delta operation"),
+                               &force_delta),
+
+    GNUNET_GETOPT_option_uint ('C',
+                               "num-common",
+                               NULL,
+                               gettext_noop ("number of values"),
+                               &num_c),
+
+    GNUNET_GETOPT_option_string ('x',
+                                 "operation",
+                                 NULL,
+                                 gettext_noop ("operation to execute"),
+                                 &op_str),
+
+    GNUNET_GETOPT_option_uint ('w',
+                               "element-size",
+                               NULL,
+                               gettext_noop ("element size"),
+                               &element_size),
+
+    GNUNET_GETOPT_option_filename ('s',
+                                   "statistics",
+                                   "FILENAME",
+                                   gettext_noop ("write statistics to file"),
+                                   &statistics_filename),
+
+    GNUNET_GETOPT_OPTION_END
+  };
+
+  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
+                       "help",
+                       options, &pre_run, NULL, GNUNET_YES);
+  return ret;
+}
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
new file mode 100644
index 000000000..1532afceb
--- /dev/null
+++ b/src/setu/ibf.c
@@ -0,0 +1,409 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2012 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf.c
+ * @brief implementation of the invertible bloom filter
+ * @author Florian Dold
+ */
+
+#include "ibf.h"
+
+/**
+ * Compute the key's hash from the key.
+ * Redefine to use a different hash function.
+ */
+#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof(struct \
+                                                                 IBF_KeyHash)))
+
+/**
+ * Create a key from a hashcode.
+ *
+ * @param hash the hashcode
+ * @return a key
+ */
+struct IBF_Key
+ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
+{
+  return *(struct IBF_Key *) hash;
+}
+
+
+/**
+ * Create a hashcode from a key, by replicating the key
+ * until the hascode is filled
+ *
+ * @param key the key
+ * @param dst hashcode to store the result in
+ */
+void
+ibf_hashcode_from_key (struct IBF_Key key,
+                       struct GNUNET_HashCode *dst)
+{
+  struct IBF_Key *p;
+  unsigned int i;
+  const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
+                                         / sizeof(struct IBF_Key);
+
+  p = (struct IBF_Key *) dst;
+  for (i = 0; i < keys_per_hashcode; i++)
+    *p++ = key;
+}
+
+
+/**
+ * Create an invertible bloom filter.
+ *
+ * @param size number of IBF buckets
+ * @param hash_num number of buckets one element is hashed in
+ * @return the newly created invertible bloom filter, NULL on error
+ */
+struct InvertibleBloomFilter *
+ibf_create (uint32_t size, uint8_t hash_num)
+{
+  struct InvertibleBloomFilter *ibf;
+
+  GNUNET_assert (0 != size);
+
+  ibf = GNUNET_new (struct InvertibleBloomFilter);
+  ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t));
+  if (NULL == ibf->count)
+  {
+    GNUNET_free (ibf);
+    return NULL;
+  }
+  ibf->key_sum = GNUNET_malloc_large (size * sizeof(struct IBF_Key));
+  if (NULL == ibf->key_sum)
+  {
+    GNUNET_free (ibf->count);
+    GNUNET_free (ibf);
+    return NULL;
+  }
+  ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof(struct IBF_KeyHash));
+  if (NULL == ibf->key_hash_sum)
+  {
+    GNUNET_free (ibf->key_sum);
+    GNUNET_free (ibf->count);
+    GNUNET_free (ibf);
+    return NULL;
+  }
+  ibf->size = size;
+  ibf->hash_num = hash_num;
+
+  return ibf;
+}
+
+
+/**
+ * Store unique bucket indices for the specified key in dst.
+ */
+static void
+ibf_get_indices (const struct InvertibleBloomFilter *ibf,
+                 struct IBF_Key key,
+                 int *dst)
+{
+  uint32_t filled;
+  uint32_t i;
+  uint32_t bucket;
+
+  bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
+  for (i = 0, filled = 0; filled < ibf->hash_num; i++)
+  {
+    unsigned int j;
+    uint64_t x;
+    for (j = 0; j < filled; j++)
+      if (dst[j] == bucket)
+        goto try_next;
+    dst[filled++] = bucket % ibf->size;
+try_next:;
+    x = ((uint64_t) bucket << 32) | i;
+    bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
+  }
+}
+
+
+static void
+ibf_insert_into (struct InvertibleBloomFilter *ibf,
+                 struct IBF_Key key,
+                 const int *buckets, int side)
+{
+  int i;
+
+  for (i = 0; i < ibf->hash_num; i++)
+  {
+    const int bucket = buckets[i];
+    ibf->count[bucket].count_val += side;
+    ibf->key_sum[bucket].key_val ^= key.key_val;
+    ibf->key_hash_sum[bucket].key_hash_val
+      ^= IBF_KEY_HASH_VAL (key);
+  }
+}
+
+
+/**
+ * Insert a key into an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
+{
+  int buckets[ibf->hash_num];
+
+  GNUNET_assert (ibf->hash_num <= ibf->size);
+  ibf_get_indices (ibf, key, buckets);
+  ibf_insert_into (ibf, key, buckets, 1);
+}
+
+
+/**
+ * Remove a key from an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
+{
+  int buckets[ibf->hash_num];
+
+  GNUNET_assert (ibf->hash_num <= ibf->size);
+  ibf_get_indices (ibf, key, buckets);
+  ibf_insert_into (ibf, key, buckets, -1);
+}
+
+
+/**
+ * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero.
+ */
+static int
+ibf_is_empty (struct InvertibleBloomFilter *ibf)
+{
+  int i;
+
+  for (i = 0; i < ibf->size; i++)
+  {
+    if (0 != ibf->count[i].count_val)
+      return GNUNET_NO;
+    if (0 != ibf->key_hash_sum[i].key_hash_val)
+      return GNUNET_NO;
+    if (0 != ibf->key_sum[i].key_val)
+      return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Decode and remove an element from the IBF, if possible.
+ *
+ * @param ibf the invertible bloom filter to decode
+ * @param ret_side sign of the cell's count where the decoded element came 
from.
+ *                 A negative sign indicates that the element was recovered
+ *                 resides in an IBF that was previously subtracted from.
+ * @param ret_id receives the hash code of the decoded element, if successful
+ * @return GNUNET_YES if decoding an element was successful,
+ *         GNUNET_NO if the IBF is empty,
+ *         GNUNET_SYSERR if the decoding has failed
+ */
+int
+ibf_decode (struct InvertibleBloomFilter *ibf,
+            int *ret_side, struct IBF_Key *ret_id)
+{
+  struct IBF_KeyHash hash;
+  int i;
+  int buckets[ibf->hash_num];
+
+  GNUNET_assert (NULL != ibf);
+
+  for (i = 0; i < ibf->size; i++)
+  {
+    int j;
+    int hit;
+
+    /* we can only decode from pure buckets */
+    if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
+      continue;
+
+    hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
+
+    /* test if the hash matches the key */
+    if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
+      continue;
+
+    /* test if key in bucket hits its own location,
+     * if not, the key hash was subject to collision */
+    hit = GNUNET_NO;
+    ibf_get_indices (ibf, ibf->key_sum[i], buckets);
+    for (j = 0; j < ibf->hash_num; j++)
+      if (buckets[j] == i)
+        hit = GNUNET_YES;
+
+    if (GNUNET_NO == hit)
+      continue;
+
+    if (NULL != ret_side)
+      *ret_side = ibf->count[i].count_val;
+    if (NULL != ret_id)
+      *ret_id = ibf->key_sum[i];
+
+    /* insert on the opposite side, effectively removing the element */
+    ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
+
+    return GNUNET_YES;
+  }
+
+  if (GNUNET_YES == ibf_is_empty (ibf))
+    return GNUNET_NO;
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Write buckets from an ibf to a buffer.
+ * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
+ *
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ */
+void
+ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
+                 uint32_t count, void *buf)
+{
+  struct IBF_Key *key_dst;
+  struct IBF_KeyHash *key_hash_dst;
+  struct IBF_Count *count_dst;
+
+  GNUNET_assert (start + count <= ibf->size);
+
+  /* copy keys */
+  key_dst = (struct IBF_Key *) buf;
+  GNUNET_memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
+  key_dst += count;
+  /* copy key hashes */
+  key_hash_dst = (struct IBF_KeyHash *) key_dst;
+  GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, count
+                 * sizeof *key_hash_dst);
+  key_hash_dst += count;
+  /* copy counts */
+  count_dst = (struct IBF_Count *) key_hash_dst;
+  GNUNET_memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
+}
+
+
+/**
+ * Read buckets from a buffer into an ibf.
+ *
+ * @param buf pointer to the buffer to read from
+ * @param start which bucket to start at
+ * @param count how many buckets to read
+ * @param ibf the ibf to read from
+ */
+void
+ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct
+                InvertibleBloomFilter *ibf)
+{
+  struct IBF_Key *key_src;
+  struct IBF_KeyHash *key_hash_src;
+  struct IBF_Count *count_src;
+
+  GNUNET_assert (count > 0);
+  GNUNET_assert (start + count <= ibf->size);
+
+  /* copy keys */
+  key_src = (struct IBF_Key *) buf;
+  GNUNET_memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
+  key_src += count;
+  /* copy key hashes */
+  key_hash_src = (struct IBF_KeyHash *) key_src;
+  GNUNET_memcpy (ibf->key_hash_sum + start, key_hash_src, count
+                 * sizeof *key_hash_src);
+  key_hash_src += count;
+  /* copy counts */
+  count_src = (struct IBF_Count *) key_hash_src;
+  GNUNET_memcpy (ibf->count + start, count_src, count * sizeof *count_src);
+}
+
+
+/**
+ * Subtract ibf2 from ibf1, storing the result in ibf1.
+ * The two IBF's must have the same parameters size and hash_num.
+ *
+ * @param ibf1 IBF that is subtracted from
+ * @param ibf2 IBF that will be subtracted from ibf1
+ */
+void
+ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct
+              InvertibleBloomFilter *ibf2)
+{
+  int i;
+
+  GNUNET_assert (ibf1->size == ibf2->size);
+  GNUNET_assert (ibf1->hash_num == ibf2->hash_num);
+
+  for (i = 0; i < ibf1->size; i++)
+  {
+    ibf1->count[i].count_val -= ibf2->count[i].count_val;
+    ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val;
+    ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val;
+  }
+}
+
+
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (const struct InvertibleBloomFilter *ibf)
+{
+  struct InvertibleBloomFilter *copy;
+
+  copy = GNUNET_malloc (sizeof *copy);
+  copy->hash_num = ibf->hash_num;
+  copy->size = ibf->size;
+  copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size
+                                      * sizeof(struct IBF_KeyHash));
+  copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof(struct
+                                                                  IBF_Key));
+  copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof(struct
+                                                              IBF_Count));
+  return copy;
+}
+
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf)
+{
+  GNUNET_free (ibf->key_sum);
+  GNUNET_free (ibf->key_hash_sum);
+  GNUNET_free (ibf->count);
+  GNUNET_free (ibf);
+}
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
new file mode 100644
index 000000000..7c2ab33b1
--- /dev/null
+++ b/src/setu/ibf.h
@@ -0,0 +1,255 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2012 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf.h
+ * @brief invertible bloom filter
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_IBF_H
+#define GNUNET_CONSENSUS_IBF_H
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0                           /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+
+/**
+ * Keys that can be inserted into and removed from an IBF.
+ */
+struct IBF_Key
+{
+  uint64_t key_val;
+};
+
+
+/**
+ * Hash of an IBF key.
+ */
+struct IBF_KeyHash
+{
+  uint32_t key_hash_val;
+};
+
+
+/**
+ * Type of the count field of IBF buckets.
+ */
+struct IBF_Count
+{
+  int8_t count_val;
+};
+
+
+/**
+ * Size of one ibf bucket in bytes
+ */
+#define IBF_BUCKET_SIZE (sizeof(struct IBF_Count) + sizeof(struct IBF_Key)   \
+                         + sizeof(struct IBF_KeyHash))
+
+
+/**
+ * Invertible bloom filter (IBF).
+ *
+ * An IBF is a counting bloom filter that has the ability to restore
+ * the hashes of its stored elements with high probability.
+ */
+struct InvertibleBloomFilter
+{
+  /**
+   * How many cells does this IBF have?
+   */
+  uint32_t size;
+
+  /**
+   * In how many cells do we hash one element?
+   * Usually 4 or 3.
+   */
+  uint8_t hash_num;
+
+  /**
+   * Xor sums of the elements' keys, used to identify the elements.
+   * Array of 'size' elements.
+   */
+  struct IBF_Key *key_sum;
+
+  /**
+   * Xor sums of the hashes of the keys of inserted elements.
+   * Array of 'size' elements.
+   */
+  struct IBF_KeyHash *key_hash_sum;
+
+  /**
+   * How many times has a bucket been hit?
+   * Can be negative, as a result of IBF subtraction.
+   * Array of 'size' elements.
+   */
+  struct IBF_Count *count;
+};
+
+
+/**
+ * Write buckets from an ibf to a buffer.
+ * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
+ *
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ */
+void
+ibf_write_slice (const struct InvertibleBloomFilter *ibf,
+                 uint32_t start,
+                 uint32_t count,
+                 void *buf);
+
+
+/**
+ * Read buckets from a buffer into an ibf.
+ *
+ * @param buf pointer to the buffer to read from
+ * @param start which bucket to start at
+ * @param count how many buckets to read
+ * @param ibf the ibf to write to
+ */
+void
+ibf_read_slice (const void *buf,
+                uint32_t start,
+                uint32_t count,
+                struct InvertibleBloomFilter *ibf);
+
+
+/**
+ * Create a key from a hashcode.
+ *
+ * @param hash the hashcode
+ * @return a key
+ */
+struct IBF_Key
+ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
+
+
+/**
+ * Create a hashcode from a key, by replicating the key
+ * until the hascode is filled
+ *
+ * @param key the key
+ * @param dst hashcode to store the result in
+ */
+void
+ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
+
+
+/**
+ * Create an invertible bloom filter.
+ *
+ * @param size number of IBF buckets
+ * @param hash_num number of buckets one element is hashed in, usually 3 or 4
+ * @return the newly created invertible bloom filter, NULL on error
+ */
+struct InvertibleBloomFilter *
+ibf_create (uint32_t size, uint8_t hash_num);
+
+
+/**
+ * Insert a key into an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
+
+
+/**
+ * Remove a key from an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
+
+
+/**
+ * Subtract ibf2 from ibf1, storing the result in ibf1.
+ * The two IBF's must have the same parameters size and hash_num.
+ *
+ * @param ibf1 IBF that is subtracted from
+ * @param ibf2 IBF that will be subtracted from ibf1
+ */
+void
+ibf_subtract (struct InvertibleBloomFilter *ibf1,
+              const struct InvertibleBloomFilter *ibf2);
+
+
+/**
+ * Decode and remove an element from the IBF, if possible.
+ *
+ * @param ibf the invertible bloom filter to decode
+ * @param ret_side sign of the cell's count where the decoded element came 
from.
+ *                 A negative sign indicates that the element was recovered
+ *                 resides in an IBF that was previously subtracted from.
+ * @param ret_id receives the hash code of the decoded element, if successful
+ * @return #GNUNET_YES if decoding an element was successful,
+ *         #GNUNET_NO if the IBF is empty,
+ *         #GNUNET_SYSERR if the decoding has failed
+ */
+int
+ibf_decode (struct InvertibleBloomFilter *ibf,
+            int *ret_side,
+            struct IBF_Key *ret_id);
+
+
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (const struct InvertibleBloomFilter *ibf);
+
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf);
+
+
+#if 0                           /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/setu/ibf_sim.c b/src/setu/ibf_sim.c
new file mode 100644
index 000000000..6415d00e1
--- /dev/null
+++ b/src/setu/ibf_sim.c
@@ -0,0 +1,142 @@
+/*
+      This file is part of GNUnet
+      Copyright (C) 2013 GNUnet e.V.
+
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      Affero General Public License for more details.
+
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf_sim.c
+ * @brief implementation of simulation for invertible bloom filter
+ * @author Florian Dold
+ *
+ * This code was used for some internal experiments, it is not
+ * build or shipped as part of the GNUnet system.
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#define MAX_IBF_DECODE 16
+
+/* report average over how many rounds? */
+#define ROUNDS 100000
+
+/* enable one of the three below */
+// simple fix
+#define FIX1 0
+// possibly slightly better fix for large IBF_DECODE values
+#define FIX2 1
+
+// SIGCOMM algorithm
+#define STRATA 0
+
+// print each value?
+#define VERBOSE 0
+// avoid assembly? (ASM is about 50% faster)
+#define SLOW 0
+
+int
+main (int argc, char **argv)
+{
+  unsigned int round;
+  unsigned int buckets[31]; // max is 2^31 as 'random' returns only between 0 
and 2^31
+  unsigned int i;
+  int j;
+  unsigned int r;
+  unsigned int ret;
+  unsigned long long total;
+  unsigned int want;
+  double predict;
+
+  srandom (time (NULL));
+  total = 0;
+  want = atoi (argv[1]);
+  for (round = 0; round < ROUNDS; round++)
+  {
+    memset (buckets, 0, sizeof(buckets));
+    for (i = 0; i < want; i++)
+    {
+      /* FIXME: might want to use 'better' PRNG to avoid
+         PRNG-induced biases */
+      r = random ();
+      if (0 == r)
+        continue;
+#if SLOW
+      for (j = 0; (j < 31) && (0 == (r & (1 << j))); j++)
+        ;
+#else
+      /* use assembly / gcc */
+      j = __builtin_ffs (r) - 1;
+#endif
+      buckets[j]++;
+    }
+    ret = 0;
+    predict = 0.0;
+    for (j = 31; j >= 0; j--)
+    {
+#if FIX1
+      /* improved algorithm, for 1000 elements with IBF-DECODE 8, I
+         get 990/1000 elements on average over 1 million runs; key
+         idea being to stop short of the 'last' possible IBF as
+         otherwise a "lowball" per-chance would unduely influence the
+         result */if ((j > 0) &&
+          (buckets[j - 1] > MAX_IBF_DECODE))
+      {
+        ret *= (1 << (j + 1));
+        break;
+      }
+#endif
+#if FIX2
+      /* another improvement: don't just always cut off the last one,
+         but rather try to predict based on all previous values where
+         that "last" one is; additional prediction can only really
+         work if MAX_IBF_DECODE is sufficiently high */
+      if ((j > 0) &&
+          ((buckets[j - 1] > MAX_IBF_DECODE) ||
+           (predict > MAX_IBF_DECODE)))
+      {
+        ret *= (1 << (j + 1));
+        break;
+      }
+#endif
+#if STRATA
+      /* original algorithm, for 1000 elements with IBF-DECODE 8,
+         I get 920/1000 elements on average over 1 million runs */
+      if (buckets[j] > MAX_IBF_DECODE)
+      {
+        ret *= (1 << (j + 1));
+        break;
+      }
+#endif
+      ret += buckets[j];
+      predict = (buckets[j] + 2.0 * predict) / 2.0;
+    }
+#if VERBOSE
+    fprintf (stderr, "%u ", ret);
+#endif
+    total += ret;
+  }
+  fprintf (stderr, "\n");
+  fprintf (stdout, "average %llu\n", total / ROUNDS);
+  return 0;
+}
+
+
+/* TODO: should calculate stddev of the results to also be able to
+   say something about the stability of the results, outside of
+   large-scale averages -- gaining 8% precision at the expense of
+   50% additional variance might not be worth it... */
diff --git a/src/setu/plugin_block_setu_test.c 
b/src/setu/plugin_block_setu_test.c
new file mode 100644
index 000000000..1de086092
--- /dev/null
+++ b/src/setu/plugin_block_setu_test.c
@@ -0,0 +1,123 @@
+/*
+     This file is part of GNUnet
+     Copyright (C) 2017 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/plugin_block_set_test.c
+ * @brief set test block, recognizes elements with non-zero first byte as 
invalid
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_block_plugin.h"
+#include "gnunet_block_group_lib.h"
+
+
+/**
+ * Function called to validate a reply or a request.  For
+ * request evaluation, simply pass "NULL" for the reply_block.
+ *
+ * @param cls closure
+ * @param ctx block context
+ * @param type block type
+ * @param group block group to use
+ * @param eo control flags
+ * @param query original query (hash)
+ * @param xquery extrended query data (can be NULL, depending on type)
+ * @param xquery_size number of bytes in xquery
+ * @param reply_block response to validate
+ * @param reply_block_size number of bytes in reply block
+ * @return characterization of result
+ */
+static enum GNUNET_BLOCK_EvaluationResult
+block_plugin_set_test_evaluate (void *cls,
+                                struct GNUNET_BLOCK_Context *ctx,
+                                enum GNUNET_BLOCK_Type type,
+                                struct GNUNET_BLOCK_Group *group,
+                                enum GNUNET_BLOCK_EvaluationOptions eo,
+                                const struct GNUNET_HashCode *query,
+                                const void *xquery,
+                                size_t xquery_size,
+                                const void *reply_block,
+                                size_t reply_block_size)
+{
+  if ((NULL == reply_block) ||
+      (reply_block_size == 0) ||
+      (0 != ((char *) reply_block)[0]))
+    return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
+  return GNUNET_BLOCK_EVALUATION_OK_MORE;
+}
+
+
+/**
+ * Function called to obtain the key for a block.
+ *
+ * @param cls closure
+ * @param type block type
+ * @param block block to get the key for
+ * @param block_size number of bytes in block
+ * @param key set to the key (query) for the given block
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
+ *         (or if extracting a key from a block of this type does not work)
+ */
+static int
+block_plugin_set_test_get_key (void *cls,
+                               enum GNUNET_BLOCK_Type type,
+                               const void *block,
+                               size_t block_size,
+                               struct GNUNET_HashCode *key)
+{
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Entry point for the plugin.
+ */
+void *
+libgnunet_plugin_block_set_test_init (void *cls)
+{
+  static enum GNUNET_BLOCK_Type types[] = {
+    GNUNET_BLOCK_TYPE_SET_TEST,
+    GNUNET_BLOCK_TYPE_ANY       /* end of list */
+  };
+  struct GNUNET_BLOCK_PluginFunctions *api;
+
+  api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
+  api->evaluate = &block_plugin_set_test_evaluate;
+  api->get_key = &block_plugin_set_test_get_key;
+  api->types = types;
+  return api;
+}
+
+
+/**
+ * Exit point from the plugin.
+ */
+void *
+libgnunet_plugin_block_set_test_done (void *cls)
+{
+  struct GNUNET_BLOCK_PluginFunctions *api = cls;
+
+  GNUNET_free (api);
+  return NULL;
+}
+
+
+/* end of plugin_block_set_test.c */
diff --git a/src/setu/setu.h b/src/setu/setu.h
new file mode 100644
index 000000000..f1c5df92b
--- /dev/null
+++ b/src/setu/setu.h
@@ -0,0 +1,304 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012-2014, 2020 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/set.h
+ * @brief messages used for the set union api
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#ifndef SET_H
+#define SET_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_set_service.h"
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Message sent by the client to the service to ask starting
+ * a new set to perform operations with.  Includes the desired
+ * set operation type.
+ */
+struct GNUNET_SETU_CreateMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_CREATE
+   */
+  struct GNUNET_MessageHeader header;
+
+};
+
+
+/**
+ * Message sent by the client to the service to start listening for
+ * incoming requests to perform a certain type of set operation for a
+ * certain type of application.
+ */
+struct GNUNET_SETU_ListenMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_LISTEN
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Always zero.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * application id
+   */
+  struct GNUNET_HashCode app_id;
+};
+
+
+/**
+ * Message sent by a listening client to the service to accept
+ * performing the operation with the other peer.
+ */
+struct GNUNET_SETU_AcceptMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_ACCEPT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ID of the incoming request we want to accept.
+   */
+  uint32_t accept_reject_id GNUNET_PACKED;
+
+  /**
+   * Request ID to identify responses.
+   */
+  uint32_t request_id GNUNET_PACKED;
+
+  /**
+   * Always use delta operation instead of sending full sets,
+   * even it it's less efficient.
+   */
+  uint8_t force_delta;
+
+  /**
+   * Always send full sets, even if delta operations would
+   * be more efficient.
+   */
+  uint8_t force_full;
+
+  /**
+   * #GNUNET_YES to fail operations where Byzantine faults
+   * are suspected
+   */
+  uint8_t byzantine;
+
+  /**
+   * Lower bound for the set size, used only when
+   * byzantine mode is enabled.
+   */
+  uint8_t byzantine_lower_bound;
+};
+
+
+/**
+ * Message sent by a listening client to the service to reject
+ * performing the operation with the other peer.
+ */
+struct GNUNET_SETU_RejectMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_REJECT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ID of the incoming request we want to reject.
+   */
+  uint32_t accept_reject_id GNUNET_PACKED;
+};
+
+
+/**
+ * A request for an operation with another client.
+ */
+struct GNUNET_SETU_RequestMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_REQUEST.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ID of the to identify the request when accepting or
+   * rejecting it.
+   */
+  uint32_t accept_id GNUNET_PACKED;
+
+  /**
+   * Identity of the requesting peer.
+   */
+  struct GNUNET_PeerIdentity peer_id;
+
+  /* rest: context message, that is, application-specific
+     message to convince listener to pick up */
+};
+
+
+/**
+ * Message sent by client to service to initiate a set operation as a
+ * client (not as listener).  A set (which determines the operation
+ * type) must already exist in association with this client.
+ */
+struct GNUNET_SETU_EvaluateMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_EVALUATE
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Id of our set to evaluate, chosen implicitly by the client when it
+   * calls #GNUNET_SETU_commit().
+   */
+  uint32_t request_id GNUNET_PACKED;
+
+  /**
+   * Peer to evaluate the operation with
+   */
+  struct GNUNET_PeerIdentity target_peer;
+
+  /**
+   * Application id
+   */
+  struct GNUNET_HashCode app_id;
+
+  /**
+   * Always use delta operation instead of sending full sets,
+   * even it it's less efficient.
+   */
+  uint8_t force_delta;
+
+  /**
+   * Always send full sets, even if delta operations would
+   * be more efficient.
+   */
+  uint8_t force_full;
+
+  /**
+   * #GNUNET_YES to fail operations where Byzantine faults
+   * are suspected
+   */
+  uint8_t byzantine;
+
+  /**
+   * Lower bound for the set size, used only when
+   * byzantine mode is enabled.
+   */
+  uint8_t byzantine_lower_bound;
+
+  /* rest: context message, that is, application-specific
+     message to convince listener to pick up */
+};
+
+
+/**
+ * Message sent by the service to the client to indicate an
+ * element that is removed (set intersection) or added
+ * (set union) or part of the final result, depending on
+ * options specified for the operation.
+ */
+struct GNUNET_SETU_ResultMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_RESULT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Current set size.
+   */
+  uint64_t current_size;
+
+  /**
+   * id the result belongs to
+   */
+  uint32_t request_id GNUNET_PACKED;
+
+  /**
+   * Was the evaluation successful? Contains
+   * an `enum GNUNET_SETU_Status` in NBO.
+   */
+  uint16_t result_status GNUNET_PACKED;
+
+  /**
+   * Type of the element attachted to the message, if any.
+   */
+  uint16_t element_type GNUNET_PACKED;
+
+  /* rest: the actual element */
+};
+
+
+/**
+ * Message sent by client to the service to add
+ * an element to the set.
+ */
+struct GNUNET_SETU_ElementMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_ADD
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Type of the element to add or remove.
+   */
+  uint16_t element_type GNUNET_PACKED;
+
+  /**
+   * For alignment, always zero.
+   */
+  uint16_t reserved GNUNET_PACKED;
+
+  /* rest: the actual element */
+};
+
+
+/**
+ * Sent to the service by the client in order to cancel a set operation.
+ */
+struct GNUNET_SETU_CancelMessage
+{
+  /**
+   * Type: #GNUNET_MESSAGE_TYPE_SETU_CANCEL
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ID of the request we want to cancel.
+   */
+  uint32_t request_id GNUNET_PACKED;
+};
+
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
new file mode 100644
index 000000000..48260de55
--- /dev/null
+++ b/src/setu/setu_api.c
@@ -0,0 +1,872 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012-2016, 2020 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/setu_api.c
+ * @brief api for the set union service
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_setu_service.h"
+#include "setu.h"
+
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "set-api", __VA_ARGS__)
+
+/**
+ * Opaque handle to a set.
+ */
+struct GNUNET_SETU_Handle
+{
+  /**
+   * Message queue for @e client.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Linked list of operations on the set.
+   */
+  struct GNUNET_SETU_OperationHandle *ops_head;
+
+  /**
+   * Linked list of operations on the set.
+   */
+  struct GNUNET_SETU_OperationHandle *ops_tail;
+
+  /**
+   * Should the set be destroyed once all operations are gone?
+   * #GNUNET_SYSERR if #GNUNET_SETU_destroy() must raise this flag,
+   * #GNUNET_YES if #GNUNET_SETU_destroy() did raise this flag.
+   */
+  int destroy_requested;
+
+  /**
+   * Has the set become invalid (e.g. service died)?
+   */
+  int invalid;
+
+};
+
+
+/**
+ * Handle for a set operation request from another peer.
+ */
+struct GNUNET_SETU_Request
+{
+  /**
+   * Id of the request, used to identify the request when
+   * accepting/rejecting it.
+   */
+  uint32_t accept_id;
+
+  /**
+   * Has the request been accepted already?
+   * #GNUNET_YES/#GNUNET_NO
+   */
+  int accepted;
+};
+
+
+/**
+ * Handle to an operation.  Only known to the service after committing
+ * the handle with a set.
+ */
+struct GNUNET_SETU_OperationHandle
+{
+  /**
+   * Function to be called when we have a result,
+   * or an error.
+   */
+  GNUNET_SETU_ResultIterator result_cb;
+
+  /**
+   * Closure for @e result_cb.
+   */
+  void *result_cls;
+
+  /**
+   * Local set used for the operation,
+   * NULL if no set has been provided by conclude yet.
+   */
+  struct GNUNET_SETU_Handle *set;
+
+  /**
+   * Message sent to the server on calling conclude,
+   * NULL if conclude has been called.
+   */
+  struct GNUNET_MQ_Envelope *conclude_mqm;
+
+  /**
+   * Address of the request if in the conclude message,
+   * used to patch the request id into the message when the set is known.
+   */
+  uint32_t *request_id_addr;
+
+  /**
+   * Handles are kept in a linked list.
+   */
+  struct GNUNET_SETU_OperationHandle *prev;
+
+  /**
+   * Handles are kept in a linked list.
+   */
+  struct GNUNET_SETU_OperationHandle *next;
+
+  /**
+   * Request ID to identify the operation within the set.
+   */
+  uint32_t request_id;
+};
+
+
+/**
+ * Opaque handle to a listen operation.
+ */
+struct GNUNET_SETU_ListenHandle
+{
+  /**
+   * Message queue for the client.
+   */
+  struct GNUNET_MQ_Handle*mq;
+
+  /**
+   * Configuration handle for the listener, stored
+   * here to be able to reconnect transparently on
+   * connection failure.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Function to call on a new incoming request,
+   * or on error.
+   */
+  GNUNET_SETU_ListenCallback listen_cb;
+
+  /**
+   * Closure for @e listen_cb.
+   */
+  void *listen_cls;
+
+  /**
+   * Application ID we listen for.
+   */
+  struct GNUNET_HashCode app_id;
+
+  /**
+   * Time to wait until we try to reconnect on failure.
+   */
+  struct GNUNET_TIME_Relative reconnect_backoff;
+
+  /**
+   * Task for reconnecting when the listener fails.
+   */
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+};
+
+
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_result (void *cls,
+              const struct GNUNET_SETU_ResultMessage *msg)
+{
+  /* minimum size was already checked, everything else is OK! */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle result message for a set operation.
+ *
+ * @param cls the set
+ * @param mh the message
+ */
+static void
+handle_result (void *cls,
+               const struct GNUNET_SETU_ResultMessage *msg)
+{
+  struct GNUNET_SETU_Handle *set = cls;
+  struct GNUNET_SETU_OperationHandle *oh;
+  struct GNUNET_SETU_Element e;
+  enum GNUNET_SETU_Status result_status;
+  int destroy_set;
+
+  GNUNET_assert (NULL != set->mq);
+  result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got result message with status %d\n",
+       result_status);
+  oh = GNUNET_MQ_assoc_get (set->mq,
+                            ntohl (msg->request_id));
+  if (NULL == oh)
+  {
+    /* 'oh' can be NULL if we canceled the operation, but the service
+       did not get the cancel message yet. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Ignoring result from canceled operation\n");
+    return;
+  }
+
+  switch (result_status)
+  {
+  case GNUNET_SETU_STATUS_ADD_LOCAL:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Treating result as element\n");
+    e.data = &msg[1];
+    e.size = ntohs (msg->header.size)
+             - sizeof(struct GNUNET_SETU_ResultMessage);
+    e.element_type = ntohs (msg->element_type);
+    if (NULL != oh->result_cb)
+      oh->result_cb (oh->result_cls,
+                     &e,
+                     GNUNET_ntohll (msg->current_size),
+                     result_status);
+    return;
+  case GNUNET_SETU_STATUS_FAILURE:
+  case GNUNET_SETU_STATUS_DONE:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Treating result as final status\n");
+    GNUNET_MQ_assoc_remove (set->mq,
+                            ntohl (msg->request_id));
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 oh);
+    /* Need to do this calculation _before_ the result callback,
+       as IF the application still has a valid set handle, it
+       may trigger destruction of the set during the callback. */
+    destroy_set = (GNUNET_YES == set->destroy_requested) &&
+                  (NULL == set->ops_head);
+    if (NULL != oh->result_cb)
+    {
+      oh->result_cb (oh->result_cls,
+                     NULL,
+                     GNUNET_ntohll (msg->current_size),
+                     result_status);
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "No callback for final status\n");
+    }
+    if (destroy_set)
+      GNUNET_SETU_destroy (set);
+    GNUNET_free (oh);
+    return;
+  }
+}
+
+
+/**
+ * Destroy the given set operation.
+ *
+ * @param oh set operation to destroy
+ */
+static void
+set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh)
+{
+  struct GNUNET_SETU_Handle *set = oh->set;
+  struct GNUNET_SETU_OperationHandle *h_assoc;
+
+  if (NULL != oh->conclude_mqm)
+    GNUNET_MQ_discard (oh->conclude_mqm);
+  /* is the operation already commited? */
+  if (NULL != set)
+  {
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 oh);
+    h_assoc = GNUNET_MQ_assoc_remove (set->mq,
+                                      oh->request_id);
+    GNUNET_assert ((NULL == h_assoc) ||
+                   (h_assoc == oh));
+  }
+  GNUNET_free (oh);
+}
+
+
+/**
+ * Cancel the given set operation.  We need to send an explicit cancel
+ * message, as all operations one one set communicate using one
+ * handle.
+ *
+ * @param oh set operation to cancel
+ */
+void
+GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh)
+{
+  struct GNUNET_SETU_Handle *set = oh->set;
+  struct GNUNET_SETU_CancelMessage *m;
+  struct GNUNET_MQ_Envelope *mqm;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Cancelling SET operation\n");
+  if (NULL != set)
+  {
+    mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL);
+    m->request_id = htonl (oh->request_id);
+    GNUNET_MQ_send (set->mq, mqm);
+  }
+  set_operation_destroy (oh);
+  if ((NULL != set) &&
+      (GNUNET_YES == set->destroy_requested) &&
+      (NULL == set->ops_head))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying set after operation cancel\n");
+    GNUNET_SETU_destroy (set);
+  }
+}
+
+
+/**
+ * We encountered an error communicating with the set service while
+ * performing a set operation. Report to the application.
+ *
+ * @param cls the `struct GNUNET_SETU_Handle`
+ * @param error error code
+ */
+static void
+handle_client_set_error (void *cls,
+                         enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SETU_Handle *set = cls;
+
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "Handling client set error %d\n",
+       error);
+  while (NULL != set->ops_head)
+  {
+    if ((NULL != set->ops_head->result_cb) &&
+        (GNUNET_NO == set->destroy_requested))
+      set->ops_head->result_cb (set->ops_head->result_cls,
+                                NULL,
+                                0,
+                                GNUNET_SETU_STATUS_FAILURE);
+    set_operation_destroy (set->ops_head);
+  }
+  set->invalid = GNUNET_YES;
+}
+
+
+/**
+ * Create an empty set, supporting the specified operation.
+ *
+ * @param cfg configuration to use for connecting to the
+ *        set service
+ * @return a handle to the set
+ */
+struct GNUNET_SETU_Handle *
+GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle);
+  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    GNUNET_MQ_hd_var_size (result,
+                           GNUNET_MESSAGE_TYPE_SETU_RESULT,
+                           struct GNUNET_SETU_ResultMessage,
+                           set),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_CreateMessage *create_msg;
+
+  set->mq = GNUNET_CLIENT_connect (cfg,
+                                   "setu",
+                                   mq_handlers,
+                                   &handle_client_set_error,
+                                   set);
+  if (NULL == set->mq)
+  {
+    GNUNET_free (set);
+    return NULL;
+  }
+  mqm = GNUNET_MQ_msg (create_msg,
+                       GNUNET_MESSAGE_TYPE_SETU_CREATE);
+  GNUNET_MQ_send (set->mq,
+                  mqm);
+  return set;
+}
+
+
+/**
+ * Add an element to the given set.  After the element has been added
+ * (in the sense of being transmitted to the set service), @a cont
+ * will be called.  Multiple calls to GNUNET_SETU_add_element() can be
+ * queued.
+ *
+ * @param set set to add element to
+ * @param element element to add to the set
+ * @param cb continuation called after the element has been added
+ * @param cb_cls closure for @a cb
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
+ *         set is invalid (e.g. the set service crashed)
+ */
+int
+GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set,
+                         const struct GNUNET_SETU_Element *element,
+                         GNUNET_SCHEDULER_TaskCallback cb,
+                         void *cb_cls)
+{
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_ElementMessage *msg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "adding element of type %u to set %p\n",
+       (unsigned int) element->element_type,
+       set);
+  GNUNET_assert (NULL != set);
+  if (GNUNET_YES == set->invalid)
+  {
+    if (NULL != cb)
+      cb (cb_cls);
+    return GNUNET_SYSERR;
+  }
+  mqm = GNUNET_MQ_msg_extra (msg,
+                             element->size,
+                             GNUNET_MESSAGE_TYPE_SETU_ADD);
+  msg->element_type = htons (element->element_type);
+  GNUNET_memcpy (&msg[1],
+                 element->data,
+                 element->size);
+  GNUNET_MQ_notify_sent (mqm,
+                         cb,
+                         cb_cls);
+  GNUNET_MQ_send (set->mq,
+                  mqm);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Destroy the set handle if no operations are left, mark the set
+ * for destruction otherwise.
+ *
+ * @param set set handle to destroy
+ */
+void
+GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set)
+{
+  /* destroying set while iterator is active is currently
+     not supported; we should expand the API to allow
+     clients to explicitly cancel the iteration! */
+  GNUNET_assert (NULL != set);
+  if ((NULL != set->ops_head) ||
+      (GNUNET_SYSERR == set->destroy_requested))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Set operations are pending, delaying set destruction\n");
+    set->destroy_requested = GNUNET_YES;
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Really destroying set\n");
+  if (NULL != set->mq)
+  {
+    GNUNET_MQ_destroy (set->mq);
+    set->mq = NULL;
+  }
+  GNUNET_free (set);
+}
+
+
+/**
+ * Prepare a set operation to be evaluated with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with #GNUNET_SETU_commit().
+ *
+ * @param other_peer peer with the other set
+ * @param app_id hash for the application using the set
+ * @param context_msg additional information for the request
+ * @param result_cb called on error or success
+ * @param result_cls closure for @e result_cb
+ * @return a handle to cancel the operation
+ */
+struct GNUNET_SETU_OperationHandle *
+GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
+                     const struct GNUNET_HashCode *app_id,
+                     const struct GNUNET_MessageHeader *context_msg,
+                     const struct GNUNET_SETU_Option options[],
+                     GNUNET_SETU_ResultIterator result_cb,
+                     void *result_cls)
+{
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_OperationHandle *oh;
+  struct GNUNET_SETU_EvaluateMessage *msg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client prepares set union operation\n");
+  oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
+  oh->result_cb = result_cb;
+  oh->result_cls = result_cls;
+  mqm = GNUNET_MQ_msg_nested_mh (msg,
+                                 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
+                                 context_msg);
+  msg->app_id = *app_id;
+  msg->target_peer = *other_peer;
+  for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
+  {
+    switch (opt->type)
+    {
+    case GNUNET_SETU_OPTION_BYZANTINE:
+      msg->byzantine = GNUNET_YES;
+      msg->byzantine_lower_bound = opt->v.num;
+      break;
+    case GNUNET_SETU_OPTION_FORCE_FULL:
+      msg->force_full = GNUNET_YES;
+      break;
+    case GNUNET_SETU_OPTION_FORCE_DELTA:
+      msg->force_delta = GNUNET_YES;
+      break;
+    default:
+      LOG (GNUNET_ERROR_TYPE_ERROR,
+           "Option with type %d not recognized\n",
+           (int) opt->type);
+    }
+  }
+  oh->conclude_mqm = mqm;
+  oh->request_id_addr = &msg->request_id;
+  return oh;
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
+ */
+static void
+listen_connect (void *cls);
+
+
+/**
+ * Check validity of request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_request (void *cls,
+               const struct GNUNET_SETU_RequestMessage *msg)
+{
+  const struct GNUNET_MessageHeader *context_msg;
+
+  if (ntohs (msg->header.size) == sizeof(*msg))
+    return GNUNET_OK; /* no context message is OK */
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  if (NULL == context_msg)
+  {
+    /* malformed context message is NOT ok */
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ */
+static void
+handle_request (void *cls,
+                const struct GNUNET_SETU_RequestMessage *msg)
+{
+  struct GNUNET_SETU_ListenHandle *lh = cls;
+  struct GNUNET_SETU_Request req;
+  const struct GNUNET_MessageHeader *context_msg;
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_RejectMessage *rmsg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Processing incoming operation request with id %u\n",
+       ntohl (msg->accept_id));
+  /* we got another valid request => reset the backoff */
+  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  req.accept_id = ntohl (msg->accept_id);
+  req.accepted = GNUNET_NO;
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */
+  lh->listen_cb (lh->listen_cls,
+                 &msg->peer_id,
+                 context_msg,
+                 &req);
+  if (GNUNET_YES == req.accepted)
+    return; /* the accept-case is handled in #GNUNET_SETU_accept() */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Rejected request %u\n",
+       ntohl (msg->accept_id));
+  mqm = GNUNET_MQ_msg (rmsg,
+                       GNUNET_MESSAGE_TYPE_SETU_REJECT);
+  rmsg->accept_reject_id = msg->accept_id;
+  GNUNET_MQ_send (lh->mq,
+                  mqm);
+}
+
+
+/**
+ * Our connection with the set service encountered an error,
+ * re-initialize with exponential back-off.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *`
+ * @param error reason for the disconnect
+ */
+static void
+handle_client_listener_error (void *cls,
+                              enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SETU_ListenHandle *lh = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Listener broke down (%d), re-connecting\n",
+       (int) error);
+  GNUNET_MQ_destroy (lh->mq);
+  lh->mq = NULL;
+  lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
+                                                     &listen_connect,
+                                                     lh);
+  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
+ */
+static void
+listen_connect (void *cls)
+{
+  struct GNUNET_SETU_ListenHandle *lh = cls;
+  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    GNUNET_MQ_hd_var_size (request,
+                           GNUNET_MESSAGE_TYPE_SETU_REQUEST,
+                           struct GNUNET_SETU_RequestMessage,
+                           lh),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_ListenMessage *msg;
+
+  lh->reconnect_task = NULL;
+  GNUNET_assert (NULL == lh->mq);
+  lh->mq = GNUNET_CLIENT_connect (lh->cfg,
+                                  "setu",
+                                  mq_handlers,
+                                  &handle_client_listener_error,
+                                  lh);
+  if (NULL == lh->mq)
+    return;
+  mqm = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_SETU_LISTEN);
+  msg->app_id = lh->app_id;
+  GNUNET_MQ_send (lh->mq,
+                  mqm);
+}
+
+
+/**
+ * Wait for set operation requests for the given application id
+ *
+ * @param cfg configuration to use for connecting to
+ *            the set service, needs to be valid for the lifetime of the 
listen handle
+ * @param app_id id of the application that handles set operation requests
+ * @param listen_cb called for each incoming request matching the operation
+ *                  and application id
+ * @param listen_cls handle for @a listen_cb
+ * @return a handle that can be used to cancel the listen operation
+ */
+struct GNUNET_SETU_ListenHandle *
+GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                    const struct GNUNET_HashCode *app_id,
+                    GNUNET_SETU_ListenCallback listen_cb,
+                    void *listen_cls)
+{
+  struct GNUNET_SETU_ListenHandle *lh;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Starting listener for app %s\n",
+       GNUNET_h2s (app_id));
+  lh = GNUNET_new (struct GNUNET_SETU_ListenHandle);
+  lh->listen_cb = listen_cb;
+  lh->listen_cls = listen_cls;
+  lh->cfg = cfg;
+  lh->app_id = *app_id;
+  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  listen_connect (lh);
+  if (NULL == lh->mq)
+  {
+    GNUNET_free (lh);
+    return NULL;
+  }
+  return lh;
+}
+
+
+/**
+ * Cancel the given listen operation.
+ *
+ * @param lh handle for the listen operation
+ */
+void
+GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh)
+{
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Canceling listener %s\n",
+       GNUNET_h2s (&lh->app_id));
+  if (NULL != lh->mq)
+  {
+    GNUNET_MQ_destroy (lh->mq);
+    lh->mq = NULL;
+  }
+  if (NULL != lh->reconnect_task)
+  {
+    GNUNET_SCHEDULER_cancel (lh->reconnect_task);
+    lh->reconnect_task = NULL;
+  }
+  GNUNET_free (lh);
+}
+
+
+/**
+ * Accept a request we got via #GNUNET_SETU_listen.  Must be called during
+ * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid
+ * afterwards.
+ * Call #GNUNET_SETU_commit to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer.
+ *
+ * @param request request to accept
+ * @param result_mode specified how results will be returned,
+ *        see `enum GNUNET_SETU_ResultMode`.
+ * @param result_cb callback for the results
+ * @param result_cls closure for @a result_cb
+ * @return a handle to cancel the operation
+ */
+struct GNUNET_SETU_OperationHandle *
+GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
+                    const struct GNUNET_SETU_Option options[],
+                    GNUNET_SETU_ResultIterator result_cb,
+                    void *result_cls)
+{
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SETU_OperationHandle *oh;
+  struct GNUNET_SETU_AcceptMessage *msg;
+
+  GNUNET_assert (GNUNET_NO == request->accepted);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client accepts set union operation with id %u\n",
+       request->accept_id);
+  request->accepted = GNUNET_YES;
+  mqm = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
+  msg->accept_reject_id = htonl (request->accept_id);
+  oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
+  oh->result_cb = result_cb;
+  oh->result_cls = result_cls;
+  oh->conclude_mqm = mqm;
+  oh->request_id_addr = &msg->request_id;
+  return oh;
+}
+
+
+/**
+ * Commit a set to be used with a set operation.
+ * This function is called once we have fully constructed
+ * the set that we want to use for the operation.  At this
+ * time, the P2P protocol can then begin to exchange the
+ * set information and call the result callback with the
+ * result information.
+ *
+ * @param oh handle to the set operation
+ * @param set the set to use for the operation
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
+ *         set is invalid (e.g. the set service crashed)
+ */
+int
+GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh,
+                    struct GNUNET_SETU_Handle *set)
+{
+  if (NULL != oh->set)
+  {
+    /* Some other set was already committed for this
+     * operation, there is a logic bug in the client of this API */
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+  GNUNET_assert (NULL != set);
+  if (GNUNET_YES == set->invalid)
+    return GNUNET_SYSERR;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client commits to SET\n");
+  GNUNET_assert (NULL != oh->conclude_mqm);
+  oh->set = set;
+  GNUNET_CONTAINER_DLL_insert (set->ops_head,
+                               set->ops_tail,
+                               oh);
+  oh->request_id = GNUNET_MQ_assoc_add (set->mq,
+                                        oh);
+  *oh->request_id_addr = htonl (oh->request_id);
+  GNUNET_MQ_send (set->mq,
+                  oh->conclude_mqm);
+  oh->conclude_mqm = NULL;
+  oh->request_id_addr = NULL;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param[out] ret_hash a pointer to where the hash of @a element
+ *        should be stored
+ */
+void
+GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element,
+                          struct GNUNET_HashCode *ret_hash)
+{
+  struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
+
+  /* It's not guaranteed that the element data is always after the element 
header,
+     so we need to hash the chunks separately. */
+  GNUNET_CRYPTO_hash_context_read (ctx,
+                                   &element->size,
+                                   sizeof(uint16_t));
+  GNUNET_CRYPTO_hash_context_read (ctx,
+                                   &element->element_type,
+                                   sizeof(uint16_t));
+  GNUNET_CRYPTO_hash_context_read (ctx,
+                                   element->data,
+                                   element->size);
+  GNUNET_CRYPTO_hash_context_finish (ctx,
+                                     ret_hash);
+}
+
+
+/* end of setu_api.c */
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
new file mode 100644
index 000000000..95119873c
--- /dev/null
+++ b/src/setu/test_setu_api.c
@@ -0,0 +1,360 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/test_setu_api.c
+ * @brief testcase for setu_api.c
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_setu_service.h"
+
+
+static struct GNUNET_PeerIdentity local_id;
+
+static struct GNUNET_HashCode app_id;
+
+static struct GNUNET_SETU_Handle *set1;
+
+static struct GNUNET_SETU_Handle *set2;
+
+static struct GNUNET_SETU_ListenHandle *listen_handle;
+
+static struct GNUNET_SETU_OperationHandle *oh1;
+
+static struct GNUNET_SETU_OperationHandle *oh2;
+
+static const struct GNUNET_CONFIGURATION_Handle *config;
+
+static int ret;
+
+static struct GNUNET_SCHEDULER_Task *tt;
+
+
+static void
+result_cb_set1 (void *cls,
+                const struct GNUNET_SETU_Element *element,
+                uint64_t size,
+                enum GNUNET_SETU_Status status)
+{
+  switch (status)
+  {
+  case GNUNET_SETU_STATUS_ADD_LOCAL:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
+    break;
+
+  case GNUNET_SETU_STATUS_FAILURE:
+    GNUNET_break (0);
+    oh1 = NULL;
+    fprintf (stderr, "set 1: received failure status!\n");
+    ret = 1;
+    if (NULL != tt)
+    {
+      GNUNET_SCHEDULER_cancel (tt);
+      tt = NULL;
+    }
+    GNUNET_SCHEDULER_shutdown ();
+    break;
+
+  case GNUNET_SETU_STATUS_DONE:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
+    oh1 = NULL;
+    if (NULL != set1)
+    {
+      GNUNET_SETU_destroy (set1);
+      set1 = NULL;
+    }
+    if (NULL == set2)
+    {
+      GNUNET_SCHEDULER_cancel (tt);
+      tt = NULL;
+      GNUNET_SCHEDULER_shutdown ();
+    }
+    break;
+
+  default:
+    GNUNET_assert (0);
+  }
+}
+
+
+static void
+result_cb_set2 (void *cls,
+                const struct GNUNET_SETU_Element *element,
+                uint64_t size,
+                enum GNUNET_SETU_Status status)
+{
+  switch (status)
+  {
+  case GNUNET_SETU_STATUS_ADD_LOCAL:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
+    break;
+
+  case GNUNET_SETU_STATUS_FAILURE:
+    GNUNET_break (0);
+    oh2 = NULL;
+    fprintf (stderr, "set 2: received failure status\n");
+    GNUNET_SCHEDULER_shutdown ();
+    ret = 1;
+    break;
+
+  case GNUNET_SETU_STATUS_DONE:
+    oh2 = NULL;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
+    GNUNET_SETU_destroy (set2);
+    set2 = NULL;
+    if (NULL == set1)
+    {
+      GNUNET_SCHEDULER_cancel (tt);
+      tt = NULL;
+      GNUNET_SCHEDULER_shutdown ();
+    }
+    break;
+
+  default:
+    GNUNET_assert (0);
+  }
+}
+
+
+static void
+listen_cb (void *cls,
+           const struct GNUNET_PeerIdentity *other_peer,
+           const struct GNUNET_MessageHeader *context_msg,
+           struct GNUNET_SETU_Request *request)
+{
+  GNUNET_assert (NULL != context_msg);
+  GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
+  oh2 = GNUNET_SETU_accept (request,
+                            (struct GNUNET_SETU_Option[]){ 0 },
+                            &result_cb_set2,
+                            NULL);
+  GNUNET_SETU_commit (oh2, set2);
+}
+
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
+static void
+start (void *cls)
+{
+  struct GNUNET_MessageHeader context_msg;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
+  context_msg.size = htons (sizeof context_msg);
+  context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
+  listen_handle = GNUNET_SETU_listen (config,
+                                      &app_id,
+                                      &listen_cb,
+                                      NULL);
+  oh1 = GNUNET_SETU_prepare (&local_id,
+                             &app_id,
+                             &context_msg,
+                             (struct GNUNET_SETU_Option[]){ 0 },
+                             &result_cb_set1,
+                             NULL);
+  GNUNET_SETU_commit (oh1, set1);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
+{
+  struct GNUNET_SETU_Element element;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
+
+  element.element_type = 0;
+  element.data = "hello";
+  element.size = strlen (element.data);
+  GNUNET_SETU_add_element (set2, &element, NULL, NULL);
+  element.data = "quux";
+  element.size = strlen (element.data);
+  GNUNET_SETU_add_element (set2, &element, NULL, NULL);
+  element.data = "baz";
+  element.size = strlen (element.data);
+  GNUNET_SETU_add_element (set2, &element, &start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+  struct GNUNET_SETU_Element element;
+
+  element.element_type = 0;
+  element.data = "hello";
+  element.size = strlen (element.data);
+  GNUNET_SETU_add_element (set1, &element, NULL, NULL);
+  element.data = "bar";
+  element.size = strlen (element.data);
+  GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
+}
+
+
+/**
+ * Function run on timeout.
+ *
+ * @param cls closure
+ */
+static void
+timeout_fail (void *cls)
+{
+  tt = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
+  GNUNET_SCHEDULER_shutdown ();
+  ret = 1;
+}
+
+
+/**
+ * Function run on shutdown.
+ *
+ * @param cls closure
+ */
+static void
+do_shutdown (void *cls)
+{
+  if (NULL != tt)
+  {
+    GNUNET_SCHEDULER_cancel (tt);
+    tt = NULL;
+  }
+  if (NULL != oh1)
+  {
+    GNUNET_SETU_operation_cancel (oh1);
+    oh1 = NULL;
+  }
+  if (NULL != oh2)
+  {
+    GNUNET_SETU_operation_cancel (oh2);
+    oh2 = NULL;
+  }
+  if (NULL != set1)
+  {
+    GNUNET_SETU_destroy (set1);
+    set1 = NULL;
+  }
+  if (NULL != set2)
+  {
+    GNUNET_SETU_destroy (set2);
+    set2 = NULL;
+  }
+  if (NULL != listen_handle)
+  {
+    GNUNET_SETU_listen_cancel (listen_handle);
+    listen_handle = NULL;
+  }
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
+     struct GNUNET_TESTING_Peer *peer)
+{
+  struct GNUNET_SETU_OperationHandle *my_oh;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Running preparatory tests\n");
+  tt = GNUNET_SCHEDULER_add_delayed (
+    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
+    &timeout_fail,
+    NULL);
+  GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+
+  config = cfg;
+  GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
+                                                               &local_id));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "my id (from CRYPTO): %s\n",
+              GNUNET_i2s (&local_id));
+  GNUNET_TESTING_peer_get_identity (peer,
+                                    &local_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "my id (from TESTING): %s\n",
+              GNUNET_i2s (&local_id));
+  set1 = GNUNET_SETU_create (cfg);
+  set2 = GNUNET_SETU_create (cfg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Created sets %p and %p for union operation\n",
+              set1,
+              set2);
+  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
+
+  /* test if canceling an uncommited request works! */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Launching and instantly stopping set operation\n");
+  my_oh = GNUNET_SETU_prepare (&local_id,
+                               &app_id,
+                               NULL,
+                               (struct GNUNET_SETU_Option[]){ 0 },
+                               NULL,
+                               NULL);
+  GNUNET_SETU_operation_cancel (my_oh);
+
+  /* test the real set reconciliation */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Running real set-reconciliation\n");
+  init_set1 ();
+}
+
+
+int
+main (int argc, char **argv)
+{
+  GNUNET_log_setup ("test_setu_api",
+                    "WARNING",
+                    NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Launching peer\n");
+  if (0 !=
+      GNUNET_TESTING_peer_run ("test_setu_api",
+                               "test_setu.conf",
+                               &run,
+                               NULL))
+  {
+    return 1;
+  }
+  return ret;
+}

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]