[Yt-svn] yt-commit r1026 - in trunk/yt/lagos: . hop

mturk at wrangler.dreamhost.com
Thu Dec 18 21:28:40 PST 2008


Author: mturk
Date: Thu Dec 18 21:28:39 2008
New Revision: 1026
URL: http://yt.spacepope.org/changeset/1026

Log:
Lots of modifications to SS_HopOutput; it is now almost ready to test and run.

Added some modifications to the parallel tools, including _mpi_info_dict and a
metaclass called ParallelDummy for objects that exist on all processors but
only REALLY exist on one.
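
For context, the intended usage of ParallelDummy looks roughly like the sketch
below.  SomeHaloObject and get_size are illustrative names only (not part of
this commit), and the exact behavior of parallel_simple_proxy is defined
elsewhere in ParallelTools.py:

    class SomeHaloObject(object):
        # Every public method not listed in dont_wrap gets wrapped in
        # parallel_simple_proxy by the metaclass.
        __metaclass__ = ParallelDummy
        dont_wrap = ["get_sphere"]

        def get_size(self):      # wrapped: calls get proxied by parallel_simple_proxy
            return len(self.indices)

        def get_sphere(self):    # listed in dont_wrap: left as a normal method
            return None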



Modified:
   trunk/yt/lagos/ParallelTools.py
   trunk/yt/lagos/hop/SS_HopOutput.py

Modified: trunk/yt/lagos/ParallelTools.py
==============================================================================
--- trunk/yt/lagos/ParallelTools.py	(original)
+++ trunk/yt/lagos/ParallelTools.py	Thu Dec 18 21:28:39 2008
@@ -120,6 +120,18 @@
         return retval
     return single_proc_results
 
+class ParallelDummy(type):
+    # All attributes that don't start with _ get replaced with
+    # parallel_simple_proxy attributes.
+    def __init__(cls, name, bases, d):
+        super(ParallelDummy, cls).__init__(name, bases, d)
+        skip = d.pop("dont_wrap", [])
+        for attrname in d:
+            if attrname.startswith("_") or attrname in skip: continue
+            attr = getattr(cls, attrname)
+            if type(attr) == types.MethodType:
+                setattr(cls, attrname, parallel_simple_proxy(attr))
+
 def parallel_passthrough(func):
     @wraps(func)
     def passage(self, data):
@@ -221,8 +233,9 @@
     @parallel_passthrough
     def __mpi_recvlist(self, data):
         # First we receive, then we make a new list.
+        data = ensure_list(data)
         for i in range(1,MPI.COMM_WORLD.size):
-            buf = MPI.COMM_WORLD.Recv(source=i, tag=0)
+            buf = ensure_list(MPI.COMM_WORLD.Recv(source=i, tag=0))
             data += buf
         return na.array(data)
 
@@ -275,6 +288,21 @@
         MPI.COMM_WORLD.Barrier()
         return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
 
+    def _mpi_info_dict(self, info):
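+        # Gather one piece of information from each processor into a
+        # {rank: info} dict, assemble it on the root processor, broadcast it
+        # back, and return (our rank, the full dict) everywhere.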
+        if not parallel_capable: return 0, {0:info}
+        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+        MPI.COMM_WORLD.Barrier()
+        data = None
+        if MPI.COMM_WORLD.rank == 0:
+            data = {0:info}
+            for i in range(1, MPI.COMM_WORLD.size):
+                data[i] = MPI.COMM_WORLD.Recv(source=i, tag=0)
+        else:
+            MPI.COMM_WORLD.Send(info, dest=0, tag=0)
+        mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
+        data = MPI.COMM_WORLD.Bcast(data, root=0)
+        MPI.COMM_WORLD.Barrier()
+        return MPI.COMM_WORLD.rank, data
 
     def _get_dependencies(self, fields):
         deps = []
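
As a rough illustration of how the new _mpi_info_dict is meant to be called
from inside a ParallelAnalysisInterface subclass (this mirrors its use in
SS_HopOutput.py below; local_count, info and total are made-up names):

    # Each processor passes in its own value; every processor gets back its
    # rank plus the full {rank: value} dict.
    local_count = len(self._groups)
    mine, info = self._mpi_info_dict(local_count)
    # e.g. on three processors this might give mine == 1 and
    # info == {0: 5, 1: 3, 2: 7}
    total = sum(info.values())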

Modified: trunk/yt/lagos/hop/SS_HopOutput.py
==============================================================================
--- trunk/yt/lagos/hop/SS_HopOutput.py	(original)
+++ trunk/yt/lagos/hop/SS_HopOutput.py	Thu Dec 18 21:28:39 2008
@@ -147,6 +147,9 @@
     A data source that returns particle information about the members of a
     HOP-identified halo.
     """
+    __metaclass__ = ParallelDummy # This will proxy up our methods
+    dont_wrap = ["get_sphere"]
+
     def __init__(self, hop_output, id, indices):
         self.hop_output = hop_output
         self.id = id
@@ -228,84 +231,86 @@
                         center, radius=radius)
         return sphere
 
-class HaloFinder(ParallelAnalysisInterface):
+class HaloFinder(HopList, ParallelAnalysisInterface):
     def __init__(self, pf, threshold=160.0, dm_only=True):
         self.pf = pf
-        self.hierarchy = pf.hierarchy
         # do it once with no padding so the total_mass is correct (no duplicated particles)
         self.padding = 0.0
-        LE, RE, self.source = self._partition_hierarchy_3d(padding=self.padding)
+        LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
         # For scaling the threshold, note that it's a passthrough
-        total_mass = self._mpi_allsum(self.source["ParticleMassMsun"].sum())
-        self.padding = 0.2 #* pf["unitary"]
-        LE, RE, self.source = self._partition_hierarchy_3d(padding=self.padding)
+        total_mass = self._mpi_allsum(data_source["ParticleMassMsun"].sum())
+        # MJT: Note that instead of this, if we are assuming that the particles
+        # are all on different processors, we should instead construct an
+        # object representing the entire domain and sum it "lazily" with
+        # Derived Quantities.
+        self.padding = 0.2 #* pf["unitary"] # This should be clevererer
+        LE, RE, data_source = self._partition_hierarchy_3d(padding=self.padding)
         self.bounds = (LE, RE)
         # reflect particles around the periodic boundary
         self._reposition_particles((LE, RE))
-        # calculate hop on each sub region
-        hop_list = HopList(self.source, threshold, dm_only)
-        # include only haloes that reside in the real part of the box
+        # MJT: This is the point where HOP is run, and we have halos for every
+        # single sub-region
+        super(HaloFinder, self).__init__(data_source, threshold, dm_only)
         self._parse_hoplist(hop_list)
-        # collect all the haloes into one big ordered list
         self._join_hoplists(hop_list)
 
-    @parallel_passthrough
-    # on each processor, go through the hoplist and throw away haloes that aren't in the 'real'
-    # part of the subbox, and add the good ones to a list, _groups
-    def _parse_hoplist(self, hop_list):
-        LE, RE = bounds
-        # find the unique halo ids
-        unique_ids = na.unique(hop_list.tags)
-        # find the size of each halo
-        counts = na.bincount(hop_list.tags)
-        # sort the particles by their halo ID
-        sort_indices = na.argsort(hop_list.tags)
-        # grab the particles into a simple list
-        grab_indices = na.indices(hop_list.tags.shape).ravel()
-        # get the densities of the particles, ordered as above
-        dens = hop_list.densities[sort_indices]
-        cp = 0
-        print 'uids %d' % (unique_ids.size)
-        # now we'll loop over the haloes, and only keep the real ones
-        for ii in unique_ids:
-            cp_c = cp + counts[ii+1]
-            if ii == -1:
-                cp += counts[ii+1]
-                continue
-            group_indices = grab_indices[cp:cp_c]
-            # find the id and position of the densest particle
-            md_i = na.argmax(dens[cp:cp_c])
-            px = self.particle_fields["particle_position_x"][group_indices]
-            px, py, pz = [self.particle_fields["particle_position_%s"%ax][group_indices] for ax in 'xyz']
-            max_dens = (dens[cp:cp_c][md_i], px[md_i], py[md_i], pz[md_i])
+    def _parse_hoplist(self):
+        groups, max_dens, hi = [], {}, 0
+        LE, RE = self.bounds
+        for halo in self._groups:
+            this_max_dens = halo.maximum_density_location()
             # if the most dense particle is in the box, keep it
-            if ((max_dens[1:3] >= LE+self.padding) and (max_dens[1:3] < RE-self.padding)):
-                self._groups.append(HopGroup(self, ii, group_indices)
-                self._max_dens[ii] = max_dens
-            cp += counts[ii+1]
-    
-    @parallel_passthrough
+            if na.all((this_max_dens >= LE+self.padding) & (this_max_dens < RE-self.padding)):
+                # Now we add the halo information to OURSELVES, taken from the
+                # per-subregion HOP results.  We need to mock up the HopList
+                # interface, so we need to set:
+                #     self._max_dens
+                max_dens[hi] = self._max_dens[halo.id]
+                groups.append(HopGroup(self, hi))
+                groups[-1].indices = halo.indices
+                groups[-1]._owned = True
+                hi += 1
+        del self._groups, self._max_dens # explicit >> implicit
+        self._groups = groups
+        self._max_dens = max_dens
+
     def _join_hoplists(self, hop_list):
         # First we get the total number of halos the entire collection
         # has identified
-        nhalos = self._mpi_allsum(len(_groups))
-        # I don't know how this stuff works, so I'm going to sketch it out.
-        # collect all the HopGroups into one big list
-        haloes = self._mpi_allcollectwhatever(_groups)
+        # Note I have added a new method here to help us get information
+        # about processors and ownership and so forth.
+        # _mpi_info_dict returns a dict of {proc: whatever} where whatever is
+        # what is fed in on each proc.
+        mine, halo_info = self._mpi_info_dict(len(self))
+        nhalos = sum(halo_info.values())
+        # Figure out our offset
+        my_first_id = sum([v for k,v in halo_info.items() if k < mine])
+        # Fix our max_dens
+        max_dens = {}
+        for i,m in self._max_dens.items(): max_dens[i+my_first_id] = m
+        self._max_dens = max_dens
         # sort the list by the size of the groups
-        haloes.sort(lambda x, y: cmp(len(x.indices),len(y.indices)))
-        # reassign their ID
-        for i,halo in enumerate(haloes):
+        # Now we add ghost halos and reassign all the IDs
+        # Note: we already know which halos we own!
+        after = nhalos - (my_first_id + len(self._groups))
+        # One single fake halo, not owned, does the trick
+        fake_halo = HopGroup(self, 0)
+        fake_halo._owned = False
+        self._groups = [fake_halo] * my_first_id + \
+                       self._groups + \
+                       [fake_halo] * after
+        # MJT: Sorting doesn't work yet.  They need to be sorted.
+        #haloes.sort(lambda x, y: cmp(len(x.indices),len(y.indices)))
+        # Unfortunately, we can't sort *just yet*.
+        for i, halo in enumerate(self._groups):
             halo.id = i
         
-    
-    @parallel_passthrough
     def _reposition_particles(self, bounds):
         # This only does periodicity.  We do NOT want to deal with anything
         # else.  The only reason we even do periodicity is the 
         LE, RE = bounds
         dw = self.pf["DomainRightEdge"] - self.pf["DomainLeftEdge"]
         for i, ax in enumerate('xyz'):
-            arr = self.source["particle_position_%s" % ax]
+            arr = self.data_source["particle_position_%s" % ax]
             arr[arr < LE[i]-self.padding] += dw[i]
             arr[arr > RE[i]+self.padding] -= dw[i]
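
As a concrete (entirely made-up) example of the ghost-halo bookkeeping in
_join_hoplists: suppose three processors report halo counts {0: 5, 1: 3, 2: 7}
and we are rank 1:

    halo_info = {0: 5, 1: 3, 2: 7}             # as returned by _mpi_info_dict
    mine = 1
    nhalos = sum(halo_info.values())            # 15 halos across all processors
    my_first_id = sum(v for k, v in halo_info.items() if k < mine)   # 5
    # Rank 1 owns global IDs 5, 6 and 7; the 5 halos before and the 7 after
    # are represented locally by unowned placeholder ("fake") halos, so every
    # processor ends up with a _groups list of length nhalos.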


