[Yt-svn] yt: Making the cycle logic of the SFG more robust.
hg at spacepope.org
hg at spacepope.org
Mon Apr 19 09:17:32 PDT 2010
hg Repository: yt
details: yt/rev/510ac77d14f7
changeset: 1567:510ac77d14f7
user: Stephen Skory <stephenskory at yahoo.com>
date: Mon Apr 19 09:17:00 2010 -0700
description:
Making the cycle logic of the SFG more robust.
diffstat:
yt/lagos/StructureFunctionGenerator.py | 75 ++++++++++++++++++++-----------------
1 files changed, 41 insertions(+), 34 deletions(-)
diffs (129 lines):
diff -r d0f87ad5ac49 -r 510ac77d14f7 yt/lagos/StructureFunctionGenerator.py
--- a/yt/lagos/StructureFunctionGenerator.py Thu Apr 15 13:39:49 2010 -0400
+++ b/yt/lagos/StructureFunctionGenerator.py Mon Apr 19 09:17:00 2010 -0700
@@ -69,14 +69,15 @@
self.size = self._mpi_get_size()
self.mine = self._mpi_get_rank()
self.vol_ratio = vol_ratio
- self.total_values = total_values / self.size
+ self.total_values = int(total_values / self.size)
# For communication.
self.recv_hooks = []
self.send_hooks = []
self.done_hooks = []
- self.comm_size = comm_size
+ self.comm_size = min(int(comm_size), self.total_values)
self.pf = pf
self.nlevels = na.unique(self.pf.h.grid_levels).size
+ self.nlevels = 10
self.period = self.pf['DomainRightEdge'] - self.pf['DomainLeftEdge']
self.min_edge = min(self.period)
self.hierarchy = pf.h
@@ -198,7 +199,7 @@
self.comm_cycle_count = 0
self.final_comm_cycle_count = 0
self.sent_done = False
- self._setup_done_hooks()
+ self._setup_done_hooks_on_root()
# While everyone else isn't done or I'm not done, we loop.
while self._should_cycle():
self._setup_recv_arrays()
@@ -218,7 +219,7 @@
self.gen_array[self.mine] = self.generated_points
self.comm_cycle_count += 1
if self.generated_points == self.total_values:
- self._send_done_toall()
+ self._send_done_to_root()
if self.mine == 0:
mylog.info("Length (%d of %d) %1.5e took %d communication cycles to complete." % \
(bigloop+1, len(self.lengths), length, self.comm_cycle_count))
@@ -282,53 +283,59 @@
to start with.
"""
self.points = na.ones((self.comm_size, 6), dtype='float64') * -1.0
-
- def _setup_done_hooks(self):
+
+ def _setup_done_hooks_on_root(self):
"""
- Opens non-blocking receives pointing to all the other tasks.
+ Opens non-blocking receives on root pointing to all the other tasks
"""
+ if self.mine != 0:
+ return
self.recv_done = {}
for task in xrange(self.size):
if task == self.mine: continue
self.recv_done[task] = na.zeros(1, dtype='int64')
self.done_hooks.append(self._mpi_Irecv_long(self.recv_done[task], \
task, tag=15))
-
- def _send_done_toall(self):
+
+ def _send_done_to_root(self):
"""
- Signal all the other tasks that this task has created all the points
- it needs to.
+ Tell the root process that I'm done.
"""
# If I've already done this, don't do it again.
if self.sent_done: return
- self.send_done = {}
- for task in xrange(self.size):
- if task == self.mine: continue
- # We'll send the cycle at which I think things should stop.
- self.send_done[task] = na.ones(1, dtype='int64') * \
+ if self.mine !=0:
+ # I send when I *think* things should finish.
+ self.send_done = na.ones(1, dtype='int64') * \
(self.size / self.vol_ratio -1) + self.comm_cycle_count
- self.done_hooks.append(self._mpi_Isend_long(self.send_done[task], \
- task, tag=15))
- # I need to mark my *own*, too!
- self.recv_done[self.mine] = (self.size / self.vol_ratio -1) + \
- self.comm_cycle_count
+ self.done_hooks.append(self._mpi_Isend_long(self.send_done, \
+ 0, tag=15))
+ else:
+ # As root, I need to mark myself!
+ self.recv_done[0] = na.ones(1, dtype='int64') * \
+ (self.size / self.vol_ratio -1) + self.comm_cycle_count
self.sent_done = True
-
+
def _should_cycle(self):
"""
- Depending on several factors, it returns whether or not the
- communication cycle should continue.
+ Determine if I should continue cycling the communication.
"""
- # We need to continue if:
- # If I'm not finished.
- if self.generated_points < self.total_values: return True
- # If other tasks aren't finished
- if not self._mpi_Request_Testall(self.done_hooks): return True
- # If they are all finished, meaning Testall returns True, we find
- # the biggest value in self.recv_done and stop there.
- stop = max(self.recv_done.values())
- if self.comm_cycle_count < stop:
- self.final_comm_cycle_count += 1
+ if self.mine == 0:
+ # If other tasks aren't finished, this will return False.
+ status = self._mpi_Request_Testall(self.done_hooks)
+ # Convolve this with root's status.
+ status = status * (self.generated_points == self.total_values)
+ if status == 1:
+ # If they are all finished, meaning Testall returns True,
+ # and root has made its points, we find
+ # the biggest value in self.recv_done and stop there.
+ status = max(self.recv_done.values())
+ else:
+ status = 0
+ # Broadcast the status from root - we stop only if root thinks we should
+ # stop.
+ status = self._mpi_bcast_pickled(status)
+ if status == 0: return True
+ if self.comm_cycle_count < status:
return True
# If we've come this far, we're done.
return False
More information about the yt-svn mailing list