[Yt-svn] yt-commit r1027 - in trunk/yt/lagos: . hop
mturk at wrangler.dreamhost.com
Fri Dec 19 09:43:38 PST 2008
Author: mturk
Date: Fri Dec 19 09:43:36 2008
New Revision: 1027
URL: http://yt.spacepope.org/changeset/1027
Log:
Changed the proxy function so that it works, and so that it demands that _distributed
and _owned be set. Changed the hierarchy 3D partitioning to work with arbitrary domains.
Changed all calls to MPI.COMM_WORLD.Barrier() into calls to the _barrier
function, so that tracebacks can be placed.
SS_Hop has some fixes; I get identical results for halo counts (not yet sure
about halo sizes, etc.) on one processor as on two for a small dataset. SS is going
to test this now.
Fixes for GridCollection, which didn't quite work. Additionally, grabbing
particles from a GridCollection was something of a corner case, which I have now
fixed.
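
For readers skimming the diff below: the _barrier helper is just MPI.COMM_WORLD.Barrier()
behind a parallel-capability check and a debug log, so the waiting rank is visible when a
hang or traceback needs to be diagnosed. A minimal sketch of the pattern, assuming mpi4py
and an illustrative module-level parallel_capable flag rather than the exact yt internals:

    # Minimal sketch of the _barrier pattern -- assumes mpi4py; the
    # parallel_capable flag and logger name are illustrative only.
    import logging
    from mpi4py import MPI

    mylog = logging.getLogger("yt")
    parallel_capable = MPI.COMM_WORLD.Get_size() > 1

    def barrier():
        # Serial runs skip MPI entirely, so the helper is always safe to call.
        if not parallel_capable:
            return
        # Log before blocking so a stalled run shows which ranks are waiting.
        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.Get_rank())
        MPI.COMM_WORLD.Barrier()
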
Modified:
trunk/yt/lagos/BaseDataTypes.py
trunk/yt/lagos/ParallelTools.py
trunk/yt/lagos/hop/SS_HopOutput.py
Modified: trunk/yt/lagos/BaseDataTypes.py
==============================================================================
--- trunk/yt/lagos/BaseDataTypes.py (original)
+++ trunk/yt/lagos/BaseDataTypes.py Fri Dec 19 09:43:36 2008
@@ -85,6 +85,7 @@
'dz':grid['dz']}
self.real_grid = grid
self.child_mask = 1
+ self.ActiveDimensions = self.data['x'].shape
def __getitem__(self, field):
if field not in self.data.keys():
if field == "RadiusCode":
@@ -1567,7 +1568,6 @@
"""
AMR3DData.__init__(self, center, fields, pf, **kwargs)
self._grids = na.array(grid_list)
- self.fields = fields
self.connection_pool = True
def _get_list_of_grids(self):
Modified: trunk/yt/lagos/ParallelTools.py
==============================================================================
--- trunk/yt/lagos/ParallelTools.py (original)
+++ trunk/yt/lagos/ParallelTools.py Fri Dec 19 09:43:36 2008
@@ -114,9 +114,11 @@
@wraps(func)
def single_proc_results(self, *args, **kwargs):
retval = None
+ if not self._distributed:
+ return func(self, *args, **kwargs)
if self._owned:
retval = func(self, *args, **kwargs)
- MPI.COMM_WORLD.Bcast(retval)
+ retval = MPI.COMM_WORLD.Bcast(retval, root=MPI.COMM_WORLD.rank)
return retval
return single_proc_results
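
The hunk above is the proxy behavior described in the log: run the method only on the owning
processor, then broadcast the result to everyone. A rough standalone sketch of that pattern
(mpi4py, with the owning rank assumed to be 0 purely for illustration; the real class derives
ownership from self._owned):

    # Sketch only: the owner computes, everyone receives.  Assumes all ranks
    # agree that rank 0 is the owner; names here are illustrative.
    from functools import wraps
    from mpi4py import MPI

    def single_proc_results(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Non-distributed objects just run the method locally.
            if not self._distributed:
                return func(self, *args, **kwargs)
            retval = None
            if self._owned:
                retval = func(self, *args, **kwargs)
            # Collective call: every rank participates and gets the owner's value.
            return MPI.COMM_WORLD.bcast(retval, root=0)
        return wrapper
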
@@ -180,15 +182,16 @@
return True, reg
def _partition_hierarchy_3d(self, padding=0.0):
+ LE, RE = self.pf["DomainLeftEdge"], self.pf["DomainRightEdge"]
if not parallel_capable:
- return False, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
+ return False, LE, RE, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
cc = MPI.Compute_dims(MPI.COMM_WORLD.size, 3)
mi = MPI.COMM_WORLD.rank
cx, cy, cz = na.unravel_index(mi, cc)
- x = na.mgrid[0:1:(cc[0]+1)*1j][cx:cx+2]
- y = na.mgrid[0:1:(cc[1]+1)*1j][cy:cy+2]
- z = na.mgrid[0:1:(cc[2]+1)*1j][cz:cz+2]
+ x = na.mgrid[LE[0]:RE[0]:(cc[0]+1)*1j][cx:cx+2]
+ y = na.mgrid[LE[1]:RE[1]:(cc[1]+1)*1j][cy:cy+2]
+ z = na.mgrid[LE[2]:RE[2]:(cc[2]+1)*1j][cz:cz+2]
LE = na.array([x[0], y[0], z[0]], dtype='float64')
RE = na.array([x[1], y[1], z[1]], dtype='float64')
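
The mgrid changes above are what let the decomposition honor arbitrary domain edges instead of
assuming a unit cube. A pure-numpy sketch of the same slicing, with the MPI rank and processor
layout passed in explicitly (the Compute_dims and rank lookups are left out):

    # Split [LE, RE] into a cc[0] x cc[1] x cc[2] grid of subvolumes and
    # return the edges of the one owned by `rank`.  Illustrative only.
    import numpy as na

    def subdomain_edges(rank, cc, LE, RE):
        cx, cy, cz = na.unravel_index(rank, cc)
        # A complex step in mgrid yields cc[i]+1 evenly spaced edge values
        # spanning the full domain along each axis; take this rank's pair.
        x = na.mgrid[LE[0]:RE[0]:(cc[0] + 1) * 1j][cx:cx + 2]
        y = na.mgrid[LE[1]:RE[1]:(cc[1] + 1) * 1j][cy:cy + 2]
        z = na.mgrid[LE[2]:RE[2]:(cc[2] + 1) * 1j][cz:cz + 2]
        left = na.array([x[0], y[0], z[0]], dtype='float64')
        right = na.array([x[1], y[1], z[1]], dtype='float64')
        return left, right

    # e.g. rank 3 in a 2x2x1 layout over [0,0,0] -> [4,4,4] gets
    # left = [2, 2, 0], right = [4, 4, 4].
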
@@ -197,13 +200,16 @@
return True, \
LE, RE, self.hierarchy.periodic_region(self.center, LE-padding, RE+padding)
- return LE, RE, self.hierarchy.region(self.center, LE, RE)
+ return False, LE, RE, self.hierarchy.region(self.center, LE, RE)
+ def _barrier(self):
+ if not parallel_capable: return
+ mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+ MPI.COMM_WORLD.Barrier()
@parallel_passthrough
def _mpi_catdict(self, data):
- mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
field_keys = data.keys()
field_keys.sort()
np = MPI.COMM_WORLD.size
@@ -216,10 +222,9 @@
axis=-1)
else:
MPI.COMM_WORLD.Send(data[key], dest=0, tag=0)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
data[key] = MPI.COMM_WORLD.Bcast(data[key], root=0)
- mylog.debug("Done joining dictionary on %s", MPI.COMM_WORLD.rank)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
return data
@parallel_passthrough
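
For reference, the _mpi_catdict flow these hunks keep touching is a gather/concatenate/broadcast:
rank 0 collects each field array from every rank, joins them, and sends the joined dictionary back
out. A compact sketch using mpi4py's pickling-based calls (illustrative, not the code in the diff):

    # Illustrative gather/concatenate/broadcast for a dict of arrays.
    import numpy as na
    from mpi4py import MPI

    def cat_dict(data):
        comm = MPI.COMM_WORLD
        # gather() returns the list of per-rank dicts on root, None elsewhere.
        gathered = comm.gather(data, root=0)
        joined = None
        if comm.Get_rank() == 0:
            joined = dict((key, na.concatenate([d[key] for d in gathered], axis=-1))
                          for key in sorted(data.keys()))
        # Every rank ends up holding the same joined arrays.
        return comm.bcast(joined, root=0)
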
@@ -241,15 +246,14 @@
@parallel_passthrough
def _mpi_catlist(self, data):
- mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
if MPI.COMM_WORLD.rank == 0:
data = self.__mpi_recvlist(data)
else:
MPI.COMM_WORLD.Send(data, dest=0, tag=0)
mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
data = MPI.COMM_WORLD.Bcast(data, root=0)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
return data
@parallel_passthrough
@@ -262,15 +266,14 @@
@parallel_passthrough
def _mpi_catarray(self, data):
- mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
if MPI.COMM_WORLD.rank == 0:
data = self.__mpi_recvarray(data)
else:
MPI.COMM_WORLD.Send(data, dest=0, tag=0)
mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
data = MPI.COMM_WORLD.Bcast(data, root=0)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
return data
def _should_i_write(self):
@@ -285,13 +288,13 @@
@parallel_passthrough
def _mpi_allsum(self, data):
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
def _mpi_info_dict(self, info):
+ mylog.info("Parallel capable: %s", parallel_capable)
if not parallel_capable: return 0, {0:info}
- mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
data = None
if MPI.COMM_WORLD.rank == 0:
data = {0:info}
@@ -301,7 +304,7 @@
MPI.COMM_WORLD.Send(info, dest=0, tag=0)
mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
data = MPI.COMM_WORLD.Bcast(data, root=0)
- MPI.COMM_WORLD.Barrier()
+ self._barrier()
return MPI.COMM_WORLD.rank, data
def _get_dependencies(self, fields):
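
_mpi_info_dict, which now also goes through the _barrier helper, simply assembles a
{rank: info} table on the root and broadcasts it so every rank sees the same mapping.
A hedged sketch of that exchange with mpi4py (names illustrative):

    # Each rank contributes one entry; everybody receives the full table.
    from mpi4py import MPI

    def info_dict(info, parallel_capable=True):
        if not parallel_capable:
            return 0, {0: info}
        comm = MPI.COMM_WORLD
        gathered = comm.gather(info, root=0)   # list of per-rank infos on rank 0
        data = dict(enumerate(gathered)) if comm.Get_rank() == 0 else None
        data = comm.bcast(data, root=0)
        return comm.Get_rank(), data
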
Modified: trunk/yt/lagos/hop/SS_HopOutput.py
==============================================================================
--- trunk/yt/lagos/hop/SS_HopOutput.py (original)
+++ trunk/yt/lagos/hop/SS_HopOutput.py Fri Dec 19 09:43:36 2008
@@ -148,14 +148,17 @@
HOP-identified halo.
"""
__metaclass__ = ParallelDummy # This will proxy up our methods
+ _distributed = False
+ _owned = True
dont_wrap = ["get_sphere"]
- def __init__(self, hop_output, id, indices):
+ def __init__(self, hop_output, id, indices = None):
self.hop_output = hop_output
self.id = id
self.data = hop_output.data_source
- self.indices = hop_output._base_indices[indices]
-
+ if indices is not None: self.indices = hop_output._base_indices[indices]
+ # We assume that if indices = None, the instantiator has OTHER plans
+ # for us -- i.e., setting it somehow else
def center_of_mass(self):
"""
Calculate and return the center of mass.
@@ -234,9 +237,11 @@
class HaloFinder(HopList, ParallelAnalysisInterface):
def __init__(self, pf, threshold=160.0, dm_only=True):
self.pf = pf
+ self.hierarchy = pf.h
+ self.center = (pf["DomainRightEdge"] + pf["DomainLeftEdge"])/2.0
# do it once with no padding so the total_mass is correct (no duplicated particles)
self.padding = 0.0
- LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
+ padded, LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
# For scaling the threshold, note that it's a passthrough
total_mass = self._mpi_allsum(data_source["ParticleMassMsun"].sum())
# MJT: Note that instead of this, if we are assuming that the particles
@@ -244,28 +249,30 @@
# object representing the entire domain and sum it "lazily" with
# Derived Quantities.
self.padding = 0.2 #* pf["unitary"] # This should be clevererer
- LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
+ padded, LE, RE, self.data_source = self._partition_hierarchy_3d(padding=self.padding)
self.bounds = (LE, RE)
# reflect particles around the periodic boundary
self._reposition_particles((LE, RE))
# MJT: This is the point where HOP is run, and we have halos for every
# single sub-region
- super(HopList, self).__init__(data_source, threshold, dm_only)
- self._parse_hoplist(hop_list)
- self._join_hoplists(hop_list)
+ super(HaloFinder, self).__init__(self.data_source, threshold, dm_only)
+ self._parse_hoplist()
+ self._join_hoplists()
def _parse_hoplist(self):
- groups, max_dens,hi = [], {}, 0
+ groups, max_dens, hi = [], {}, 0
+ LE, RE = self.bounds
+ print LE, RE
for halo in self._groups:
- max_dens = halo.maximum_density_location()
+ this_max_dens = halo.maximum_density_location()
# if the most dense particle is in the box, keep it
- if na.all((max_dens >= LE+self.padding) & (max_dens < RE-self.padding)):
+ if na.all((this_max_dens >= LE) & (this_max_dens <= RE)):
# Now we add the halo information to OURSELVES, taken from the
# self.hop_list
# We need to mock up the HopList thingie, so we need to set:
# self._max_dens
#
- max_dens[self._max_dens[halo.id]]
+ max_dens[hi] = self._max_dens[halo.id]
groups.append(HopGroup(self, hi))
groups[-1].indices = halo.indices
groups[-1]._owned = True
@@ -274,7 +281,7 @@
self._groups = groups
self._max_dens = max_dens
- def _join_hoplists(self, hop_list):
+ def _join_hoplists(self):
# First we get the total number of halos the entire collection
# has identified
# Note I have added a new method here to help us get information
@@ -302,7 +309,8 @@
# MJT: Sorting doesn't work yet. They need to be sorted.
#haloes.sort(lambda x, y: cmp(len(x.indices),len(y.indices)))
# Unfortunately, we can't sort *just yet*.
- for i,halo in self._groups:
+ for i, halo in enumerate(self._groups):
+ self._distributed = True
halo.id = i
def _reposition_particles(self, bounds):
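
The _parse_hoplist changes above implement the ownership rule that makes the one-processor and
two-processor halo counts agree: after HOP runs on a padded subregion, a processor keeps a halo
only if that halo's most dense particle lies inside the processor's unpadded bounds. A small
numpy sketch of the test (names illustrative):

    # Keep a halo only if its densest particle sits inside this rank's
    # unpadded subvolume, so halos in the padded overlap are counted once.
    import numpy as na

    def owns_halo(max_dens_location, LE, RE):
        pos = na.asarray(max_dens_location)
        return bool(na.all((pos >= LE) & (pos <= RE)))

    # e.g. with LE = [0.25, 0.0, 0.0], RE = [0.5, 1.0, 1.0]:
    #   owns_halo([0.3, 0.4, 0.9], LE, RE)  -> True
    #   owns_halo([0.6, 0.4, 0.9], LE, RE)  -> False
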