[Yt-svn] yt-commit r1027 - in trunk/yt/lagos: . hop

mturk at wrangler.dreamhost.com mturk at wrangler.dreamhost.com
Fri Dec 19 09:43:38 PST 2008


Author: mturk
Date: Fri Dec 19 09:43:36 2008
New Revision: 1027
URL: http://yt.spacepope.org/changeset/1027

Log:
Changed the proxy function to work, and to demand that _distributed and _owned
be set.  Changed hierarchy 3d partitioning to work with arbitrary domains.
Changed all the calls to MPI.COMM_WORLD.Barrer() to be calls to the _barrier
function, so tracebacks can be placed.

SS_Hop has some fixes; I get identical results for halo counts (not yet sure
about halo sizes, etc) on one proc as on two for a small dataset.  SS is going
to test this now.

Fixes for GridCollection, which didn't quite work.  Additionally, the grabbing
of particles from a GridCollection was something of a corner case I have now
fixed.



Modified:
   trunk/yt/lagos/BaseDataTypes.py
   trunk/yt/lagos/ParallelTools.py
   trunk/yt/lagos/hop/SS_HopOutput.py

Modified: trunk/yt/lagos/BaseDataTypes.py
==============================================================================
--- trunk/yt/lagos/BaseDataTypes.py	(original)
+++ trunk/yt/lagos/BaseDataTypes.py	Fri Dec 19 09:43:36 2008
@@ -85,6 +85,7 @@
                      'dz':grid['dz']}
         self.real_grid = grid
         self.child_mask = 1
+        self.ActiveDimensions = self.data['x'].shape
     def __getitem__(self, field):
         if field not in self.data.keys():
             if field == "RadiusCode":
@@ -1567,7 +1568,6 @@
         """
         AMR3DData.__init__(self, center, fields, pf, **kwargs)
         self._grids = na.array(grid_list)
-        self.fields = fields
         self.connection_pool = True
 
     def _get_list_of_grids(self):

Modified: trunk/yt/lagos/ParallelTools.py
==============================================================================
--- trunk/yt/lagos/ParallelTools.py	(original)
+++ trunk/yt/lagos/ParallelTools.py	Fri Dec 19 09:43:36 2008
@@ -114,9 +114,11 @@
     @wraps(func)
     def single_proc_results(self, *args, **kwargs):
         retval = None
+        if not self._distributed:
+            return func(self, *args, **kwargs)
         if self._owned:
             retval = func(self, *args, **kwargs)
-        MPI.COMM_WORLD.Bcast(retval)
+        retval = MPI.COMM_WORLD.Bcast(retval, root=MPI.COMM_WORLD.rank)
         return retval
     return single_proc_results
 
@@ -180,15 +182,16 @@
         return True, reg
 
     def _partition_hierarchy_3d(self, padding=0.0):
+        LE, RE = self.pf["DomainLeftEdge"], self.pf["DomainRightEdge"]
         if not parallel_capable:
-           return False, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
+           return False, LE, RE, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
 
         cc = MPI.Compute_dims(MPI.COMM_WORLD.size, 3)
         mi = MPI.COMM_WORLD.rank
         cx, cy, cz = na.unravel_index(mi, cc)
-        x = na.mgrid[0:1:(cc[0]+1)*1j][cx:cx+2]
-        y = na.mgrid[0:1:(cc[1]+1)*1j][cy:cy+2]
-        z = na.mgrid[0:1:(cc[2]+1)*1j][cz:cz+2]
+        x = na.mgrid[LE[0]:RE[0]:(cc[0]+1)*1j][cx:cx+2]
+        y = na.mgrid[LE[1]:RE[1]:(cc[1]+1)*1j][cy:cy+2]
+        z = na.mgrid[LE[2]:RE[2]:(cc[2]+1)*1j][cz:cz+2]
 
         LE = na.array([x[0], y[0], z[0]], dtype='float64')
         RE = na.array([x[1], y[1], z[1]], dtype='float64')
@@ -197,13 +200,16 @@
             return True, \
                 LE, RE, self.hierarchy.periodic_region(self.center, LE-padding, RE+padding)
 
-        return LE, RE, self.hierarchy.region(self.center, LE, RE)
+        return False, LE, RE, self.hierarchy.region(self.center, LE, RE)
         
+    def _barrier(self):
+        if not parallel_capable: return
+        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+        MPI.COMM_WORLD.Barrier()
 
     @parallel_passthrough
     def _mpi_catdict(self, data):
-        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         field_keys = data.keys()
         field_keys.sort()
         np = MPI.COMM_WORLD.size
@@ -216,10 +222,9 @@
                     axis=-1)
             else:
                 MPI.COMM_WORLD.Send(data[key], dest=0, tag=0)
-            MPI.COMM_WORLD.Barrier()
+            self._barrier()
             data[key] = MPI.COMM_WORLD.Bcast(data[key], root=0)
-        mylog.debug("Done joining dictionary on %s", MPI.COMM_WORLD.rank)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         return data
 
     @parallel_passthrough
@@ -241,15 +246,14 @@
 
     @parallel_passthrough
     def _mpi_catlist(self, data):
-        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         if MPI.COMM_WORLD.rank == 0:
             data = self.__mpi_recvlist(data)
         else:
             MPI.COMM_WORLD.Send(data, dest=0, tag=0)
         mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
         data = MPI.COMM_WORLD.Bcast(data, root=0)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         return data
 
     @parallel_passthrough
@@ -262,15 +266,14 @@
 
     @parallel_passthrough
     def _mpi_catarray(self, data):
-        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         if MPI.COMM_WORLD.rank == 0:
             data = self.__mpi_recvarray(data)
         else:
             MPI.COMM_WORLD.Send(data, dest=0, tag=0)
         mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
         data = MPI.COMM_WORLD.Bcast(data, root=0)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         return data
 
     def _should_i_write(self):
@@ -285,13 +288,13 @@
 
     @parallel_passthrough
     def _mpi_allsum(self, data):
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
 
     def _mpi_info_dict(self, info):
+        mylog.info("Parallel capable: %s", parallel_capable)
         if not parallel_capable: return 0, {0:info}
-        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         data = None
         if MPI.COMM_WORLD.rank == 0:
             data = {0:info}
@@ -301,7 +304,7 @@
             MPI.COMM_WORLD.Send(info, dest=0, tag=0)
         mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
         data = MPI.COMM_WORLD.Bcast(data, root=0)
-        MPI.COMM_WORLD.Barrier()
+        self._barrier()
         return MPI.COMM_WORLD.rank, data
 
     def _get_dependencies(self, fields):

Modified: trunk/yt/lagos/hop/SS_HopOutput.py
==============================================================================
--- trunk/yt/lagos/hop/SS_HopOutput.py	(original)
+++ trunk/yt/lagos/hop/SS_HopOutput.py	Fri Dec 19 09:43:36 2008
@@ -148,14 +148,17 @@
     HOP-identified halo.
     """
     __metaclass__ = ParallelDummy # This will proxy up our methods
+    _distributed = False
+    _owned = True
     dont_wrap = ["get_sphere"]
 
-    def __init__(self, hop_output, id, indices):
+    def __init__(self, hop_output, id, indices = None):
         self.hop_output = hop_output
         self.id = id
         self.data = hop_output.data_source
-        self.indices = hop_output._base_indices[indices]
-        
+        if indices is not None: self.indices = hop_output._base_indices[indices]
+        # We assume that if indices = None, the instantiator has OTHER plans
+        # for us -- i.e., setting it somehow else
     def center_of_mass(self):
         """
         Calculate and return the center of mass.
@@ -234,9 +237,11 @@
 class HaloFinder(HopList, ParallelAnalysisInterface):
     def __init__(self, pf, threshold=160.0, dm_only=True):
         self.pf = pf
+        self.hierarchy = pf.h
+        self.center = (pf["DomainRightEdge"] + pf["DomainLeftEdge"])/2.0
         # do it once with no padding so the total_mass is correct (no duplicated particles)
         self.padding = 0.0
-        LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
+        padded, LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
         # For scaling the threshold, note that it's a passthrough
         total_mass = self._mpi_allsum(data_source["ParticleMassMsun"].sum())
         # MJT: Note that instead of this, if we are assuming that the particles
@@ -244,28 +249,30 @@
         # object representing the entire domain and sum it "lazily" with
         # Derived Quantities.
         self.padding = 0.2 #* pf["unitary"] # This should be clevererer
-        LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
+        padded, LE, RE, self.data_source = self._partition_hierarchy_3d(padding=self.padding)
         self.bounds = (LE, RE)
         # reflect particles around the periodic boundary
         self._reposition_particles((LE, RE))
         # MJT: This is the point where HOP is run, and we have halos for every
         # single sub-region
-        super(HopList, self).__init__(data_source, threshold, dm_only)
-        self._parse_hoplist(hop_list)
-        self._join_hoplists(hop_list)
+        super(HaloFinder, self).__init__(self.data_source, threshold, dm_only)
+        self._parse_hoplist()
+        self._join_hoplists()
 
     def _parse_hoplist(self):
-        groups, max_dens,hi  = [], {}, 0
+        groups, max_dens, hi  = [], {}, 0
+        LE, RE = self.bounds
+        print LE, RE
         for halo in self._groups:
-            max_dens = halo.maximum_density_location()
+            this_max_dens = halo.maximum_density_location()
             # if the most dense particle is in the box, keep it
-            if na.all((max_dens >= LE+self.padding) & (max_dens < RE-self.padding)):
+            if na.all((this_max_dens >= LE) & (this_max_dens <= RE)):
                 # Now we add the halo information to OURSELVES, taken from the
                 # self.hop_list
                 # We need to mock up the HopList thingie, so we need to set:
                 #     self._max_dens
                 #     
-                max_dens[self._max_dens[halo.id]]
+                max_dens[hi] = self._max_dens[halo.id]
                 groups.append(HopGroup(self, hi))
                 groups[-1].indices = halo.indices
                 groups[-1]._owned = True
@@ -274,7 +281,7 @@
         self._groups = groups
         self._max_dens = max_dens
 
-    def _join_hoplists(self, hop_list):
+    def _join_hoplists(self):
         # First we get the total number of halos the entire collection
         # has identified
         # Note I have added a new method here to help us get information
@@ -302,7 +309,8 @@
         # MJT: Sorting doesn't work yet.  They need to be sorted.
         #haloes.sort(lambda x, y: cmp(len(x.indices),len(y.indices)))
         # Unfortunately, we can't sort *just yet*.
-        for i,halo in self._groups:
+        for i, halo in enumerate(self._groups):
+            self._distributed = True
             halo.id = i
         
     def _reposition_particles(self, bounds):



More information about the yt-svn mailing list