[Yt-svn] yt-commit r700 - in branches/parallel_profiles/yt: . lagos

mturk at wrangler.dreamhost.com mturk at wrangler.dreamhost.com
Mon Jul 28 12:59:49 PDT 2008


Author: mturk
Date: Mon Jul 28 12:59:48 2008
New Revision: 700
URL: http://yt.spacepope.org/changeset/700

Log:
Okay, committing some changes from the other day.  Right now you either o a
lazy_profile where it pre-loads everything, *thus making it completely
irrelevant* or you run it all in core *which does the same thing but doesn't
pre-load*.  It's a fine place we're in right now.  But.  I'm thinking on how to
refactor this to bring it back to usability.  For now, it is surprisingly
functional as a parallel analysis toolkit.  Queueing of data and preloading
works great...



Modified:
   branches/parallel_profiles/yt/lagos/BaseGridType.py
   branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
   branches/parallel_profiles/yt/lagos/HDF5LightReader.c
   branches/parallel_profiles/yt/lagos/HierarchyType.py
   branches/parallel_profiles/yt/lagos/Profiles.py
   branches/parallel_profiles/yt/parallel_tools.py

Modified: branches/parallel_profiles/yt/lagos/BaseGridType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/BaseGridType.py	(original)
+++ branches/parallel_profiles/yt/lagos/BaseGridType.py	Mon Jul 28 12:59:48 2008
@@ -85,8 +85,13 @@
                 if fieldInfo.has_key(field):
                     conv_factor = fieldInfo[field]._convert_function(self)
                 try:
-                    self[field] = self.readDataFast(field) * conv_factor
-                except self._read_exception:
+                    if hasattr(self.hierarchy, 'queue'):
+                        temp = self.hierarchy.queue.pop(self, field)
+                    else:
+                        temp = self.readDataFast(field)
+                    self[field] = temp * conv_factor
+                except self._read_exception, exc:
+                    print exc
                     if field in fieldInfo:
                         if fieldInfo[field].particle_type:
                             self[field] = na.array([],dtype='int64')

Modified: branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/DataReadingFuncs.py	(original)
+++ branches/parallel_profiles/yt/lagos/DataReadingFuncs.py	Mon Jul 28 12:59:48 2008
@@ -156,7 +156,7 @@
 
     def pop(self, grid, field):
         if grid.id in self.queue and field in self.queue[grid.id]:
-            return self.queue[grid.id].pop(field)
+            return self.modify(self.queue[grid.id].pop(field))
         else:
             # We only read the one set and do not store it if it isn't pre-loaded
             return self._read_set(grid, field)
@@ -170,13 +170,22 @@
         self.queue[grid][field] = data
 
 class DataQueuePackedHDF5(BaseDataQueue):
-    _read_set = readDataHDF5
+
+    def _read_set(self, grid, field):
+        return readDataPacked(grid, field)
+
+    def modify(self, field):
+        return field.swapaxes(0,2)
 
     def preload(self, grids, sets):
         # We need to deal with files first
         files_keys = defaultdict(lambda: [])
-        for g in sorted(grids): files_keys[g.filename].append(g)
+        sets = list(sets)
+        for g in grids: files_keys[g.filename].append(g)
         for file in files_keys:
-            nodes = [g.id for g in grids]
+            mylog.debug("Starting read %s", file)
+            nodes = [g.id for g in files_keys[file]]
+            nodes.sort()
             data = HDF5LightReader.ReadMultipleGrids(file, nodes, sets)
+            mylog.debug("Read %s items from %s", len(data), os.path.basename(file))
             for gid in data: self.queue[gid].update(data[gid])

Modified: branches/parallel_profiles/yt/lagos/HDF5LightReader.c
==============================================================================
--- branches/parallel_profiles/yt/lagos/HDF5LightReader.c	(original)
+++ branches/parallel_profiles/yt/lagos/HDF5LightReader.c	Mon Jul 28 12:59:48 2008
@@ -525,7 +525,7 @@
             if (cur_data == NULL) {
               PyErr_Format(_hdf5ReadError,
                   "ReadHDF5DataSet: Error reading (%s, %s, %s)",
-                  filename, grid_key, grid_node);
+                  filename, grid_node_name, set_name);
               goto _fail;
             }
             PyDict_SetItem(grid_data, oset_name, (PyObject *) cur_data);

Modified: branches/parallel_profiles/yt/lagos/HierarchyType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/HierarchyType.py	(original)
+++ branches/parallel_profiles/yt/lagos/HierarchyType.py	Mon Jul 28 12:59:48 2008
@@ -147,6 +147,7 @@
             if len(list_of_sets) == 0:
                 mylog.debug("Detected packed HDF5")
                 self.data_style = 6
+                self.queue = DataQueuePackedHDF5()
             else:
                 mylog.debug("Detected unpacked HDF5")
                 self.data_style = 5

Modified: branches/parallel_profiles/yt/lagos/Profiles.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/Profiles.py	(original)
+++ branches/parallel_profiles/yt/lagos/Profiles.py	Mon Jul 28 12:59:48 2008
@@ -55,18 +55,27 @@
         self._pdata = {}
         self._lazy_reader = lazy_reader
 
+    def _get_dependencies(self, fields):
+        deps = []
+        for field in fields + self._get_bin_fields(): 
+            deps += fieldInfo[field].get_dependencies().requested
+        return list(set(deps))
+
     def _lazy_add_fields(self, fields, weight, accumulation):
         self._ngrids = 0
         self.__data = {}         # final results will go here
         self.__weight_data = {}  # we need to track the weights as we go
+        if hasattr(self._data_source.hierarchy, 'queue'):
+            self._data_source.hierarchy.queue.preload(self._get_grid_objs(), 
+                                                      self._get_dependencies(fields))
         for field in fields:
             self.__data[field] = self._get_empty_field()
             self.__weight_data[field] = self._get_empty_field()
         self.__used = self._get_empty_field().astype('bool')
-        pbar = get_pbar('Binning grids', len(self._data_source._grids))
+        #pbar = get_pbar('Binning grids', len(self._data_source._grids))
         for gi,grid in enumerate(self._get_grids()):
             self._ngrids += 1
-            pbar.update(gi)
+            #pbar.update(gi)
             args = self._get_bins(grid, check_cut=True)
             if not args: # No bins returned for this grid, so forget it!
                 continue
@@ -79,7 +88,7 @@
                 self.__used = (self.__used | u)       # running 'or'
             grid.clear_data()
         # When the loop completes the parallel finalizer gets called
-        pbar.finish()
+        #pbar.finish()
         ub = na.where(self.__used)
         for field in fields:
             if weight: # Now, at the end, we divide out.
@@ -90,16 +99,14 @@
 
     def _finalize_parallel(self):
         from mpi4py import MPI # I am displeased by importing here
-        temp = self._get_empty_field()
+        MPI.COMM_WORLD.Barrier()
         for key in self.__data:
-            MPI.COMM_WORLD.Allreduce(self.__data[key], temp, MPI.SUM)
-            self.__data[key] += temp
+            self.__data[key] = \
+                MPI.COMM_WORLD.Allreduce(self.__data[key], op=MPI.SUM)
         for key in self.__weight_data:
-            MPI.COMM_WORLD.Allreduce(self.__weight_data[key], temp, MPI.SUM)
-            self.__weight_data[key] += temp
-        temp = self.__used.copy().astype('int32')
-        MPI.COMM_WORLD.Allreduce(self.__used.astype('int32'), temp, MPI.SUM)
-        self.__used = (temp > 0)
+            self.__weight_data[key] = \
+                MPI.COMM_WORLD.Allreduce(self.__weight_data[key], op=MPI.SUM)
+        self.__used = MPI.COMM_WORLD.Allreduce(self.__used, op=MPI.SUM)
 
     def _unlazy_add_fields(self, fields, weight, accumulation):
         for field in fields:
@@ -244,6 +251,9 @@
             fid.write("\n")
         fid.close()
 
+    def _get_bin_fields(self):
+        return [self.bin_field]
+
 class BinnedProfile2D(BinnedProfile):
     def __init__(self, data_source,
                  x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -356,6 +366,9 @@
             fid.write("\n")
         fid.close()
 
+    def _get_bin_fields(self):
+        return [self.x_bin_field, self.y_bin_field]
+
 class BinnedProfile3D(BinnedProfile):
     def __init__(self, data_source,
                  x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -457,6 +470,9 @@
     def write_out(self, filename, format="%0.16e"):
         pass # Will eventually dump HDF5
 
+    def _get_bin_fields(self):
+        return [self.x_bin_field, self.y_bin_field, self.z_bin_field]
+
     def store_profile(self, name, force=False):
         """
         By identifying the profile with a fixed, user-input *name* we can

Modified: branches/parallel_profiles/yt/parallel_tools.py
==============================================================================
--- branches/parallel_profiles/yt/parallel_tools.py	(original)
+++ branches/parallel_profiles/yt/parallel_tools.py	Mon Jul 28 12:59:48 2008
@@ -37,7 +37,7 @@
 print "PARALLEL COMPATIBLE:", parallel_capable
 
 class GridIterator(object):
-    def __init__(self, pobj):
+    def __init__(self, pobj, just_list = False):
         self.pobj = pobj
         if hasattr(pobj, '_grids') and pobj._grids is not None:
             gs = pobj._grids
@@ -45,6 +45,7 @@
             gs = pobj._data_source._grids
         self._grids = sorted(gs, key = lambda g: g.filename)
         self.ng = len(self._grids)
+        self.just_list = just_list
 
     def __iter__(self):
         self.pos = 0
@@ -63,16 +64,17 @@
     This takes an object, pobj, that implements ParallelAnalysisInterface,
     and then does its thing.
     """
-    def __init__(self, pobj):
-        GridIterator.__init__(self, pobj)
+    def __init__(self, pobj, just_list = False):
+        GridIterator.__init__(self, pobj, just_list)
         self._offset = MPI.COMM_WORLD.rank
         self._skip = MPI.COMM_WORLD.size
         # Note that we're doing this in advance, and with a simple means
         # of choosing them; more advanced methods will be explored later.
-        self.my_grid_ids = na.mgrid[self._offset:self.ng:self._skip]
+        upper, lower = na.mgrid[0:self.ng:(self._skip+1)*1j][self._offset:self._offset+2]
+        self.my_grid_ids = na.mgrid[upper:lower-1].astype("int64")
         
     def __iter__(self):
-        self.pobj._initialize_parallel()
+        if not self.just_list: self.pobj._initialize_parallel()
         self.pos = 0
         return self
 
@@ -81,7 +83,7 @@
             gid = self.my_grid_ids[self.pos]
             self.pos += 1
             return self._grids[gid]
-        self.pobj._finalize_parallel()
+        if not self.just_list: self.pobj._finalize_parallel()
         raise StopIteration
 
 class ParallelAnalysisInterface(object):
@@ -93,6 +95,12 @@
             return ParallelGridIterator(self)
         return GridIterator(self)
 
+    def _get_grid_objs(self):
+        if parallel_capable and \
+           ytcfg.getboolean("yt","parallel"):
+            return ParallelGridIterator(self, True)
+        return GridIterator(self, True)
+
     def _initialize_parallel(self):
         pass
 



More information about the yt-svn mailing list