[Yt-svn] yt-commit r727 - branches/parallel_profiles/yt/lagos

mturk at wrangler.dreamhost.com mturk at wrangler.dreamhost.com
Fri Aug 8 15:41:10 PDT 2008


Author: mturk
Date: Fri Aug  8 15:41:08 2008
New Revision: 727
URL: http://yt.spacepope.org/changeset/727

Log:
Projections now work in parallel transparently.  There's a chance some of this
stuff broke parallel profiles.  Looking into it.

PlotCollections will be parallel enabled next.



Modified:
   branches/parallel_profiles/yt/lagos/BaseDataTypes.py
   branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
   branches/parallel_profiles/yt/lagos/HierarchyType.py
   branches/parallel_profiles/yt/lagos/ParallelTools.py
   branches/parallel_profiles/yt/lagos/Profiles.py

Modified: branches/parallel_profiles/yt/lagos/BaseDataTypes.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/BaseDataTypes.py	(original)
+++ branches/parallel_profiles/yt/lagos/BaseDataTypes.py	Fri Aug  8 15:41:08 2008
@@ -718,22 +718,15 @@
         of that weight.
         """
         Enzo2DData.__init__(self, axis, field, pf, **kwargs)
-        if not source:
-            self._check_region = False
-            source = EnzoGridCollection(center, self.hierarchy.grids)
-            self._okay_to_serialize = True
-        else:
-            self._okay_to_serialize = False
-            self._check_region = True
-        self.source = source
+        self.center = center
+        self._initialize_source()
         self._grids = self.source._grids
         if max_level == None:
             max_level = self.hierarchy.maxLevel
-        if source is not None:
-            max_level = min(max_level, source.gridLevels.max())
+        if self.source is not None:
+            max_level = min(max_level, self.source.gridLevels.max())
         self._max_level = max_level
         self._weight = weight_field
-        self.center = center
         self.func = na.sum # for the future
         self.__retval_coords = {}
         self.__retval_fields = {}
@@ -747,6 +740,16 @@
             self._refresh_data()
             if self._okay_to_serialize: self._serialize()
 
+    def _initialize_source(self, source = None):
+        if source is None:
+            check, source = self._partition_hierarchy_2d(self.axis)
+            self._check_region = check
+            self._okay_to_serialize = (not check)
+        else:
+            self._okay_to_serialize = False
+            self._check_region = True
+        self.source = source
+
     #@time_execution
     def __cache_data(self):
         rdf = self.hierarchy.grid.readDataFast
@@ -783,6 +786,7 @@
         mylog.info("Finished calculating overlap.")
 
     def _serialize(self):
+        if not self.should_i_write(): return
         mylog.info("Serializing data...")
         node_name = "%s_%s_%s" % (self.fields[0], self._weight, self.axis)
         mylog.info("nodeName: %s", node_name)
@@ -819,8 +823,6 @@
 
     def __project_level(self, level, fields):
         grids_to_project = self.source.select_grids(level)
-        self.hierarchy.queue.preload(grids_to_project,
-                                     self._get_dependencies(fields))
         dls, convs = self.__get_dls(grids_to_project[0], fields)
         zero_out = (level != self._max_level)
         pbar = get_pbar('Projecting  level % 2i / % 2i ' \
@@ -932,6 +934,8 @@
         coord_data = []
         field_data = []
         dxs = []
+        self.hierarchy.queue.preload(self.source._grids,
+                                     self._get_dependencies(fields))
         for level in range(0, self._max_level+1):
             my_coords, my_dx, my_fields = self.__project_level(level, fields)
             coord_data.append(my_coords)
@@ -950,14 +954,20 @@
         field_data = na.concatenate(field_data, axis=1)
         dxs = na.concatenate(dxs, axis=1)
         # We now convert to half-widths and center-points
-        self.data['pdx'] = dxs
-        self.data['px'] = (coord_data[0,:]+0.5) * self['pdx']
-        self.data['py'] = (coord_data[1,:]+0.5) * self['pdx']
-        self.data['pdx'] *= 0.5
-        self.data['pdy'] = self.data['pdx'].copy()
+        data = {}
+        data['pdx'] = dxs
+        data['px'] = (coord_data[0,:]+0.5) * data['pdx']
+        data['py'] = (coord_data[1,:]+0.5) * data['pdx']
+        data['pdx'] *= 0.5
+        data['pdy'] = data['pdx'].copy()
+        data['fields'] = field_data
+        data['weight_field'] = coord_data[3,:]
+        # Now we run the finalizer, which is ignored if we don't need it
+        data = self._mpi_catdict(data)
+        field_data = data.pop('fields')
         for fi, field in enumerate(fields):
             self[field] = field_data[fi,:]
-        self.data['weight_field'] = coord_data[3,:]
+        for i in data.keys(): self[i] = data.pop(i)
 
     def add_fields(self, fields, weight = "CellMassMsun"):
         pass

Modified: branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/DataReadingFuncs.py	(original)
+++ branches/parallel_profiles/yt/lagos/DataReadingFuncs.py	Fri Aug  8 15:41:08 2008
@@ -189,3 +189,4 @@
             data = HDF5LightReader.ReadMultipleGrids(file, nodes, sets)
             mylog.debug("Read %s items from %s", len(data), os.path.basename(file))
             for gid in data: self.queue[gid].update(data[gid])
+        mylog.debug("Finished read of %s", sets)

Modified: branches/parallel_profiles/yt/lagos/HierarchyType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/HierarchyType.py	(original)
+++ branches/parallel_profiles/yt/lagos/HierarchyType.py	Fri Aug  8 15:41:08 2008
@@ -179,6 +179,7 @@
         self.cutting = classobj("EnzoCuttingPlane",(EnzoCuttingPlaneBase,), dd)
         self.ray = classobj("EnzoOrthoRay",(EnzoOrthoRayBase,), dd)
         self.disk = classobj("EnzoCylinder",(EnzoCylinderBase,), dd)
+        self.grid_collection = classobj("EnzoGridCollection",(EnzoGridCollection,), dd)
 
     def __initialize_data_file(self):
         if not ytcfg.getboolean('lagos','serialize'): return

Modified: branches/parallel_profiles/yt/lagos/ParallelTools.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/ParallelTools.py	(original)
+++ branches/parallel_profiles/yt/lagos/ParallelTools.py	Fri Aug  8 15:41:08 2008
@@ -106,8 +106,55 @@
     def _finalize_parallel(self):
         pass
 
+    def _partition_hierarchy_2d(self, axis):
+        if not parallel_capable:
+           return False, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
+
+        cc = MPI.Compute_dims(MPI.COMM_WORLD.size, 2)
+        mi = MPI.COMM_WORLD.rank
+        cx, cy = na.unravel_index(mi, cc)
+        x = na.mgrid[0:1:(cc[0]+1)*1j][cx:cx+2]
+        y = na.mgrid[0:1:(cc[1]+1)*1j][cy:cy+2]
+
+        LE = na.zeros(3, dtype='float64')
+        RE = na.ones(3, dtype='float64')
+        LE[x_dict[axis]] = x[0]  # It actually doesn't matter if this is x or y
+        RE[x_dict[axis]] = x[1]
+        LE[y_dict[axis]] = y[0]
+        RE[y_dict[axis]] = y[1]
+
+        return True, self.hierarchy.region(self.center, LE, RE)
+
+    def _mpi_catdict(self, data):
+        if not parallel_capable: return data
+        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+        MPI.COMM_WORLD.Barrier()
+        if MPI.COMM_WORLD.rank == 0:
+            data = self.__mpi_recvdict(data)
+        else:
+            MPI.COMM_WORLD.Send(data, dest=0, tag=0)
+        mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
+        data = MPI.COMM_WORLD.Bcast(data, root=0)
+        MPI.COMM_WORLD.Barrier()
+        return data
+
+    def __mpi_recvdict(self, data):
+        # First we receive, then we make a new dict.
+        for i in range(1,MPI.COMM_WORLD.size):
+            buf = MPI.COMM_WORLD.Recv(source=i, tag=0)
+            for j in buf: data[j] = na.concatenate([data[j],buf[j]], axis=-1)
+        return data
+
+    def _should_i_write(self):
+        if not parallel_capable: return True
+        return (MPI.COMM_WORLD == 0)
+
+    def _mpi_allsum(self, data):
+        MPI.COMM_WORLD.Barrier()
+        return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
+
     def _get_dependencies(self, fields):
         deps = []
         for field in fields:
-            deps += fieldInfo[field].get_dependencies().requested
+            deps += ensure_list(fieldInfo[field].get_dependencies().requested)
         return list(set(deps))

Modified: branches/parallel_profiles/yt/lagos/Profiles.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/Profiles.py	(original)
+++ branches/parallel_profiles/yt/lagos/Profiles.py	Fri Aug  8 15:41:08 2008
@@ -98,15 +98,11 @@
         del self.__data, self.__weight_data, self.__used
 
     def _finalize_parallel(self):
-        from mpi4py import MPI # I am displeased by importing here
-        MPI.COMM_WORLD.Barrier()
         for key in self.__data:
-            self.__data[key] = \
-                MPI.COMM_WORLD.Allreduce(self.__data[key], op=MPI.SUM)
+            self.__data[key] = self._mpi_gather(self.__data[key])
         for key in self.__weight_data:
-            self.__weight_data[key] = \
-                MPI.COMM_WORLD.Allreduce(self.__weight_data[key], op=MPI.SUM)
-        self.__used = MPI.COMM_WORLD.Allreduce(self.__used, op=MPI.SUM)
+            self.__weight_data[key] = self._mpi_gather(self.__weight_data[key])
+        self.__used = self._mpi_allsum(self.__used)
 
     def _unlazy_add_fields(self, fields, weight, accumulation):
         for field in fields:



More information about the yt-svn mailing list