freepooma-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH] MPI SendReceive


From: Richard Guenther
Subject: [PATCH] MPI SendReceive
Date: Tue, 30 Dec 2003 21:41:07 +0100 (CET)

Hi!

This is now the MPI version of SendReceive.h, including changes to
RemoteEngine.h, which handles (de-)serialization of engines.  The latter
change allows optimizing away one of the three(!) copies we currently
make when communicating an engine at receive time:
- receive into message buffer
- deserialize into temporary brick engine
- copy temporary brick engine to target view

The message buffer is now deserialized directly into the target view (for
non-Cheetah operation only; with Cheetah this is not possible).  A patch that
removes a fourth(!!) copy we're doing at guard update will follow.

Tested as usual.

Ok?

Richard.


2003Dec30  Richard Guenther <address@hidden>

        * src/Engine/RemoteEngine.h: add deserializer into existing
        engine.
        * src/Tulip/SendReceive.h: add MPI variant.

===== RemoteEngine.h 1.9 vs 1.16 =====
--- 1.9/r2/src/Engine/RemoteEngine.h    Wed Dec 10 11:19:05 2003
+++ 1.16/r2/src/Engine/RemoteEngine.h   Tue Dec 30 21:26:06 2003
@@ -1239,6 +1241,7 @@
     t = *a;
     buffer_m   += change;
     total_m    += change;
+    Cheetah::Serialize<Cheetah::CHEETAH, T>::cleanup(a);
   }

   char *buffer_m;
@@ -1248,6 +1251,9 @@

 namespace Cheetah {

+// All these serializers/deserializers share a common header,
+// namely domain and compressed flag.
+
 template<int Dim, class T>
 class Serialize<CHEETAH, Engine<Dim, T, BrickView> >
 {
@@ -1261,6 +1267,8 @@
     int nBytes=0;

     nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
+    bool compressed = false;
+    nBytes += Serialize<CHEETAH, bool>::size(compressed);
     nBytes += a.domain().size() * Serialize<CHEETAH, T>::size(T());

     return nBytes;
@@ -1278,6 +1286,11 @@
     buffer += change;
     nBytes += change;

+    bool compressed = false;
+    change = Serialize<CHEETAH, bool>::pack(compressed, buffer);
+    buffer += change;
+    nBytes += change;
+
     EngineElemSerialize op(buffer);

     change = EngineBlockSerialize::apply(op, a, dom);
@@ -1287,20 +1300,54 @@
     return nBytes;
   }

+  // We support a special unpack to avoid an extra copy.
+
   static inline int
-  unpack(Engine_t* &a, char *buffer)
+  unpack(Engine_t &a, char *buffer)
   {
-    // We'll unpack into a Brick rather than a BrickView, since
-    // we just copy from it anyway.
+    Interval<Dim> *dom;

-    PAssert(false);
-  }
+    int change;
+    int nBytes=0;

-  static inline void
-  cleanup(Engine_t* a)
-  {
-    delete a;
+    change = Serialize<CHEETAH, Domain_t>::unpack(dom, buffer);
+    buffer += change;
+    nBytes += change;
+
+    bool *compressed;
+    change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
+    buffer += change;
+    nBytes += change;
+
+    // the domains probably don't match, but at least their sizes must
+    for (int i=0; i<Dim; ++i)
+      PAssert((*dom)[i].size() == a.domain()[i].size());
+
+    if (*compressed)
+    {
+      T *value;
+      change = Serialize<CHEETAH, T>::unpack(value, buffer);
+
+      // we can't use the usual array assignment here, because this would
+      // confuse the scheduler and lead to bogus results
+      Array<Engine_t::dimensions, T, typename Engine_t::Tag_t> lhs;
+      lhs.engine() = a;
+      Array<Engine_t::dimensions, T, ConstantFunction> rhs(*dom);
+      rhs.engine().setConstant(*value);
+      KernelEvaluator<InlineKernelTag>::evaluate(lhs, OpAssign(), rhs);
+    } else {
+      EngineElemDeSerialize op(buffer);
+
+      change = EngineBlockSerialize::apply(op, a, a.domain());
+    }
+    nBytes += change;
+
+    Serialize<CHEETAH, Domain_t>::cleanup(dom);
+    Serialize<CHEETAH, bool>::cleanup(compressed);
+
+    return nBytes;
   }
+
 };

 template<int Dim, class T>
@@ -1316,6 +1363,8 @@
     int nBytes=0;

     nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
+    bool compressed = false;
+    nBytes += Serialize<CHEETAH, bool>::size(compressed);
     nBytes += a.domain().size() * Serialize<CHEETAH, T>::size(T());

     return nBytes;
@@ -1333,6 +1382,11 @@
     buffer += change;
     nBytes += change;

+    bool compressed = false;
+    change = Serialize<CHEETAH, bool>::pack(compressed, buffer);
+    buffer += change;
+    nBytes += change;
+
     EngineElemSerialize op(buffer);

     change = EngineBlockSerialize::apply(op, a, dom);
@@ -1342,6 +1396,8 @@
     return nBytes;
   }

+  // Old-style unpack with extra copy.
+
   static inline int
   unpack(Engine_t* &a, char *buffer)
   {
@@ -1354,6 +1410,12 @@
     buffer += change;
     nBytes += change;

+    bool *compressed;
+    change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
+    buffer += change;
+    nBytes += change;
+    PAssert(!*compressed);
+
     a = new Engine<Dim, T, Brick>(*dom);

     EngineElemDeSerialize op(buffer);
@@ -1362,6 +1424,9 @@

     nBytes += change;

+    Serialize<CHEETAH, Domain_t>::cleanup(dom);
+    Serialize<CHEETAH, bool>::cleanup(compressed);
+
     return nBytes;
   }

@@ -1370,6 +1435,7 @@
   {
     delete a;
   }
+
 };

 template<int Dim, class T>
@@ -1386,7 +1452,10 @@

     nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());

-    bool compressed = a.compressed();
+    // we cannot use a.compressed() here, because we need to
+    // set up a big enough receive buffer and the compressed
+    // flag is not valid across contexts.
+    bool compressed = false;
     nBytes += Serialize<CHEETAH, bool>::size(compressed);

     if (compressed)
@@ -1433,6 +1502,8 @@
     return nBytes;
   }

+  // Old-style unpack with extra copy.
+
   static inline int
   unpack(Engine_t* &a, char *buffer)
   {
@@ -1446,7 +1517,6 @@
     nBytes += change;

     bool *compressed;
-
     change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
     buffer += change;
     nBytes += change;
@@ -1469,6 +1539,9 @@
     }
     nBytes += change;

+    Serialize<CHEETAH, Domain_t>::cleanup(dom);
+    Serialize<CHEETAH, bool>::cleanup(compressed);
+
     return nBytes;
   }

@@ -1477,6 +1550,7 @@
   {
     delete a;
   }
+
 };

 template<int Dim, class T>
@@ -1493,7 +1567,10 @@

     nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());

-    bool compressed = a.compressed();
+    // we cannot use a.compressed() here, because we need to
+    // set up a big enough receive buffer and the compressed
+    // flag is not valid across contexts.
+    bool compressed = false;
     nBytes += Serialize<CHEETAH, bool>::size(compressed);

     if (compressed)
@@ -1541,8 +1618,10 @@
     return nBytes;
   }

+  // We support a special unpack to avoid an extra copy.
+
   static inline int
-  unpack(Engine_t* &a, char *buffer)
+  unpack(Engine_t &a, char *buffer)
   {
     Interval<Dim> *dom;

@@ -1554,40 +1633,36 @@
     nBytes += change;

     bool *compressed;
-
     change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
     buffer += change;
     nBytes += change;

+    // the domains probably don't match, but at least their sizes must
+    for (int i=0; i<Dim; ++i)
+      PAssert((*dom)[i].size() == a.domain()[i].size());
+
     if (*compressed)
     {
       T *value;

       change = Serialize<CHEETAH, T>::unpack(value, buffer);

-      Engine<Dim, T, CompressibleBrick> foo(*dom, *value);
-
-      a = new Engine_t(foo, *dom);
+      // we can't use the usual array assignment here, because this would
+      // confuse the scheduler and lead to bogus results
+      a.compressedReadWrite() = *value;
     }
     else
     {
-      Engine<Dim, T, CompressibleBrick> foo(*dom);
-
       EngineElemDeSerialize op(buffer);

-      change = EngineBlockSerialize::apply(op, foo, *dom);
-
-      a = new Engine_t(foo, *dom);
+      change = EngineBlockSerialize::apply(op, a, *dom);
     }
     nBytes += change;

-    return nBytes;
-  }
+    Serialize<CHEETAH, Domain_t>::cleanup(dom);
+    Serialize<CHEETAH, bool>::cleanup(compressed);

-  static inline void
-  cleanup(Engine_t* a)
-  {
-    delete a;
+    return nBytes;
   }
 };

--- SendReceive.h       2003-10-21 20:47:59.000000000 +0200
+++ /tmp/SendReceive.h  2003-12-30 21:34:17.000000000 +0100
@@ -57,9 +57,11 @@
 // Includes:
 //-----------------------------------------------------------------------------

+#include "Tulip/Messaging.h"
 #include "Pooma/Pooma.h"
 #include "Evaluator/InlineEvaluator.h"
-#include "Tulip/Messaging.h"
+#include "Evaluator/RequestLocks.h"
+#include "Engine/DataObject.h"
 #include "Utilities/PAssert.h"

 //-----------------------------------------------------------------------------
@@ -268,14 +270,228 @@
   {
     PAssert(fromContext >= 0);
     int tag = Pooma::receiveTag(fromContext);
-    Pooma::scheduler().handOff(new ReceiveIterate<View,
-                              IncomingView>(view,
-                                            fromContext, tag));
+    Pooma::scheduler().handOff(new ReceiveIterate<View, IncomingView>
+                                       (view, fromContext, tag));
   }
 };


-#else // not POOMA_CHEETAH
+#elif POOMA_MPI
+
+
+/**
+ * A SendIterate requests a read lock on a piece of data.  When that read lock
+ * is granted, we serialize the data and post a non-blocking MPI send to the
+ * appropriate context.  We construct the SendIterate with a tag that is used
+ * to match the appropriate ReceiveIterate on the remote context.
+ */
+
+template<class View>
+class SendIterate
+  : public Pooma::Iterate_t
+{
+public:
+  SendIterate(const View &view, int toContext, int tag)
+    : Pooma::Iterate_t(Pooma::scheduler()),
+      toContext_m(toContext),
+      tag_m(tag),
+      view_m(view)
+  {
+    PAssert(toContext >= 0);
+
+    hintAffinity(engineFunctor(view_m,
+                              DataObjectRequest<BlockAffinity>()));
+
+#if POOMA_REORDER_ITERATES
+    // Priority interface was added to r2 version of serial async so that
+    // message send iterates would run before any other iterates.
+    priority(-1);
+#endif
+
+    DataObjectRequest<WriteRequest> writeReq(*this);
+    DataObjectRequest<ReadRequest> readReq(writeReq);
+    engineFunctor(view_m, readReq);
+  }
+
+  virtual void run()
+  {
+    typedef Cheetah::Serialize<Cheetah::CHEETAH, View> Serialize_t;
+
+    // serialize and send buffer
+    int length = Serialize_t::size(view_m);
+    buffer_m = new char[length];
+    Serialize_t::pack(view_m, buffer_m);
+    MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+    int res = MPI_Isend(buffer_m, length, MPI_CHAR, toContext_m, tag_m,
+                       MPI_COMM_WORLD, request);
+    PAssert(res == MPI_SUCCESS);
+
+    // release locks
+    DataObjectRequest<WriteRelease> writeReq;
+    DataObjectRequest<ReadRelease> readReq(writeReq);
+    engineFunctor(view_m, readReq);
+  }
+
+  virtual ~SendIterate()
+  {
+    // cleanup temporary objects.
+    delete[] buffer_m;
+  }
+
+private:
+
+  // Context we're sending the data to.
+
+  int toContext_m;
+
+  // A tag used to match the sent data with the right receive.
+
+  int tag_m;
+
+  // Communication buffer.
+
+  char *buffer_m;
+
+  // The data we're sending (typically a view of an array).
+
+  View view_m;
+};
+
+
+/**
+ * ReceiveIterate requests a write lock on a piece of data.  A non-blocking
+ * MPI receive is posted either at construction or, failing that, in run().
+ * The destructor deserializes the received buffer directly into the view
+ * and releases the write lock.
+ */
+
+template<class View, class IncomingView>
+class ReceiveIterate
+  : public Pooma::Iterate_t
+{
+public:
+
+  typedef ReceiveIterate<View, IncomingView> This_t;
+
+  ReceiveIterate(const View &view, int fromContext, int tag)
+    : Pooma::Iterate_t(Pooma::scheduler()),
+      fromContext_m(fromContext),
+      tag_m(tag), buffer_m(NULL),
+      view_m(view)
+  {
+    PAssert(fromContext >= 0);
+
+    hintAffinity(engineFunctor(view,
+                              DataObjectRequest<BlockAffinity>()));
+
+#if POOMA_REORDER_ITERATES
+    // Priority interface was added to r2 version of serial async so that
+    // message receive iterates would run after any other iterates.
+    priority(-1);
+#endif
+
+    DataObjectRequest<WriteRequest> writeReq(*this);
+    engineFunctor(view, writeReq);
+
+    Pooma::addIncomingMessage();
+
+    // pre-allocate incoming buffer and issue async receive
+    // we may hog on requests here - so maybe we need to conditionalize
+    // this a bit on request availability?
+    if (Smarts::SystemContext::haveLotsOfMPIRequests()) {
+      int length = Cheetah::Serialize<Cheetah::CHEETAH, View>::size(view_m);
+      buffer_m = new char[length];
+      MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+      int res = MPI_Irecv(buffer_m, length, MPI_CHAR, fromContext_m, tag_m,
+                         MPI_COMM_WORLD, request);
+      PAssert(res == MPI_SUCCESS);
+    }
+  }
+
+  virtual void run()
+  {
+    // post the receive unless the constructor did; unpack is in the destructor
+    if (!buffer_m) {
+      int length = Cheetah::Serialize<Cheetah::CHEETAH, View>::size(view_m);
+      buffer_m = new char[length];
+      MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+      int res = MPI_Irecv(buffer_m, length, MPI_CHAR, fromContext_m, tag_m,
+                         MPI_COMM_WORLD, request);
+      PAssert(res == MPI_SUCCESS);
+    }
+  }
+
+  virtual ~ReceiveIterate()
+  {
+    typedef Cheetah::Serialize<Cheetah::CHEETAH, View> Serialize_t;
+
+    // de-serialize into target view directly
+    Serialize_t::unpack(view_m, buffer_m);
+
+    // cleanup temporary objects
+    delete[] buffer_m;
+
+    // release locks
+    DataObjectRequest<WriteRelease> writeReq;
+    engineFunctor(view_m, writeReq);
+
+    Pooma::gotIncomingMessage();
+  }
+
+private:
+
+  // Context we're receiving the data from.
+
+  int fromContext_m;
+
+  // A tag used to match the sent data with the right send.
+
+  int tag_m;
+
+  // Communication buffer.
+
+  char *buffer_m;
+
+  // The place to put the data we're receiving (typically a view of the
+  // engine).
+
+  View view_m;
+};
+
+/**
+ * SendReceive contains two static functions, send(view, context) and
+ * receive(view, context).  These functions encapsulate generating matching
+ * tags for the send and receive and launching the iterates to perform the
+ * send and receive.
+ */
+
+struct SendReceive
+{
+  template<class View>
+  static
+  void send(const View &view, int toContext)
+  {
+    int tag = Pooma::sendTag(toContext);
+    Pooma::scheduler().handOff(new SendIterate<View>(view, toContext, tag));
+  }
+};
+
+template<class IncomingView>
+struct Receive
+{
+  template<class View>
+  static
+  void receive(const View &view, int fromContext)
+  {
+    PAssert(fromContext >= 0);
+    int tag = Pooma::receiveTag(fromContext);
+    Pooma::scheduler().handOff(new ReceiveIterate<View, IncomingView>
+                                       (view, fromContext, tag));
+  }
+};
+
+
+#else // not POOMA_MESSAGING


 /**
@@ -305,7 +521,8 @@
   }
 };

-#endif // not POOMA_CHEETAH
+
+#endif // not POOMA_MESSAGING

 //////////////////////////////////////////////////////////////////////

reply via email to

[Prev in Thread] Current Thread [Next in Thread]