[Yt-svn] yt: Making the cycle logic of the SFG more robust.

hg at spacepope.org hg at spacepope.org
Mon Apr 19 09:17:32 PDT 2010


hg Repository: yt
details:   yt/rev/510ac77d14f7
changeset: 1567:510ac77d14f7
user:      Stephen Skory <stephenskory at yahoo.com>
date:
Mon Apr 19 09:17:00 2010 -0700
description:
Making the cycle logic of the SFG more robust.

diffstat:

 yt/lagos/StructureFunctionGenerator.py |  75 ++++++++++++++++++++-----------------
 1 files changed, 41 insertions(+), 34 deletions(-)

diffs (129 lines):

diff -r d0f87ad5ac49 -r 510ac77d14f7 yt/lagos/StructureFunctionGenerator.py
--- a/yt/lagos/StructureFunctionGenerator.py	Thu Apr 15 13:39:49 2010 -0400
+++ b/yt/lagos/StructureFunctionGenerator.py	Mon Apr 19 09:17:00 2010 -0700
@@ -69,14 +69,15 @@
         self.size = self._mpi_get_size()
         self.mine = self._mpi_get_rank()
         self.vol_ratio = vol_ratio
-        self.total_values = total_values / self.size
+        self.total_values = int(total_values / self.size)
         # For communication.
         self.recv_hooks = []
         self.send_hooks = []
         self.done_hooks = []
-        self.comm_size = comm_size
+        self.comm_size = min(int(comm_size), self.total_values)
         self.pf = pf
         self.nlevels = na.unique(self.pf.h.grid_levels).size
+        self.nlevels = 10
         self.period = self.pf['DomainRightEdge'] - self.pf['DomainLeftEdge']
         self.min_edge = min(self.period)
         self.hierarchy = pf.h
@@ -198,7 +199,7 @@
             self.comm_cycle_count = 0
             self.final_comm_cycle_count = 0
             self.sent_done = False
-            self._setup_done_hooks()
+            self._setup_done_hooks_on_root()
             # While everyone else isn't done or I'm not done, we loop.
             while self._should_cycle():
                 self._setup_recv_arrays()
@@ -218,7 +219,7 @@
                 self.gen_array[self.mine] = self.generated_points
                 self.comm_cycle_count += 1
                 if self.generated_points == self.total_values:
-                    self._send_done_toall()
+                    self._send_done_to_root()
             if self.mine == 0:
                 mylog.info("Length (%d of %d) %1.5e took %d communication cycles to complete." % \
                 (bigloop+1, len(self.lengths), length, self.comm_cycle_count))
@@ -282,53 +283,59 @@
         to start with.
         """
         self.points = na.ones((self.comm_size, 6), dtype='float64') * -1.0
-
-    def _setup_done_hooks(self):
+    
+    def _setup_done_hooks_on_root(self):
         """
-        Opens non-blocking receives pointing to all the other tasks.
+        Opens non-blocking receives on root pointing to all the other tasks
         """
+        if self.mine != 0:
+            return
         self.recv_done = {}
         for task in xrange(self.size):
             if task == self.mine: continue
             self.recv_done[task] = na.zeros(1, dtype='int64')
             self.done_hooks.append(self._mpi_Irecv_long(self.recv_done[task], \
                 task, tag=15))
-
-    def _send_done_toall(self):
+    
+    def _send_done_to_root(self):
         """
-        Signal all the other tasks that this task has created all the points
-        it needs to.
+        Tell the root process that I'm done.
         """
         # If I've already done this, don't do it again.
         if self.sent_done: return
-        self.send_done = {}
-        for task in xrange(self.size):
-            if task == self.mine: continue
-            # We'll send the cycle at which I think things should stop.
-            self.send_done[task] = na.ones(1, dtype='int64') * \
+        if self.mine !=0:
+            # I send when I *think* things should finish.
+            self.send_done = na.ones(1, dtype='int64') * \
                 (self.size / self.vol_ratio -1) + self.comm_cycle_count
-            self.done_hooks.append(self._mpi_Isend_long(self.send_done[task], \
-                task, tag=15))
-        # I need to mark my *own*, too!
-        self.recv_done[self.mine] = (self.size / self.vol_ratio -1) + \
-            self.comm_cycle_count
+            self.done_hooks.append(self._mpi_Isend_long(self.send_done, \
+                    0, tag=15))
+        else:
+            # As root, I need to mark myself!
+            self.recv_done[0] = na.ones(1, dtype='int64') * \
+                (self.size / self.vol_ratio -1) + self.comm_cycle_count
         self.sent_done = True
-
+    
     def _should_cycle(self):
         """
-        Depending on several factors, it returns whether or not the
-        communication cycle should continue.
+        Determine if I should continue cycling the communication.
         """
-        # We need to continue if:
-        # If I'm not finished.
-        if self.generated_points < self.total_values: return True
-        # If other tasks aren't finished
-        if not self._mpi_Request_Testall(self.done_hooks): return True
-        # If they are all finished, meaning Testall returns True, we find
-        # the biggest value in self.recv_done and stop there.
-        stop = max(self.recv_done.values())
-        if self.comm_cycle_count < stop:
-            self.final_comm_cycle_count += 1
+        if self.mine == 0:
+            # If other tasks aren't finished, this will return False.
+            status = self._mpi_Request_Testall(self.done_hooks)
+            # Convolve this with with root's status.
+            status = status * (self.generated_points == self.total_values)
+            if status == 1:
+                # If they are all finished, meaning Testall returns True,
+                # and root has made its points, we find
+                # the biggest value in self.recv_done and stop there.
+                status = max(self.recv_done.values())
+        else:
+            status = 0
+        # Broadcast the status from root - we stop only if root thinks we should
+        # stop.
+        status = self._mpi_bcast_pickled(status)
+        if status == 0: return True
+        if self.comm_cycle_count < status:
             return True
         # If we've come this far, we're done.
         return False



More information about the yt-svn mailing list