freepooma-devel

Re: [pooma-dev] [PATCH] MPI support for SerialAsync scheduler


From: Richard Guenther
Subject: Re: [pooma-dev] [PATCH] MPI support for SerialAsync scheduler
Date: Tue, 6 Jan 2004 20:58:33 +0100 (CET)

On Tue, 6 Jan 2004, Jeffrey D. Oldham wrote:

> Let's move the magic constant into a const variable instead of having
> the constant scattered throughout the code.  Then, please commit.  Thanks.

For the record, this is what I committed.  It passes builds for both
--serial and --mpi for me.

Richard.


2004Jan06  Richard Guenther <address@hidden>

        * src/Threads/IterateSchedulers/SerialAsync.h: doxygenifize,
        add std::stack<int> for generation tracking, add support for
        asynchronous MPI requests.
        * src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp: define
        new static variables.
        * src/Threads/IterateSchedulers/Runnable.h: declare add().
        * src/Pooma/Pooma.cmpl.cpp: use SystemContext::max_requests
        constant.
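
As background for the patch below: the MPI support hinges on a small pool of
MPI_Request slots owned by SystemContext, namely a fixed-size requests_m
array, a std::set of free slot indices, and a std::map from allocated indices
back to the issuing iterate, drained with MPI_Testsome/MPI_Waitsome.  The
stand-alone sketch below only illustrates that bookkeeping pattern; the
RequestPool type, the Callback typedef and acquire()/progress() are
illustrative names and are not part of the POOMA sources.

#include <mpi.h>
#include <cassert>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Stand-alone sketch of the request-slot pool idiom the patch adds to
// SystemContext.  RequestPool, Callback and acquire() are illustrative
// names, not POOMA API.
typedef void (*Callback)(void *);

struct RequestPool
{
  static const int max_requests = 1024;
  MPI_Request requests[max_requests];
  std::set<int> free_slots;                             // slots not in use
  std::map<int, std::pair<Callback, void *> > pending;  // slot -> completion

  RequestPool()
  {
    for (int i = 0; i < max_requests; ++i) {
      requests[i] = MPI_REQUEST_NULL;
      free_slots.insert(i);
    }
  }

  // Hand out a slot; the caller passes the returned MPI_Request* to
  // MPI_Isend/MPI_Irecv, and cb(arg) runs once that request completes.
  MPI_Request *acquire(Callback cb, void *arg)
  {
    assert(!free_slots.empty());
    int i = *free_slots.begin();
    free_slots.erase(free_slots.begin());
    pending[i] = std::make_pair(cb, arg);
    return &requests[i];
  }

  // Poll (or block) for completed requests, mirroring the patch's
  // waitForSomeRequests().  Returns whether any request finished.
  bool progress(bool may_block)
  {
    if (pending.empty())
      return false;
    int n = pending.rbegin()->first + 1;   // one past the highest slot in use
    std::vector<int> finished(n);
    std::vector<MPI_Status> statuses(n);
    int nr_finished;
    if (may_block)
      MPI_Waitsome(n, requests, &nr_finished, &finished[0], &statuses[0]);
    else
      MPI_Testsome(n, requests, &nr_finished, &finished[0], &statuses[0]);
    if (nr_finished == MPI_UNDEFINED)
      return false;
    for (int k = 0; k < nr_finished; ++k) {
      int i = finished[k];
      pending[i].first(pending[i].second);  // run the completion action
      pending.erase(i);
      free_slots.insert(i);
    }
    return true;
  }
};

In the patch itself, getMPIRequest()/releaseMPIRequest() play the role of
acquire() and the completion handling, with the iterate's togo() counter
standing in for an explicit callback.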

Index: Pooma/Pooma.cmpl.cpp
===================================================================
RCS file: /home/pooma/Repository/r2/src/Pooma/Pooma.cmpl.cpp,v
retrieving revision 1.40
diff -u -u -r1.40 Pooma.cmpl.cpp
--- Pooma/Pooma.cmpl.cpp        5 Jan 2004 22:34:33 -0000       1.40
+++ Pooma/Pooma.cmpl.cpp        6 Jan 2004 19:52:47 -0000
@@ -354,8 +354,7 @@
 #if POOMA_MPI
   MPI_Comm_rank(MPI_COMM_WORLD, &myContext_g);
   MPI_Comm_size(MPI_COMM_WORLD, &numContexts_g);
-  // ugh...
-  for (int i=0; i<sizeof(Smarts::SystemContext::requests_m)/sizeof(MPI_Request); ++i)
+  for (int i=0; i<Smarts::SystemContext::max_requests; ++i)
     Smarts::SystemContext::free_requests_m.insert(i);
 #elif POOMA_CHEETAH
   PAssert(controller_g != 0);
Index: Threads/IterateSchedulers/Runnable.h
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/Runnable.h,v
retrieving revision 1.4
diff -u -u -r1.4 Runnable.h
--- Threads/IterateSchedulers/Runnable.h        8 Jun 2000 22:16:50 -0000       1.4
+++ Threads/IterateSchedulers/Runnable.h        6 Jan 2004 19:52:47 -0000
@@ -125,5 +125,10 @@

 typedef Runnable *RunnablePtr_t;

+/// Schedulers need to implement this function to add
+/// a runnable to the execution queue.
+
+inline void add(RunnablePtr_t);
+
 }
 #endif
Index: Threads/IterateSchedulers/SerialAsync.cmpl.cpp
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp,v
retrieving revision 1.3
diff -u -u -r1.3 SerialAsync.cmpl.cpp
--- Threads/IterateSchedulers/SerialAsync.cmpl.cpp      12 Apr 2000 00:08:06 -0000      1.3
+++ Threads/IterateSchedulers/SerialAsync.cmpl.cpp      6 Jan 2004 19:52:47 -0000
@@ -82,6 +82,13 @@

 std::list<RunnablePtr_t> SystemContext::workQueueMessages_m;
 std::list<RunnablePtr_t> SystemContext::workQueue_m;
+#if POOMA_MPI
+  const int SystemContext::max_requests;
+  MPI_Request SystemContext::requests_m[SystemContext::max_requests];
+  std::map<int, SystemContext::IteratePtr_t> SystemContext::allocated_requests_m;
+  std::set<int> SystemContext::free_requests_m;
+#endif
+std::stack<int> IterateScheduler<SerialAsync>::generationStack_m;

 }

Index: Threads/IterateSchedulers/SerialAsync.h
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/SerialAsync.h,v
retrieving revision 1.9
diff -u -u -r1.9 SerialAsync.h
--- Threads/IterateSchedulers/SerialAsync.h     8 Jun 2000 22:16:50 -0000       1.9
+++ Threads/IterateSchedulers/SerialAsync.h     6 Jan 2004 19:52:48 -0000
@@ -42,48 +42,38 @@
 // DataObject<SerialAsync>
 //-----------------------------------------------------------------------------

-#include <iostream>
-
 #ifndef _SerialAsync_h_
 #define _SerialAsync_h_
-/*
-LIBRARY:
-        SerialAsync
-
-CLASSES: IterateScheduler
-
-CLASSES: DataObject
-
-CLASSES: Iterate
-
-OVERVIEW
-        SerialAsync IterateScheduler is a policy template to create a
-        dependence graphs and executes the graph respecting the
-        dependencies without using threads. There is no parallelism,
-        but Iterates may be executed out-of-order with respect to the
-        program text.
-
------------------------------------------------------------------------------*/
-
-//////////////////////////////////////////////////////////////////////

-//-----------------------------------------------------------------------------
-// Overview:
-// Smarts classes for times when you want no threads but you do want
-// dataflow evaluation.
-//-----------------------------------------------------------------------------
-
-//-----------------------------------------------------------------------------
-// Typedefs:
-//-----------------------------------------------------------------------------
+/** @file
+ * @ingroup IterateSchedulers
+ * @brief
+ * Smarts classes for times when you want no threads but you do want
+ * dataflow evaluation.
+ *
+ * SerialAsync IterateScheduler is a policy template to create a
+ * dependence graph and execute the graph respecting the
+ * dependencies without using threads.
+ * There is no (thread level) parallelism, but Iterates may be executed
+ * out-of-order with respect to the program text. Also, this scheduler is
+ * used for message based parallelism, in which case asynchronous execution
+ * leads to reduced communication latencies.
+ */

 //-----------------------------------------------------------------------------
 // Includes:
 //-----------------------------------------------------------------------------

 #include <list>
+#include <vector>
+#include <map>
+#include <set>
+#include <functional>
+#include <stack>
 #include "Threads/IterateSchedulers/IterateScheduler.h"
 #include "Threads/IterateSchedulers/Runnable.h"
+#include "Tulip/Messaging.h"
+#include "Utilities/PAssert.h"

 //-----------------------------------------------------------------------------
 // Forward Declarations:
@@ -94,76 +84,258 @@

 namespace Smarts {

-#define MYID 0
-#define MAX_CPUS 1
-//
-// Tag class for specializing IterateScheduler, Iterate and DataObject.
-//
+/**
+ * Tag class for specializing IterateScheduler, Iterate and DataObject.
+ */
+
 struct SerialAsync
 {
-  enum Action { Read, Write};
+  enum Action { Read, Write };
 };


-//-----------------------------------------------------------------------------
+/**
+ * Iterate<SerialAsync> is used to implement the SerialAsync
+ * scheduling policy.
+ *
+ * An Iterate is a non-blocking unit of concurrency that is used
+ * to describe a chunk of work. It inherits from the Runnable
+ * class and as all subclasses of Runnable, the user specializes
+ * the run() method to specify the operation.
+ * Iterate<SerialAsync> is a further specialization of the
+ * Iterate class to use the SerialAsync Scheduling algorithm to
+ * generate the data dependency graph for a data-driven
+ * execution.
+ */
+
+template<>
+class Iterate<SerialAsync> : public Runnable
+{
+  friend class IterateScheduler<SerialAsync>;
+  friend class DataObject<SerialAsync>;
+
+public:
+
+  typedef DataObject<SerialAsync> DataObject_t;
+  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+
+
+  /// The Constructor for this class takes the IterateScheduler and a
+  /// CPU affinity.  CPU affinity has a default value of -1 which means
+  /// it may run on any CPU available.
+
+  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1)
+    : scheduler_m(s), notifications_m(1), generation_m(-1), togo_m(1)
+  {}
+
+  /// The dtor is virtual because the subclasses will need to add to it.
+
+  virtual ~Iterate() {}
+
+  /// The run method does the core work of the Iterate.
+  /// It is supplied by the subclass.
+
+  virtual void run() = 0;
+
+  //@name Stubs for the affinities
+  /// There is no such thing in serial.
+  //@{
+
+  inline int affinity() const {return 0;}
+
+  inline int hintAffinity() const {return 0;}
+
+  inline void affinity(int) {}
+
+  inline void hintAffinity(int) {}
+
+  //@}
+
+  /// Notify is used to indicate to the Iterate that one of the data
+  /// objects it had requested has been granted. To do this, we dec a
+  /// dependence counter which, if equal to 0, the Iterate is ready for
+  /// execution.
+
+  void notify()
+  {
+    if (--notifications_m == 0)
+      add(this);
+  }
+
+  /// How many notifications remain?
+
+  int notifications() const { return notifications_m; }
+
+  void addNotification() { notifications_m++; }
+
+  int& generation() { return generation_m; }
+
+  int& togo() { return togo_m; }
+
+protected:
+
+  /// What scheduler are we working with?
+  IterateScheduler<SerialAsync> &scheduler_m;
+
+  /// How many notifications should we receive before we can run?
+  int notifications_m;
+
+  /// Which generation we were issued in.
+  int generation_m;
+
+  /// How many times we need to go past a "did something" to be ready
+  /// for destruction?
+  int togo_m;
+
+};
+

 struct SystemContext
 {
   void addNCpus(int) {}
   void wait() {}
   void concurrency(int){}
-  int concurrency() {return 1;}
+  int concurrency() { return 1; }
   void mustRunOn() {}

   // We have a separate message queue because they are
   // higher priority.
+  typedef Iterate<SerialAsync> *IteratePtr_t;
   static std::list<RunnablePtr_t> workQueueMessages_m;
   static std::list<RunnablePtr_t> workQueue_m;
+#if POOMA_MPI
+  static const int max_requests = 1024;
+  static MPI_Request requests_m[max_requests];
+  static std::map<int, IteratePtr_t> allocated_requests_m;
+  static std::set<int> free_requests_m;
+#endif
+
+
+#if POOMA_MPI
+
+  /// Query whether we have lots of MPI_Request slots available

-  ///////////////////////////
-  // This function lets you check if there are iterates that are
-  // ready to run.
-  inline static
-  bool workReady()
+  static bool haveLotsOfMPIRequests()
   {
-    return !(workQueue_m.empty() && workQueueMessages_m.empty());
+    return free_requests_m.size() > max_requests/2;
   }

-  ///////////////////////////
-  // Run an iterate if one is ready.
-  inline static
-  void runSomething()
+  /// Get an MPI_Request slot associated with an iterate
+
+  static MPI_Request* getMPIRequest(IteratePtr_t p)
   {
-    if (!workQueueMessages_m.empty())
-    {
-      // Get the top iterate.
-      // Delete it from the queue.
-      // Run the iterate.
-      // Delete the iterate.  This could put more iterates in the queue.
+    PInsist(!free_requests_m.empty(), "No free MPIRequest slots.");
+    int i = *free_requests_m.begin();
+    free_requests_m.erase(free_requests_m.begin());
+    allocated_requests_m[i] = p;
+    p->togo()++;
+    return &requests_m[i];
+  }

-      RunnablePtr_t p = workQueueMessages_m.front();
-      workQueueMessages_m.pop_front();
-      p->execute();
+  static void releaseMPIRequest(int i)
+  {
+    IteratePtr_t p = allocated_requests_m[i];
+    allocated_requests_m.erase(i);
+    free_requests_m.insert(i);
+    if (--(p->togo()) == 0)
       delete p;
-    }
+  }
+
+  static bool waitForSomeRequests(bool mayBlock)
+  {
+    if (allocated_requests_m.empty())
+      return false;
+
+    int last_used_request = allocated_requests_m.rbegin()->first;
+    int finished[last_used_request+1];
+    MPI_Status statuses[last_used_request+1];
+    int nr_finished;
+    int res;
+    if (mayBlock)
+      res = MPI_Waitsome(last_used_request+1, requests_m,
+                        &nr_finished, finished, statuses);
     else
-    {
-      if (!workQueue_m.empty())
-      {
-       RunnablePtr_t p = workQueue_m.front();
-       workQueue_m.pop_front();
-       p->execute();
-       delete p;
+      res = MPI_Testsome(last_used_request+1, requests_m,
+                        &nr_finished, finished, statuses);
+    PAssert(res == MPI_SUCCESS || res == MPI_ERR_IN_STATUS);
+    if (nr_finished == MPI_UNDEFINED)
+      return false;
+
+    // release finished requests
+    while (nr_finished--) {
+      if (res == MPI_ERR_IN_STATUS) {
+       if (statuses[nr_finished].MPI_ERROR != MPI_SUCCESS) {
+         char msg[MPI_MAX_ERROR_STRING+1];
+         int len;
+         MPI_Error_string(statuses[nr_finished].MPI_ERROR, msg, &len);
+         msg[len] = '\0';
+         PInsist(0, msg);
+       }
       }
+      releaseMPIRequest(finished[nr_finished]);
     }
+    return true;
+  }
+
+#else
+
+  static bool waitForSomeRequests(bool mayBlock)
+  {
+    return false;
+  }
+
+#endif
+
+
+  /// This function lets you check if there are iterates that are
+  /// ready to run.
+
+  static bool workReady()
+  {
+    return !(workQueue_m.empty()
+            && workQueueMessages_m.empty()
+#if POOMA_MPI
+            && allocated_requests_m.empty()
+#endif
+            );
+  }
+
+  /// Run an iterate if one is ready.  Returns whether progress
+  /// was made.
+
+  static bool runSomething(bool mayBlock = true)
+  {
+    // do work in this order to minimize communication latency:
+    // - issue all messages
+    // - do some regular work
+    // - wait for messages to complete
+
+    RunnablePtr_t p = NULL;
+    if (!workQueueMessages_m.empty()) {
+      p = workQueueMessages_m.front();
+      workQueueMessages_m.pop_front();
+    } else if (!workQueue_m.empty()) {
+      p = workQueue_m.front();
+      workQueue_m.pop_front();
+    }
+
+    if (p) {
+      p->execute();
+      Iterate<SerialAsync> *it = dynamic_cast<IteratePtr_t>(p);
+      if (it) {
+       if (--(it->togo()) == 0)
+         delete it;
+      } else
+       delete p;
+      return true;
+
+    } else
+      return waitForSomeRequests(mayBlock);
   }

 };

-inline void addRunnable(RunnablePtr_t rn)
-{
-  SystemContext::workQueue_m.push_front(rn);
-}
+/// Adds a runnable to the appropriate work-queue.

 inline void add(RunnablePtr_t rn)
 {
@@ -182,25 +354,18 @@
 inline  void wait() {}
 inline  void mustRunOn(){}

-/*------------------------------------------------------------------------
-CLASS
-       IterateScheduler_Serial_Async
-
-       Implements a asynchronous scheduler for a data driven execution.
-       Specializes a IterateScheduler.
-
-KEYWORDS
-       Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
-
-        The SerialAsync IterateScheduler, Iterate and DataObject
-       implement a SMARTS scheduler that does dataflow without threads.
-       What that means is that when you hand iterates to the
-       IterateScheduler it stores them up until you call
-       IterateScheduler::blockingEvaluate(), at which point it evaluates
-       iterates until the queue is empty.
------------------------------------------------------------------------------*/
+
+/**
+ * Implements a asynchronous scheduler for a data driven execution.
+ * Specializes a IterateScheduler.
+ *
+ * The SerialAsync IterateScheduler, Iterate and DataObject
+ * implement a SMARTS scheduler that does dataflow without threads.
+ * What that means is that when you hand iterates to the
+ * IterateScheduler it stores them up until you call
+ * IterateScheduler::blockingEvaluate(), at which point it evaluates
+ * iterates until the queue is empty.
+ */

 template<>
 class IterateScheduler<SerialAsync>
@@ -212,196 +377,128 @@
   typedef DataObject<SerialAsync> DataObject_t;
   typedef Iterate<SerialAsync> Iterate_t;

-  ///////////////////////////
-  // Constructor
-  //
-  IterateScheduler() {}
-
-  ///////////////////////////
-  // Destructor
-  //
-  ~IterateScheduler() {}
-  void setConcurrency(int) {}
-
-  //---------------------------------------------------------------------------
-  // Mutators.
-  //---------------------------------------------------------------------------
-
-  ///////////////////////////
-  // Tells the scheduler that the parser thread is starting a new
-  // data-parallel statement.  Any Iterate that is handed off to the
-  // scheduler between beginGeneration() and endGeneration() belongs
-  // to the same data-paralllel statement and therefore has the same
-  // generation number.
-  //
-  inline void beginGeneration() { }
-
-  ///////////////////////////
-  // Tells the scheduler that no more Iterates will be handed off for
-  // the data parallel statement that was begun with a
-  // beginGeneration().
-  //
-  inline void endGeneration() {}
-
-  ///////////////////////////
-  // The parser thread calls this method to evaluate the generated
-  // graph until all the nodes in the dependence graph has been
-  // executed by the scheduler.  That is to say, the scheduler
-  // executes all the Iterates that has been handed off to it by the
-  // parser thread.
-  //
-  inline
-  void blockingEvaluate();
-
-  ///////////////////////////
-  // The parser thread calls this method to ask the scheduler to run
-  // the given Iterate when the dependence on that Iterate has been
-  // satisfied.
-  //
-  inline void handOff(Iterate<SerialAsync>* it);
+  IterateScheduler()
+    : generation_m(0)
+  {}

-  inline
-  void releaseIterates() { }
+  ~IterateScheduler() {}

-protected:
-private:
+  void setConcurrency(int) {}

-  typedef std::list<Iterate_t*> Container_t;
-  typedef Container_t::iterator Iterator_t;
+  /// Tells the scheduler that the parser thread is starting a new
+  /// data-parallel statement.  Any Iterate that is handed off to the
+  /// scheduler between beginGeneration() and endGeneration() belongs
+  /// to the same data-parallel statement and therefore has the same
+  /// generation number.
+  /// Nested invocations are handled as being part of the outermost
+  /// generation.

-};
+  void beginGeneration()
+  {
+    // Ensure proper overflow behavior.
+    if (++generation_m < 0)
+      generation_m = 0;
+    generationStack_m.push(generation_m);
+  }

-//-----------------------------------------------------------------------------
+  /// Tells the scheduler that no more Iterates will be handed off for
+  /// the data parallel statement that was begun with a
+  /// beginGeneration().

-/*------------------------------------------------------------------------
-CLASS
-       Iterate_SerialAsync
-
-       Iterate<SerialAsync> is used to implement the SerialAsync
-       scheduling policy.
-
-KEYWORDS
-       Data_Parallelism, Native_Interface, IterateScheduler, Data_Flow.
-
-DESCRIPTION
-        An Iterate is a non-blocking unit of concurrency that is used
-       to describe a chunk of work. It inherits from the Runnable
-       class and as all subclasses of Runnable, the user specializes
-       the run() method to specify the operation.
-       Iterate<SerialAsync> is a further specialization of the
-       Iterate class to use the SerialAsync Scheduling algorithm to
-       generate the data dependency graph for a data-driven
-       execution.  */
+  void endGeneration()
+  {
+    PAssert(inGeneration());
+    generationStack_m.pop();

-template<>
-class Iterate<SerialAsync> : public Runnable
-{
-  friend class IterateScheduler<SerialAsync>;
-  friend class DataObject<SerialAsync>;
+#if POOMA_MPI
+    // this is a safe point to block until we have "lots" of MPI Requests
+    if (!inGeneration())
+      while (!SystemContext::haveLotsOfMPIRequests())
+       SystemContext::runSomething(true);
+#endif
+  }

-public:
+  /// Whether we are inside a generation and may not safely block.

-  typedef DataObject<SerialAsync> DataObject_t;
-  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+  bool inGeneration() const
+  {
+    return !generationStack_m.empty();
+  }

+  /// What the current generation is.

-  ///////////////////////////
-  // The Constructor for this class takes the IterateScheduler and a
-  // CPU affinity.  CPU affinity has a default value of -1 which means
-  // it may run on any CPU available.
-  //
-  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1);
-
-  ///////////////////////////
-  // The dtor is virtual because the subclasses will need to add to it.
-  //
-  virtual ~Iterate() {}
+  int generation() const
+  {
+    if (!inGeneration())
+      return -1;
+    return generationStack_m.top();
+  }

-  ///////////////////////////
-  // The run method does the core work of the Iterate.
-  // It is supplied by the subclass.
-  //
-  virtual void run() = 0;
+  /// The parser thread calls this method to evaluate the generated
+  /// graph until all the nodes in the dependence graph have been
+  /// executed by the scheduler.  That is to say, the scheduler
+  /// executes all the Iterates that have been handed off to it by the
+  /// parser thread.

-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline int affinity() const {return 0;}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline int hintAffinity() const {return 0;}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline void affinity(int) {}
-  ///////////////////////////
-  // Stub in all the affinities, because there is no such thing
-  // in serial.
-  //
-  inline void hintAffinity(int) {}
+  void blockingEvaluate()
+  {
+    if (inGeneration()) {
+      // It's not safe to block inside a generation, so
+      // just do as much as we can without blocking.
+      while (SystemContext::runSomething(false))
+       ;
+
+    } else {
+      // Loop as long as there is anything in the queue.
+      while (SystemContext::workReady())
+        SystemContext::runSomething(true);
+    }
+  }

-  ///////////////////////////
-  // Notify is used to indicate to the Iterate that one of the data
-  // objects it had requested has been granted. To do this, we dec a
-  // dependence counter which, if equal to 0, the Iterate is ready for
-  // execution.
-  //
-  inline void notify();
-
-  ///////////////////////////
-  // How many notifications remain?
-  //
-  inline
-  int notifications() const { return notifications_m; }
+  /// The parser thread calls this method to ask the scheduler to run
+  /// the given Iterate when the dependence on that Iterate has been
+  /// satisfied.

-  inline void addNotification()
+  void handOff(Iterate<SerialAsync>* it)
   {
-    notifications_m++;
+    // No action needs to be taken here.  Iterates will make their
+    // own way into the execution queue.
+    it->generation() = generation();
+    it->notify();
   }

-protected:
+  void releaseIterates() { }

-  // What scheduler are we working with?
-  IterateScheduler<SerialAsync> &scheduler_m;
+private:

-  // How many notifications should we receive before we can run?
-  int notifications_m;
+  typedef std::list<Iterate_t*> Container_t;
+  typedef Container_t::iterator Iterator_t;

-private:
-  // Set notifications dynamically and automatically every time a
-  // request is made by the iterate
-  void incr_notifications() { notifications_m++;}
+  static std::stack<int> generationStack_m;
+  int generation_m;

 };


-//-----------------------------------------------------------------------------
-
-/*------------------------------------------------------------------------
-CLASS
-       DataObject_SerialAsync
-
-       Implements a asynchronous scheduler for a data driven execution.
-KEYWORDS
-       Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
-        The DataObject Class is used introduce a type to represent
-       a resources (normally) blocks of data) that Iterates contend
-       for atomic access. Iterates make request for either a read or
-       write to the DataObjects. DataObjects may grant the request if
-       the object is currently available. Otherwise, the request is
-       enqueue in a queue private to the data object until the
-       DataObject is release by another Iterate. A set of read
-       requests may be granted all at once if there are no
-       intervening write request to that DataObject.
-       DataObject<SerialAsync> is a specialization of DataObject for
-       the policy template SerialAsync.
-*/
+/**
+ * Implements an asynchronous scheduler for a data-driven execution.
+ *
+ * The DataObject class introduces a type to represent a resource
+ * (normally a block of data) that Iterates contend for atomic
+ * access to. Iterates request either a read or a write to the
+ * DataObject. The DataObject may grant the request if the object
+ * is currently available. Otherwise, the request is enqueued in a
+ * queue private to the data object until the DataObject is
+ * released by another Iterate. A set of read requests may be
+ * granted all at once if there are no intervening write requests
+ * to that DataObject.
+ * DataObject<SerialAsync> is a specialization of DataObject for
+ * the policy template SerialAsync.
+ *
+ * There are two ways data can be used: to read or to write.
+ * Don't change this to give more than two states;
+ * things inside depend on that.
+ */

 template<>
 class DataObject<SerialAsync>
@@ -413,54 +510,56 @@
   typedef IterateScheduler<SerialAsync> IterateScheduler_t;
   typedef Iterate<SerialAsync> Iterate_t;

-  // There are two ways data can be used: to read or to write.
-  // Don't change this to give more than two states:
-  // things inside depend on that.
-
-  ///////////////////////////
-  // Construct the data object with an empty set of requests
-  // and the given affinity.
-  //
-  inline DataObject(int affinity=-1);
+
+  /// Construct the data object with an empty set of requests
+  /// and the given affinity.
+
+  DataObject(int affinity=-1)
+    : released_m(queue_m.end()), notifications_m(0)
+  {
+    // released_m starts at the end of the queue (which should also be
+    // the beginning).  notifications_m starts at zero, since nothing
+    // has been released yet.
+  }

-  ///////////////////////////
-  // for compatibility with other SMARTS schedulers, accept
-  // Scheduler arguments (unused)
-  //
-  inline
-  DataObject(int affinity, IterateScheduler<SerialAsync>&);
-
-  ///////////////////////////
-  // Stub out affinity because there is no affinity in serial.
-  //
-  inline int affinity() const { return 0; }
-
-  ///////////////////////////
-  // Stub out affinity because there is no affinity in serial.
-  //
-  inline void affinity(int) {}
+  /// for compatibility with other SMARTS schedulers, accept
+  /// Scheduler arguments (unused)

-  ///////////////////////////
-  // An iterate makes a request for a certain action in a certain
-  // generation.
-  //
-  inline
-  void request(Iterate<SerialAsync>&, SerialAsync::Action);
-
-  ///////////////////////////
-  // An iterate finishes and tells the DataObject it no longer needs
-  // it.  If this is the last release for the current set of
-  // requests, have the IterateScheduler release some more.
-  //
-  inline void release(SerialAsync::Action);
+  inline DataObject(int affinity, IterateScheduler<SerialAsync>&)
+    : released_m(queue_m.end()), notifications_m(0)
+  {}
+
+  /// Stub out affinity because there is no affinity in serial.
+
+  int affinity() const { return 0; }
+
+  /// Stub out affinity because there is no affinity in serial.
+
+  void affinity(int) {}
+
+  /// An iterate makes a request for a certain action in a certain
+  /// generation.
+
+  inline void request(Iterate<SerialAsync>&, SerialAsync::Action);
+
+  /// An iterate finishes and tells the DataObject it no longer needs
+  /// it.  If this is the last release for the current set of
+  /// requests, have the IterateScheduler release some more.
+
+  void release(SerialAsync::Action)
+  {
+    if (--notifications_m == 0)
+      releaseIterates();
+  }

-protected:
 private:

-  // If release needs to let more iterates go, it calls this.
+  /// If release needs to let more iterates go, it calls this.
   inline void releaseIterates();

-  // The type for a request.
+  /**
+   * The type for a request.
+   */
   class Request
   {
   public:
@@ -475,135 +574,27 @@
     SerialAsync::Action act_m;
   };

-  // The type of the queue and iterator.
+  /// The type of the queue and iterator.
   typedef std::list<Request> Container_t;
   typedef Container_t::iterator Iterator_t;

-  // The list of requests from various iterates.
-  // They're granted in FIFO order.
+  /// The list of requests from various iterates.
+  /// They're granted in FIFO order.
   Container_t queue_m;

-  // Pointer to the last request that has been granted.
+  /// Pointer to the last request that has been granted.
   Iterator_t released_m;

-  // The number of outstanding notifications.
+  /// The number of outstanding notifications.
   int notifications_m;
 };

-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for
-// IterateScheduler<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>*)
-// No action needs to be taken here.  Iterates will make their
-// own way into the execution queue.
-//
+/// void DataObject::releaseIterates(SerialAsync::Action)
+/// When the last released iterate dies, we need to
+/// look at the beginning of the queue and tell more iterates
+/// that they can access this data.

 inline void
-IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>* it)
-{
-  it->notify();
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for Iterate<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// Iterate<SerialAsync>::Iterate
-// Construct with the scheduler and the number of notifications.
-// Ignore the affinity.
-//
-
-inline
-Iterate<SerialAsync>::Iterate(IterateScheduler<SerialAsync>& s, int)
-: scheduler_m(s), notifications_m(1)
-{
-}
-
-//
-// Iterate<SerialAsync>::notify
-// Notify the iterate that a DataObject is ready.
-// Decrement the counter, and if it is zero, alert the scheduler.
-//
-
-inline void
-Iterate<SerialAsync>::notify()
-{
-  if ( --notifications_m == 0 )
-  {
-    add(this);
-  }
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for DataObject<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// DataObject::DataObject()
-// Initialize:
-//   released_m to the end of the queue (which should) also be the
-//   beginning.  notifications_m to zero, since nothing has been
-//   released yet.
-//
-
-inline
-DataObject<SerialAsync>::DataObject(int)
-: released_m(queue_m.end()), notifications_m(0)
-{
-}
-
-//
-// void DataObject::release(Action)
-// An iterate has finished and is telling the DataObject that
-// it is no longer needed.
-//
-
-inline void
-DataObject<SerialAsync>::release(SerialAsync::Action)
-{
-  if ( --notifications_m == 0 )
-    releaseIterates();
-}
-
-
-
-//-----------------------------------------------------------------------------
-//
-// void IterateScheduler<SerialAsync>::blockingEvaluate
-// Evaluate all the iterates in the queue.
-//
-//-----------------------------------------------------------------------------
-inline
-void
-IterateScheduler<SerialAsync>::blockingEvaluate()
-{
-  // Loop as long as there is anything in the queue.
-  while (SystemContext::workReady())
-  {
-    SystemContext::runSomething();
-  }
-}
-
-//-----------------------------------------------------------------------------
-//
-// void DataObject::releaseIterates(SerialAsync::Action)
-// When the last released iterate dies, we need to
-// look at the beginning of the queue and tell more iterates
-// that they can access this data.
-//
-//-----------------------------------------------------------------------------
-inline
-void
 DataObject<SerialAsync>::releaseIterates()
 {
   // Get rid of the reservations that have finished.
@@ -622,14 +613,17 @@
       released_m->iterate().notify();
       ++notifications_m;

-      // Record what action that one will take.
+      // Record what action that one will take
+      // and record its generation number
       SerialAsync::Action act = released_m->act();
+      int generation = released_m->iterate().generation();

       // Look at the next iterate.
       ++released_m;

       // If the first one was a read, release more.
       if ( act == SerialAsync::Read )
+       {

         // As long as we aren't at the end and we have more reads...
         while ((released_m != end) &&
@@ -642,29 +636,30 @@
             // And go on to the next.
             ++released_m;
           }
+
+       }
+
     }
 }

+/// void DataObject::request(Iterate&, action)
+/// An iterate makes a reservation with this DataObject for a given
+/// action in a given generation.  The request may be granted
+/// immediately.

-//
-// void DataObject::request(Iterate&, action)
-// An iterate makes a reservation with this DataObject for a given
-// action in a given generation.  The request may be granted
-// immediately.
-//
-inline
-void
+inline void
 DataObject<SerialAsync>::request(Iterate<SerialAsync>& it,
                                  SerialAsync::Action act)

 {
   // The request can be granted immediately if:
   // The queue is currently empty, or
-  // The request is a read and everything in the queue is a read.
+  // the request is a read and everything in the queue is a read,
+  // or (with relaxed conditions), everything is the same generation.

   // Set notifications dynamically and automatically
   //     every time a request is made by the iterate
-  it.incr_notifications();
+  it.notifications_m++;

   bool allReleased = (queue_m.end() == released_m);
   bool releasable =  queue_m.empty() ||
@@ -691,17 +686,11 @@
 }


-//----------------------------------------------------------------------
-
-
-//
-// End of Smarts namespace.
-//
-}
+} // namespace Smarts

 //////////////////////////////////////////////////////////////////////

-#endif     // POOMA_PACKAGE_CLASS_H
+#endif     // _SerialAsync_h_

 /***************************************************************************
  * $RCSfile: SerialAsync.h,v $   $Author: sa_smith $
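
As a usage-level illustration of the new generation tracking, the sequence
below sketches how nested beginGeneration()/endGeneration() calls interact
with blockingEvaluate() in the patched scheduler.  It is only a sketch: the
main() wrapper and the comments are illustrative, and it assumes the POOMA
headers are on the include path and, for --mpi builds, that MPI has already
been initialized via Pooma::initialize().

#include "Threads/IterateSchedulers/SerialAsync.h"

int main()
{
  Smarts::IterateScheduler<Smarts::SerialAsync> scheduler;

  scheduler.beginGeneration();    // outermost data-parallel statement begins
  // ... hand off iterates for this statement ...
  scheduler.beginGeneration();    // nested statement: the generation stack
                                  // grows and inGeneration() stays true
  scheduler.blockingEvaluate();   // inside a generation: must not block, so
                                  // it only runs work that is already ready
  scheduler.endGeneration();      // pops the nested entry
  scheduler.endGeneration();      // outermost end: with POOMA_MPI this is a
                                  // safe point to run iterates until plenty
                                  // of MPI_Request slots are free again

  scheduler.blockingEvaluate();   // outside any generation: drains all work,
                                  // blocking in MPI_Waitsome when idle
  return 0;
}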
