freepooma-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH] MPI support for SerialAsync scheduler


From: Richard Guenther
Subject: [PATCH] MPI support for SerialAsync scheduler
Date: Fri, 2 Jan 2004 13:20:35 +0100 (CET)

Hi!

This patch moves SerialAsync to the state I have it.  So this patch maybe
somewhat hard to follow, so I'll go through the obfuscating parts first:
- it moves commentary to doxygen style
- it moves Iterate<SerialAsync> definition up due to dependency issues

Apart from this, the patch introduces a std::stack<int> for tracking the
current generation.  This is necessary for MPI messaging to avoid
deadlocks waiting for communication on one end that hasn't been issued at
the remote end yet.  Basically the only places where a full
blockAndEvaluate() is safe, is, if we're not inside a generation.  And we
need to sometimes wait for communication to complete due to a limited
amount of MPI_Requests we can have in fly.

For asyncronous MPI operation the scheduler maintains the necessary
MPI_Request structures and has the ability to wait on the completion of
the asyncronous requests.  This makes necessary the deferred destruction
of the Iterates done via a reference count that is incremented on every
MPI request issued and decremented on every MPI request completed.  This
same mechanism may possibly used to solve the Cheetah use-after-destruct
issue -- I'll prepare a seperate patch for this.

So, I hope I didn't forget something in the patch.

The patch was tested as usual.

Ok to commit?

Thanks, Richard.


2004Jan02  Richard Guenther <address@hidden>

        * src/Threads/IterateSchedulers/SerialAsync.h: doxygenifize,
        add std::stack<int> for generation tracking, add support for
        asyncronous MPI requests.
        src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp: define
        new static variables.

--- /home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.h  
2000-06-09 00:16:50.000000000 +0200
+++ Threads/IterateSchedulers/SerialAsync.h     2004-01-02 00:40:16.000000000 
+0100
@@ -42,48 +42,38 @@
 // DataObject<SerialAsync>
 //-----------------------------------------------------------------------------

-#include <iostream>
-
 #ifndef _SerialAsync_h_
 #define _SerialAsync_h_
-/*
-LIBRARY:
-        SerialAsync
-
-CLASSES: IterateScheduler
-
-CLASSES: DataObject
-
-CLASSES: Iterate
-
-OVERVIEW
-        SerialAsync IterateScheduler is a policy template to create a
-        dependence graphs and executes the graph respecting the
-        dependencies without using threads. There is no parallelism,
-        but Iterates may be executed out-of-order with respect to the
-        program text.
-
------------------------------------------------------------------------------*/
-
-//////////////////////////////////////////////////////////////////////

-//-----------------------------------------------------------------------------
-// Overview:
-// Smarts classes for times when you want no threads but you do want
-// dataflow evaluation.
-//-----------------------------------------------------------------------------
-
-//-----------------------------------------------------------------------------
-// Typedefs:
-//-----------------------------------------------------------------------------
+/** @file
+ * @ingroup IterateSchedulers
+ * @brief
+ * Smarts classes for times when you want no threads but you do want
+ * dataflow evaluation.
+ *
+ * SerialAsync IterateScheduler is a policy template to create a
+ * dependence graphs and executes the graph respecting the
+ * dependencies without using threads.
+ * There is no (thread level) parallelism, but Iterates may be executed
+ * out-of-order with respect to the program text. Also this scheduler is
+ * used for message based parallelism in which case asyncronous execution
+ * leads to reduced communication latencies.
+ */

 //-----------------------------------------------------------------------------
 // Includes:
 //-----------------------------------------------------------------------------

 #include <list>
+#include <vector>
+#include <map>
+#include <set>
+#include <functional>
+#include <stack>
 #include "Threads/IterateSchedulers/IterateScheduler.h"
 #include "Threads/IterateSchedulers/Runnable.h"
+#include "Tulip/Messaging.h"
+#include "Utilities/PAssert.h"

 //-----------------------------------------------------------------------------
 // Forward Declarations:
@@ -94,76 +84,261 @@

 namespace Smarts {

-#define MYID 0
-#define MAX_CPUS 1
-//
-// Tag class for specializing IterateScheduler, Iterate and DataObject.
-//
+/**
+ * Tag class for specializing IterateScheduler, Iterate and DataObject.
+ */
+
 struct SerialAsync
 {
-  enum Action { Read, Write};
+  enum Action { Read, Write };
 };


-//-----------------------------------------------------------------------------
+/**
+ * Iterate<SerialAsync> is used to implement the SerialAsync
+ * scheduling policy.
+ *
+ * An Iterate is a non-blocking unit of concurrency that is used
+ * to describe a chunk of work. It inherits from the Runnable
+ * class and as all subclasses of Runnable, the user specializes
+ * the run() method to specify the operation.
+ * Iterate<SerialAsync> is a further specialization of the
+ * Iterate class to use the SerialAsync Scheduling algorithm to
+ * generate the data dependency graph for a data-driven
+ * execution.
+ */
+
+template<>
+class Iterate<SerialAsync> : public Runnable
+{
+  friend class IterateScheduler<SerialAsync>;
+  friend class DataObject<SerialAsync>;
+
+public:
+
+  typedef DataObject<SerialAsync> DataObject_t;
+  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+
+
+  /// The Constructor for this class takes the IterateScheduler and a
+  /// CPU affinity.  CPU affinity has a default value of -1 which means
+  /// it may run on any CPU available.
+
+  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1)
+    : scheduler_m(s), notifications_m(1), generation_m(-1), togo_m(1)
+  {}
+
+  /// The dtor is virtual because the subclasses will need to add to it.
+
+  virtual ~Iterate() {}
+
+  /// The run method does the core work of the Iterate.
+  /// It is supplied by the subclass.
+
+  virtual void run() = 0;
+
+  //@name Stubs for the affinities
+  /// There is no such thing in serial.
+  //@{
+
+  inline int affinity() const {return 0;}
+
+  inline int hintAffinity() const {return 0;}
+
+  inline void affinity(int) {}
+
+  inline void hintAffinity(int) {}
+
+  //@}
+
+  /// Notify is used to indicate to the Iterate that one of the data
+  /// objects it had requested has been granted. To do this, we dec a
+  /// dependence counter which, if equal to 0, the Iterate is ready for
+  /// execution.
+
+  void notify()
+  {
+    if (--notifications_m == 0)
+      add(this);
+  }
+
+  /// How many notifications remain?
+
+  int notifications() const { return notifications_m; }
+
+  void addNotification() { notifications_m++; }
+
+  int& generation() { return generation_m; }
+
+  int& togo() { return togo_m; }
+
+protected:
+
+  /// What scheduler are we working with?
+  IterateScheduler<SerialAsync> &scheduler_m;
+
+  /// How many notifications should we receive before we can run?
+  int notifications_m;
+
+  /// Which generation we were issued in.
+  int generation_m;
+
+  /// How many times we need to go past a "did something" to be ready
+  /// for destruction?
+  int togo_m;
+
+};
+
+
+/**
+ * FIXME.
+ */

 struct SystemContext
 {
   void addNCpus(int) {}
   void wait() {}
   void concurrency(int){}
-  int concurrency() {return 1;}
+  int concurrency() { return 1; }
   void mustRunOn() {}

   // We have a separate message queue because they are
   // higher priority.
+  typedef Iterate<SerialAsync> *IteratePtr_t;
   static std::list<RunnablePtr_t> workQueueMessages_m;
   static std::list<RunnablePtr_t> workQueue_m;
+#if POOMA_MPI
+  static MPI_Request requests_m[1024];
+  static std::map<int, IteratePtr_t> allocated_requests_m;
+  static std::set<int> free_requests_m;
+#endif
+
+
+#if POOMA_MPI

-  ///////////////////////////
-  // This function lets you check if there are iterates that are
-  // ready to run.
-  inline static
-  bool workReady()
+  /// Query, if we have lots of MPI_Request slots available
+
+  static bool haveLotsOfMPIRequests()
   {
-    return !(workQueue_m.empty() && workQueueMessages_m.empty());
+    return free_requests_m.size() > 1024/2;
   }

-  ///////////////////////////
-  // Run an iterate if one is ready.
-  inline static
-  void runSomething()
+  /// Get a MPI_Request slot, associated with an iterate
+
+  static MPI_Request* getMPIRequest(IteratePtr_t p)
   {
-    if (!workQueueMessages_m.empty())
-    {
-      // Get the top iterate.
-      // Delete it from the queue.
-      // Run the iterate.
-      // Delete the iterate.  This could put more iterates in the queue.
+    PInsist(!free_requests_m.empty(), "No free MPIRequest slots.");
+    int i = *free_requests_m.begin();
+    free_requests_m.erase(free_requests_m.begin());
+    allocated_requests_m[i] = p;
+    p->togo()++;
+    return &requests_m[i];
+  }

-      RunnablePtr_t p = workQueueMessages_m.front();
-      workQueueMessages_m.pop_front();
-      p->execute();
+  static void releaseMPIRequest(int i)
+  {
+    IteratePtr_t p = allocated_requests_m[i];
+    allocated_requests_m.erase(i);
+    free_requests_m.insert(i);
+    if (--(p->togo()) == 0)
       delete p;
-    }
+  }
+
+  static bool waitForSomeRequests(bool mayBlock)
+  {
+    if (allocated_requests_m.empty())
+      return false;
+
+    int last_used_request = allocated_requests_m.rbegin()->first;
+    int finished[last_used_request+1];
+    MPI_Status statuses[last_used_request+1];
+    int nr_finished;
+    int res;
+    if (mayBlock)
+      res = MPI_Waitsome(last_used_request+1, requests_m,
+                        &nr_finished, finished, statuses);
     else
-    {
-      if (!workQueue_m.empty())
-      {
-       RunnablePtr_t p = workQueue_m.front();
-       workQueue_m.pop_front();
-       p->execute();
-       delete p;
+      res = MPI_Testsome(last_used_request+1, requests_m,
+                        &nr_finished, finished, statuses);
+    PAssert(res == MPI_SUCCESS || res == MPI_ERR_IN_STATUS);
+    if (nr_finished == MPI_UNDEFINED)
+      return false;
+
+    // release finised requests
+    while (nr_finished--) {
+      if (res == MPI_ERR_IN_STATUS) {
+       if (statuses[nr_finished].MPI_ERROR != MPI_SUCCESS) {
+         char msg[MPI_MAX_ERROR_STRING+1];
+         int len;
+         MPI_Error_string(statuses[nr_finished].MPI_ERROR, msg, &len);
+         msg[len] = '\0';
+         PInsist(0, msg);
+       }
       }
+      releaseMPIRequest(finished[nr_finished]);
     }
+    return true;
+  }
+
+#else
+
+  static bool waitForSomeRequests(bool mayBlock)
+  {
+    return false;
+  }
+
+#endif
+
+
+  /// This function lets you check if there are iterates that are
+  /// ready to run.
+
+  static bool workReady()
+  {
+    return !(workQueue_m.empty()
+            && workQueueMessages_m.empty()
+#if POOMA_MPI
+            && allocated_requests_m.empty()
+#endif
+            );
+  }
+
+  /// Run an iterate if one is ready.  Returns if progress
+  /// was made.
+
+  static bool runSomething(bool mayBlock = true)
+  {
+    // do work in this order to minimize communication latency:
+    // - issue all messages
+    // - do some regular work
+    // - wait for messages to complete
+
+    RunnablePtr_t p = NULL;
+    if (!workQueueMessages_m.empty()) {
+      p = workQueueMessages_m.front();
+      workQueueMessages_m.pop_front();
+    } else if (!workQueue_m.empty()) {
+      p = workQueue_m.front();
+      workQueue_m.pop_front();
+    }
+
+    if (p) {
+      p->execute();
+      Iterate<SerialAsync> *it = dynamic_cast<IteratePtr_t>(p);
+      if (it) {
+       if (--(it->togo()) == 0)
+         delete it;
+      } else
+       delete p;
+      return true;
+
+    } else
+      return waitForSomeRequests(mayBlock);
   }

 };

-inline void addRunnable(RunnablePtr_t rn)
-{
-  SystemContext::workQueue_m.push_front(rn);
-}
+/// Adds a runnable to the appropriate work-queue.

 inline void add(RunnablePtr_t rn)
 {
@@ -182,25 +357,18 @@
 inline  void wait() {}
 inline  void mustRunOn(){}

-/*------------------------------------------------------------------------
-CLASS
-       IterateScheduler_Serial_Async
-
-       Implements a asynchronous scheduler for a data driven execution.
-       Specializes a IterateScheduler.
-
-KEYWORDS
-       Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
-
-        The SerialAsync IterateScheduler, Iterate and DataObject
-       implement a SMARTS scheduler that does dataflow without threads.
-       What that means is that when you hand iterates to the
-       IterateScheduler it stores them up until you call
-       IterateScheduler::blockingEvaluate(), at which point it evaluates
-       iterates until the queue is empty.
------------------------------------------------------------------------------*/
+
+/**
+ * Implements a asynchronous scheduler for a data driven execution.
+ * Specializes a IterateScheduler.
+ *
+ * The SerialAsync IterateScheduler, Iterate and DataObject
+ * implement a SMARTS scheduler that does dataflow without threads.
+ * What that means is that when you hand iterates to the
+ * IterateScheduler it stores them up until you call
+ * IterateScheduler::blockingEvaluate(), at which point it evaluates
+ * iterates until the queue is empty.
+ */

 template<>
 class IterateScheduler<SerialAsync>
@@ -212,196 +380,128 @@
   typedef DataObject<SerialAsync> DataObject_t;
   typedef Iterate<SerialAsync> Iterate_t;

-  ///////////////////////////
-  // Constructor
-  //
-  IterateScheduler() {}
-
-  ///////////////////////////
-  // Destructor
-  //
-  ~IterateScheduler() {}
-  void setConcurrency(int) {}
-
-  //---------------------------------------------------------------------------
-  // Mutators.
-  //---------------------------------------------------------------------------
-
-  ///////////////////////////
-  // Tells the scheduler that the parser thread is starting a new
-  // data-parallel statement.  Any Iterate that is handed off to the
-  // scheduler between beginGeneration() and endGeneration() belongs
-  // to the same data-paralllel statement and therefore has the same
-  // generation number.
-  //
-  inline void beginGeneration() { }
-
-  ///////////////////////////
-  // Tells the scheduler that no more Iterates will be handed off for
-  // the data parallel statement that was begun with a
-  // beginGeneration().
-  //
-  inline void endGeneration() {}
-
-  ///////////////////////////
-  // The parser thread calls this method to evaluate the generated
-  // graph until all the nodes in the dependence graph has been
-  // executed by the scheduler.  That is to say, the scheduler
-  // executes all the Iterates that has been handed off to it by the
-  // parser thread.
-  //
-  inline
-  void blockingEvaluate();
-
-  ///////////////////////////
-  // The parser thread calls this method to ask the scheduler to run
-  // the given Iterate when the dependence on that Iterate has been
-  // satisfied.
-  //
-  inline void handOff(Iterate<SerialAsync>* it);
+  IterateScheduler()
+    : generation_m(0)
+  {}

-  inline
-  void releaseIterates() { }
+  ~IterateScheduler() {}

-protected:
-private:
+  void setConcurrency(int) {}

-  typedef std::list<Iterate_t*> Container_t;
-  typedef Container_t::iterator Iterator_t;
+  /// Tells the scheduler that the parser thread is starting a new
+  /// data-parallel statement.  Any Iterate that is handed off to the
+  /// scheduler between beginGeneration() and endGeneration() belongs
+  /// to the same data-paralllel statement and therefore has the same
+  /// generation number.
+  /// Nested invocations are handled as being part of the outermost
+  /// generation.

-};
+  void beginGeneration()
+  {
+    // Ensure proper overflow behavior.
+    if (++generation_m < 0)
+      generation_m = 0;
+    generationStack_m.push(generation_m);
+  }

-//-----------------------------------------------------------------------------
+  /// Tells the scheduler that no more Iterates will be handed off for
+  /// the data parallel statement that was begun with a
+  /// beginGeneration().

-/*------------------------------------------------------------------------
-CLASS
-       Iterate_SerialAsync
-
-       Iterate<SerialAsync> is used to implement the SerialAsync
-       scheduling policy.
-
-KEYWORDS
-       Data_Parallelism, Native_Interface, IterateScheduler, Data_Flow.
-
-DESCRIPTION
-        An Iterate is a non-blocking unit of concurrency that is used
-       to describe a chunk of work. It inherits from the Runnable
-       class and as all subclasses of Runnable, the user specializes
-       the run() method to specify the operation.
-       Iterate<SerialAsync> is a further specialization of the
-       Iterate class to use the SerialAsync Scheduling algorithm to
-       generate the data dependency graph for a data-driven
-       execution.  */
+  void endGeneration()
+  {
+    PAssert(inGeneration());
+    generationStack_m.pop();

-template<>
-class Iterate<SerialAsync> : public Runnable
-{
-  friend class IterateScheduler<SerialAsync>;
-  friend class DataObject<SerialAsync>;
+#if POOMA_MPI
+    // this is a safe point to block until we have "lots" of MPI Requests
+    if (!inGeneration())
+      while (!SystemContext::haveLotsOfMPIRequests())
+       SystemContext::runSomething(true);
+#endif
+  }

-public:
+  /// Wether we are inside a generation and may not safely block.

-  typedef DataObject<SerialAsync> DataObject_t;
-  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+  bool inGeneration() const
+  {
+    return !generationStack_m.empty();
+  }

+  /// What the current generation is.

-  ///////////////////////////
-  // The Constructor for this class takes the IterateScheduler and a
-  // CPU affinity.  CPU affinity has a default value of -1 which means
-  // it may run on any CPU available.
-  //
-  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1);
-
-  ///////////////////////////
-  // The dtor is virtual because the subclasses will need to add to it.
-  //
-  virtual ~Iterate() {}
+  int generation() const
+  {
+    if (!inGeneration())
+      return -1;
+    return generationStack_m.top();
+  }

-  ///////////////////////////
-  // The run method does the core work of the Iterate.
-  // It is supplied by the subclass.
-  //
-  virtual void run() = 0;
+  /// The parser thread calls this method to evaluate the generated
+  /// graph until all the nodes in the dependence graph has been
+  /// executed by the scheduler.  That is to say, the scheduler
+  /// executes all the Iterates that has been handed off to it by the
+  /// parser thread.

-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline int affinity() const {return 0;}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline int hintAffinity() const {return 0;}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline void affinity(int) {}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline void hintAffinity(int) {}
+  void blockingEvaluate()
+  {
+    if (inGeneration()) {
+      // It's not safe to block inside a generation, so
+      // just do as much as we can without blocking.
+      while (SystemContext::runSomething(false))
+       ;
+
+    } else {
+      // Loop as long as there is anything in the queue.
+      while (SystemContext::workReady())
+        SystemContext::runSomething(true);
+    }
+  }

-  ///////////////////////////
-  // Notify is used to indicate to the Iterate that one of the data
-  // objects it had requested has been granted. To do this, we dec a
-  // dependence counter which, if equal to 0, the Iterate is ready for
-  // execution.
-  //
-  inline void notify();
-
-  ///////////////////////////
-  // How many notifications remain?
-  //
-  inline
-  int notifications() const { return notifications_m; }
+  /// The parser thread calls this method to ask the scheduler to run
+  /// the given Iterate when the dependence on that Iterate has been
+  /// satisfied.

-  inline void addNotification()
+  void handOff(Iterate<SerialAsync>* it)
   {
-    notifications_m++;
+    // No action needs to be taken here.  Iterates will make their
+    // own way into the execution queue.
+    it->generation() = generation();
+    it->notify();
   }

-protected:
+  void releaseIterates() { }

-  // What scheduler are we working with?
-  IterateScheduler<SerialAsync> &scheduler_m;
+private:

-  // How many notifications should we receive before we can run?
-  int notifications_m;
+  typedef std::list<Iterate_t*> Container_t;
+  typedef Container_t::iterator Iterator_t;

-private:
-  // Set notifications dynamically and automatically every time a
-  // request is made by the iterate
-  void incr_notifications() { notifications_m++;}
+  static std::stack<int> generationStack_m;
+  int generation_m;

 };


-//-----------------------------------------------------------------------------
-
-/*------------------------------------------------------------------------
-CLASS
-       DataObject_SerialAsync
-
-       Implements a asynchronous scheduler for a data driven execution.
-KEYWORDS
-       Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
-        The DataObject Class is used introduce a type to represent
-       a resources (normally) blocks of data) that Iterates contend
-       for atomic access. Iterates make request for either a read or
-       write to the DataObjects. DataObjects may grant the request if
-       the object is currently available. Otherwise, the request is
-       enqueue in a queue private to the data object until the
-       DataObject is release by another Iterate. A set of read
-       requests may be granted all at once if there are no
-       intervening write request to that DataObject.
-       DataObject<SerialAsync> is a specialization of DataObject for
-       the policy template SerialAsync.
-*/
+/**
+ * Implements a asynchronous scheduler for a data driven execution.
+ *
+ * The DataObject Class is used introduce a type to represent
+ * a resources (normally) blocks of data) that Iterates contend
+ * for atomic access. Iterates make request for either a read or
+ * write to the DataObjects. DataObjects may grant the request if
+ * the object is currently available. Otherwise, the request is
+ * enqueue in a queue private to the data object until the
+ * DataObject is release by another Iterate. A set of read
+ * requests may be granted all at once if there are no
+ * intervening write request to that DataObject.
+ * DataObject<SerialAsync> is a specialization of DataObject for
+ * the policy template SerialAsync.
+ *
+ * There are two ways data can be used: to read or to write.
+ * Don't change this to give more than two states;
+ * things inside depend on that.
+ */

 template<>
 class DataObject<SerialAsync>
@@ -413,54 +513,56 @@
   typedef IterateScheduler<SerialAsync> IterateScheduler_t;
   typedef Iterate<SerialAsync> Iterate_t;

-  // There are two ways data can be used: to read or to write.
-  // Don't change this to give more than two states:
-  // things inside depend on that.
-
-  ///////////////////////////
-  // Construct the data object with an empty set of requests
-  // and the given affinity.
-  //
-  inline DataObject(int affinity=-1);
+
+  /// Construct the data object with an empty set of requests
+  /// and the given affinity.
+
+  DataObject(int affinity=-1)
+    : released_m(queue_m.end()), notifications_m(0)
+  {
+    // released_m to the end of the queue (which should) also be the
+    // beginning.  notifications_m to zero, since nothing has been
+    // released yet.
+  }

-  ///////////////////////////
-  // for compatibility with other SMARTS schedulers, accept
-  // Scheduler arguments (unused)
-  //
-  inline
-  DataObject(int affinity, IterateScheduler<SerialAsync>&);
-
-  ///////////////////////////
-  // Stub out affinity because there is no affinity in serial.
-  //
-  inline int affinity() const { return 0; }
-
-  ///////////////////////////
-  // Stub out affinity because there is no affinity in serial.
-  //
-  inline void affinity(int) {}
+  /// for compatibility with other SMARTS schedulers, accept
+  /// Scheduler arguments (unused)

-  ///////////////////////////
-  // An iterate makes a request for a certain action in a certain
-  // generation.
-  //
-  inline
-  void request(Iterate<SerialAsync>&, SerialAsync::Action);
-
-  ///////////////////////////
-  // An iterate finishes and tells the DataObject it no longer needs
-  // it.  If this is the last release for the current set of
-  // requests, have the IterateScheduler release some more.
-  //
-  inline void release(SerialAsync::Action);
+  inline DataObject(int affinity, IterateScheduler<SerialAsync>&)
+    : released_m(queue_m.end()), notifications_m(0)
+  {}
+
+  /// Stub out affinity because there is no affinity in serial.
+
+  int affinity() const { return 0; }
+
+  /// Stub out affinity because there is no affinity in serial.
+
+  void affinity(int) {}
+
+  /// An iterate makes a request for a certain action in a certain
+  /// generation.
+
+  inline void request(Iterate<SerialAsync>&, SerialAsync::Action);
+
+  /// An iterate finishes and tells the DataObject it no longer needs
+  /// it.  If this is the last release for the current set of
+  /// requests, have the IterateScheduler release some more.
+
+  void release(SerialAsync::Action)
+  {
+    if (--notifications_m == 0)
+      releaseIterates();
+  }

-protected:
 private:

-  // If release needs to let more iterates go, it calls this.
+  /// If release needs to let more iterates go, it calls this.
   inline void releaseIterates();

-  // The type for a request.
+  /**
+   * The type for a request.
+   */
   class Request
   {
   public:
@@ -475,135 +577,27 @@
     SerialAsync::Action act_m;
   };

-  // The type of the queue and iterator.
+  /// The type of the queue and iterator.
   typedef std::list<Request> Container_t;
   typedef Container_t::iterator Iterator_t;

-  // The list of requests from various iterates.
-  // They're granted in FIFO order.
+  /// The list of requests from various iterates.
+  /// They're granted in FIFO order.
   Container_t queue_m;

-  // Pointer to the last request that has been granted.
+  /// Pointer to the last request that has been granted.
   Iterator_t released_m;

-  // The number of outstanding notifications.
+  /// The number of outstanding notifications.
   int notifications_m;
 };

-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for
-// IterateScheduler<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>*)
-// No action needs to be taken here.  Iterates will make their
-// own way into the execution queue.
-//
-
-inline void
-IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>* it)
-{
-  it->notify();
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for Iterate<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// Iterate<SerialAsync>::Iterate
-// Construct with the scheduler and the number of notifications.
-// Ignore the affinity.
-//
-
-inline
-Iterate<SerialAsync>::Iterate(IterateScheduler<SerialAsync>& s, int)
-: scheduler_m(s), notifications_m(1)
-{
-}
-
-//
-// Iterate<SerialAsync>::notify
-// Notify the iterate that a DataObject is ready.
-// Decrement the counter, and if it is zero, alert the scheduler.
-//
-
-inline void
-Iterate<SerialAsync>::notify()
-{
-  if ( --notifications_m == 0 )
-  {
-    add(this);
-  }
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for DataObject<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// DataObject::DataObject()
-// Initialize:
-//   released_m to the end of the queue (which should) also be the
-//   beginning.  notifications_m to zero, since nothing has been
-//   released yet.
-//
-
-inline
-DataObject<SerialAsync>::DataObject(int)
-: released_m(queue_m.end()), notifications_m(0)
-{
-}
-
-//
-// void DataObject::release(Action)
-// An iterate has finished and is telling the DataObject that
-// it is no longer needed.
-//
+/// void DataObject::releaseIterates(SerialAsync::Action)
+/// When the last released iterate dies, we need to
+/// look at the beginning of the queue and tell more iterates
+/// that they can access this data.

 inline void
-DataObject<SerialAsync>::release(SerialAsync::Action)
-{
-  if ( --notifications_m == 0 )
-    releaseIterates();
-}
-
-
-
-//-----------------------------------------------------------------------------
-//
-// void IterateScheduler<SerialAsync>::blockingEvaluate
-// Evaluate all the iterates in the queue.
-//
-//-----------------------------------------------------------------------------
-inline
-void
-IterateScheduler<SerialAsync>::blockingEvaluate()
-{
-  // Loop as long as there is anything in the queue.
-  while (SystemContext::workReady())
-  {
-    SystemContext::runSomething();
-  }
-}
-
-//-----------------------------------------------------------------------------
-//
-// void DataObject::releaseIterates(SerialAsync::Action)
-// When the last released iterate dies, we need to
-// look at the beginning of the queue and tell more iterates
-// that they can access this data.
-//
-//-----------------------------------------------------------------------------
-inline
-void
 DataObject<SerialAsync>::releaseIterates()
 {
   // Get rid of the reservations that have finished.
@@ -622,14 +616,17 @@
       released_m->iterate().notify();
       ++notifications_m;

-      // Record what action that one will take.
+      // Record what action that one will take
+      // and record its generation number
       SerialAsync::Action act = released_m->act();
+      int generation = released_m->iterate().generation();

       // Look at the next iterate.
       ++released_m;

       // If the first one was a read, release more.
       if ( act == SerialAsync::Read )
+       {

         // As long as we aren't at the end and we have more reads...
         while ((released_m != end) &&
@@ -642,29 +639,30 @@
             // And go on to the next.
             ++released_m;
           }
+
+       }
+
     }
 }

+/// void DataObject::request(Iterate&, action)
+/// An iterate makes a reservation with this DataObject for a given
+/// action in a given generation.  The request may be granted
+/// immediately.

-//
-// void DataObject::request(Iterate&, action)
-// An iterate makes a reservation with this DataObject for a given
-// action in a given generation.  The request may be granted
-// immediately.
-//
-inline
-void
+inline void
 DataObject<SerialAsync>::request(Iterate<SerialAsync>& it,
                                  SerialAsync::Action act)

 {
   // The request can be granted immediately if:
   // The queue is currently empty, or
-  // The request is a read and everything in the queue is a read.
+  // the request is a read and everything in the queue is a read,
+  // or (with relaxed conditions), everything is the same generation.

   // Set notifications dynamically and automatically
   //     every time a request is made by the iterate
-  it.incr_notifications();
+  it.notifications_m++;

   bool allReleased = (queue_m.end() == released_m);
   bool releasable =  queue_m.empty() ||
@@ -691,17 +689,11 @@
 }


-//----------------------------------------------------------------------
-
-
-//
-// End of Smarts namespace.
-//
-}
+} // namespace Smarts

 //////////////////////////////////////////////////////////////////////

-#endif     // POOMA_PACKAGE_CLASS_H
+#endif     // _SerialAsync_h_

 /***************************************************************************
  * $RCSfile: SerialAsync.h,v $   $Author: sa_smith $
--- 
/home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp
   2000-04-12 02:08:06.000000000 +0200
+++ Threads/IterateSchedulers/SerialAsync.cmpl.cpp      2004-01-02 
00:40:16.000000000 +0100
@@ -82,6 +82,12 @@

 std::list<RunnablePtr_t> SystemContext::workQueueMessages_m;
 std::list<RunnablePtr_t> SystemContext::workQueue_m;
+#if POOMA_MPI
+  MPI_Request SystemContext::requests_m[1024];
+  std::map<int, SystemContext::IteratePtr_t> 
SystemContext::allocated_requests_m;
+  std::set<int> SystemContext::free_requests_m;
+#endif
+std::stack<int> IterateScheduler<SerialAsync>::generationStack_m;

 }

reply via email to

[Prev in Thread] Current Thread [Next in Thread]