[Yt-svn] yt-commit r808 - trunk/yt/lagos

mturk at wrangler.dreamhost.com mturk at wrangler.dreamhost.com
Tue Sep 30 21:43:45 PDT 2008


Author: mturk
Date: Tue Sep 30 21:43:44 2008
New Revision: 808
URL: http://yt.spacepope.org/changeset/808

Log:
 * Some fixes for the parallel data mapping that could mess up parallel profiles.
 * More in-memory hierarchy fixes (still not done yet!)
 * CellMassCode field
 * Completely refactored DerivedQuantities.  Testing suggests: same and correct values.  Additionally, they now work in parallel.
 * find_max now returns the 'center' of the cell the max was found in rather than the right edge.  Also, I pulled out a duplicate method.
 * Some fixes for DM only runs.  Not quite ready yet.



Modified:
   trunk/yt/lagos/DataReadingFuncs.py
   trunk/yt/lagos/DerivedFields.py
   trunk/yt/lagos/DerivedQuantities.py
   trunk/yt/lagos/HierarchyType.py
   trunk/yt/lagos/ParallelTools.py
   trunk/yt/lagos/__init__.py

Modified: trunk/yt/lagos/DataReadingFuncs.py
==============================================================================
--- trunk/yt/lagos/DataReadingFuncs.py	(original)
+++ trunk/yt/lagos/DataReadingFuncs.py	Tue Sep 30 21:43:44 2008
@@ -230,14 +230,23 @@
     def __init__(self, ghost_zones=3):
         import enzo
         self.grids_in_memory = enzo.grid_data
+        self.old_grids_in_memory = enzo.old_grid_data
         self.my_slice = (slice(ghost_zones,-ghost_zones),
                       slice(ghost_zones,-ghost_zones),
                       slice(ghost_zones,-ghost_zones))
         BaseDataQueue.__init__(self)
 
     def _read_set(self, grid, field):
+        import enzo
         if grid.id not in self.grids_in_memory: raise KeyError
+        t1 = enzo.yt_parameter_file["InitialTime"]
+        t2 = enzo.hierarchy_information["GridOldTimes"][grid.id - 1]
+        coef1 = max((grid.Time - t1)/(grid.Time - t2), 0.0)
+        coef2 = 1.0 - coef1
         return self.grids_in_memory[grid.id][field][self.my_slice]
+        return (coef1*self.grids_in_memory[grid.id][field] + \
+                coef2*self.old_grids_in_memory[grid.id][field])\
+                [self.my_slice]
 
     def modify(self, field):
         return field.swapaxes(0,2)

Modified: trunk/yt/lagos/DerivedFields.py
==============================================================================
--- trunk/yt/lagos/DerivedFields.py	(original)
+++ trunk/yt/lagos/DerivedFields.py	Tue Sep 30 21:43:44 2008
@@ -589,6 +589,14 @@
           function=_CellMass,
           convert_function=_convertCellMassMsun)
 
+def _CellMassCode(field, data):
+    return data["Density"] * data["CellVolumeCode"]
+def _convertCellMassCode(data):
+    return 1.0/data.convert("Density")
+add_field("CellMassCode", 
+          function=_CellMassCode,
+          convert_function=_convertCellMassCode)
+
 def _CellVolume(field, data):
     if data['dx'].size == 1:
         try:

Modified: trunk/yt/lagos/DerivedQuantities.py
==============================================================================
--- trunk/yt/lagos/DerivedQuantities.py	(original)
+++ trunk/yt/lagos/DerivedQuantities.py	Tue Sep 30 21:43:44 2008
@@ -31,74 +31,78 @@
 quantity_info = {}
 
 class GridChildMaskWrapper:
-    def __init__(self, grid, data_object):
+    def __init__(self, grid, data_source):
         self.grid = grid
-        self.data_object = data_object
+        self.data_source = data_source
     def __getattr__(self, attr):
         return getattr(self.grid, attr)
     def __getitem__(self, item):
-        return self.data_object._get_data_from_grid(self.grid, item)
+        return self.data_source._get_data_from_grid(self.grid, item)
 
-def func_wrapper(quantities_collection, quantities_object):
-    func = quantities_object.function
-    c_func = quantities_object.combine_function
-    data_object = quantities_collection.data_object
-    def call_func_unlazy(args, kwargs):
-        retval = func(data_object, *args, **kwargs)
-        return c_func(data_object, *retval)
-    def call_func_lazy(args, kwargs):
-        n_ret = quantities_object.n_ret
-        retvals = [ [] for i in range(n_ret)]
-        pbar = get_pbar("Calculating ", len(data_object._grids))
-        for gi,g in enumerate(data_object._grids):
-            rv = func(GridChildMaskWrapper(g, data_object), *args, **kwargs)
-            for i in range(n_ret): retvals[i].append(rv[i])
-            g.clear_data()
-            pbar.update(gi)
-        pbar.finish()
-        retvals = [na.array(retvals[i]) for i in range(n_ret)]
-        return c_func(data_object, *retvals)
-    @wraps(func)
-    def call_func(*args, **kwargs):
-        lazy_reader = kwargs.pop('lazy_reader', False)
-        if lazy_reader and not quantities_object.force_unlazy:
-            return call_func_lazy(args, kwargs)
-        else:
-            return call_func_unlazy(args, kwargs)
-    return call_func
-
-class DerivedQuantity(object):
-    def __init__(self, name, function,
+class DerivedQuantity(ParallelAnalysisInterface):
+    def __init__(self, collection, name, function,
                  combine_function, units = "",
                  n_ret = 0, force_unlazy=False):
-        self.name = name
-        self.function = function
-        self.combine_function = combine_function
+        # We wrap the function with our object
+        self.__doc__ = function.__doc__
+        self.__name__ = name
+        self.collection = collection
+        self._data_source = collection.data_source
+        self.func = function
+        self.c_func = combine_function
         self.n_ret = n_ret
         self.force_unlazy = force_unlazy
 
+    def __call__(self, *args, **kwargs):
+        lazy_reader = kwargs.pop('lazy_reader', False)
+        if lazy_reader and not self.force_unlazy:
+            return self._call_func_lazy(args, kwargs)
+        else:
+            return self._call_func_unlazy(args, kwargs)
+
+    def _call_func_lazy(self, args, kwargs):
+        self.retvals = [ [] for i in range(self.n_ret)]
+        for gi,g in enumerate(self._get_grids()):
+            rv = self.func(GridChildMaskWrapper(g, self._data_source), *args, **kwargs)
+            for i in range(self.n_ret): self.retvals[i].append(rv[i])
+            g.clear_data()
+        self.retvals = [na.array(self.retvals[i]) for i in range(self.n_ret)]
+        return self.c_func(self._data_source, *self.retvals)
+
+    def _finalize_parallel(self):
+        self.retvals = [self._mpi_catlist(my_list) for my_list in self.retvals]
+        
+    def _call_func_unlazy(self, args, kwargs):
+        retval = self.func(self._data_source, *args, **kwargs)
+        return self.c_func(self._data_source, *retval)
+
 def add_quantity(name, **kwargs):
     if 'function' not in kwargs or 'combine_function' not in kwargs:
         mylog.error("Not adding field %s because both function and combine_function must be provided" % name)
         return
     f = kwargs.pop('function')
     c = kwargs.pop('combine_function')
-    quantity_info[name] = DerivedQuantity(name, f, c, **kwargs)
+    quantity_info[name] = (name, f, c, kwargs)
 
 class DerivedQuantityCollection(object):
     functions = quantity_info
-    def __init__(self, data_object):
-        self.data_object = data_object
+    def __init__(self, data_source):
+        self.data_source = data_source
 
     def __getitem__(self, key):
         if key not in self.functions:
             raise KeyError(key)
-        return func_wrapper(self, self.functions[key])
+        args = self.functions[key][:3]
+        kwargs = self.functions[key][3]
+        # Instantiate here, so we can pass it the data object
+        # Note that this means we instantiate every time we run help, etc
+        # I have made my peace with this.
+        return DerivedQuantity(self, *args, **kwargs)
 
     def keys(self):
         return self.functions.keys()
 
-def _TotalMass(data):
+def _TotalMass(self, data):
     """
     This function takes no arguments and returns the sum of cell masses and
     particle masses in the object.
@@ -227,7 +231,7 @@
                                   data['x'],data['y'],data['z'],
                                   truncate, kinetic/(2*G))
     return [(pot / kinetic)]
-def _combIsBound(data,bound):
+def _combIsBound(data, bound):
     return bound
 add_quantity("IsBound",function=_IsBound,combine_function=_combIsBound,n_ret=1,
              force_unlazy=True)
@@ -276,4 +280,3 @@
     return [na.sum(totals[:,i]) for i in range(n_fields)]
 add_quantity("TotalQuantity", function=_TotalQuantity,
                 combine_function=_combTotalQuantity, n_ret=2)
-        

Modified: trunk/yt/lagos/HierarchyType.py
==============================================================================
--- trunk/yt/lagos/HierarchyType.py	(original)
+++ trunk/yt/lagos/HierarchyType.py	Tue Sep 30 21:43:44 2008
@@ -243,9 +243,6 @@
                 maxGrid = grid
         mc = na.array(maxCoord)
         pos=maxGrid.get_position(mc)
-        pos[0] += 0.5*maxGrid.dx
-        pos[1] += 0.5*maxGrid.dx
-        pos[2] += 0.5*maxGrid.dx
         mylog.info("Max Value is %0.5e at %0.16f %0.16f %0.16f in grid %s at level %s %s", \
               maxVal, pos[0], pos[1], pos[2], maxGrid, maxGrid.Level, mc)
         self.center = pos
@@ -275,9 +272,6 @@
                 minGrid = grid
         mc = na.array(minCoord)
         pos=minGrid.get_position(mc)
-        pos[0] += 0.5*minGrid.dx
-        pos[1] += 0.5*minGrid.dx
-        pos[2] += 0.5*minGrid.dx
         mylog.info("Min Value is %0.5e at %0.16f %0.16f %0.16f in grid %s at level %s", \
               minVal, pos[0], pos[1], pos[2], minGrid, minGrid.Level)
         self.center = pos
@@ -396,39 +390,6 @@
         return self.grids[mask], na.where(mask)
 
     @time_execution
-    def find_max(self, field, finestLevels = True):
-        """
-        Returns (value, center) of location of maximum for a given field.
-        """
-        if finestLevels:
-            gI = na.where(self.gridLevels >= self.maxLevel - NUMTOCHECK)
-        else:
-            gI = na.where(self.gridLevels >= 0) # Slow but pedantic
-        maxVal = -1e100
-        for grid in self.grids[gI[0]]:
-            mylog.debug("Checking %s (level %s)", grid.id, grid.Level)
-            val, coord = grid.find_max(field)
-            if val > maxVal:
-                maxCoord = coord
-                maxVal = val
-                maxGrid = grid
-        mc = na.array(maxCoord)
-        pos=maxGrid.get_position(mc)
-        pos[0] += 0.5*maxGrid.dx
-        pos[1] += 0.5*maxGrid.dx
-        pos[2] += 0.5*maxGrid.dx
-        mylog.info("Max Value is %0.5e at %0.16f %0.16f %0.16f in grid %s at level %s %s", \
-              maxVal, pos[0], pos[1], pos[2], maxGrid, maxGrid.Level, mc)
-        self.center = pos
-        # This probably won't work for anyone else
-        self.bulkVelocity = (maxGrid["x-velocity"][maxCoord], \
-                             maxGrid["y-velocity"][maxCoord], \
-                             maxGrid["z-velocity"][maxCoord])
-        self.parameters["Max%sValue" % (field)] = maxVal
-        self.parameters["Max%sPos" % (field)] = "%s" % (pos)
-        return maxVal, pos
-
-    @time_execution
     def export_particles_pb(self, filename, filter = 1, indexboundary = 0, fields = None, scale=1.0):
         """
         Exports all the star particles, or a subset, to pb-format *filename*
@@ -603,7 +564,10 @@
     def __setup_filemap(self, grid):
         if not self.data_style == 6:
             return
-        self.cpu_map[grid.filename].append(grid)
+        try:
+            self.cpu_map[grid.filename].append(grid)
+        except AttributeError:
+            pass
 
     def __del__(self):
         self._close_data_file()
@@ -701,16 +665,13 @@
         # This needs to go elsewhere:
         # Now get the baryon filenames
         mylog.debug("Getting baryon filenames")
-        re_BaryonFileName = constructRegularExpressions("BaryonFileName",('s'))
-        fn_results = re.findall(re_BaryonFileName, self.__hierarchy_string)
-        if len(fn_results):
-            self.__set_all_filenames(fn_results)
-            return
-        re_BaryonFileName = constructRegularExpressions("FileName",('s'))
-        fn_results = re.findall(re_BaryonFileName, self.__hierarchy_string)
-        self.__set_all_filenames(fn_results)
-        # This is pretty bad, but we do it to save a significant amount of time
-        # in larger runs.
+        for patt in ["BaryonFileName", "FileName", "ParticleFileName"]:
+            re_FileName = constructRegularExpressions(patt,('s'))
+            fn_results = re.findall(re_FileName, self.__hierarchy_string)
+            if len(fn_results):
+                print patt
+                self.__set_all_filenames(fn_results)
+                return
 
     def __setup_grid_tree(self):
         mylog.debug("No cached tree found, creating")
@@ -835,7 +796,12 @@
             field_list = sets.Set()
             random_sample = self._generate_random_grids()
             for grid in random_sample:
-                gf = grid.getFields()
+                if not hasattr(grid, 'filename'): continue
+                try:
+                    gf = grid.getFields()
+                except grid._read_exception:
+                    mylog.debug("Grid %s is a bit funky?", grid.id)
+                    continue
                 mylog.debug("Grid %s has: %s", grid.id, gf)
                 field_list = field_list.union(sets.Set(gf))
             self.save_data(list(field_list),"/","DataFields")

Modified: trunk/yt/lagos/ParallelTools.py
==============================================================================
--- trunk/yt/lagos/ParallelTools.py	(original)
+++ trunk/yt/lagos/ParallelTools.py	Tue Sep 30 21:43:44 2008
@@ -88,10 +88,12 @@
         # Note that we're doing this in advance, and with a simple means
         # of choosing them; more advanced methods will be explored later.
         if self._use_all:
-            self.my_grid_ids = range(len(self._grids))
+            self.my_grid_ids = na.arange(len(self._grids))
         else:
-            upper, lower = na.mgrid[0:self.ng:(self._skip+1)*1j][self._offset:self._offset+2]
-            self.my_grid_ids = na.mgrid[upper:lower-1].astype("int64")
+            #upper, lower = na.mgrid[0:self.ng:(self._skip+1)*1j][self._offset:self._offset+2]
+            #self.my_grid_ids = na.mgrid[upper:lower-1].astype("int64")
+            self.my_grid_ids = na.array_split(
+                            na.arange(len(self._grids)), self._skip)[self._offset]
         
     def __iter__(self):
         self.pos = 0
@@ -164,6 +166,26 @@
             for j in buf: data[j] = na.concatenate([data[j],buf[j]], axis=-1)
         return data
 
+    def __mpi_recvlist(self, data):
+        # First we receive, then we make a new list.
+        for i in range(1,MPI.COMM_WORLD.size):
+            buf = MPI.COMM_WORLD.Recv(source=i, tag=0)
+            data += buf
+        return na.array(data)
+
+    def _mpi_catlist(self, data):
+        if not parallel_capable: return data
+        mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+        MPI.COMM_WORLD.Barrier()
+        if MPI.COMM_WORLD.rank == 0:
+            data = self.__mpi_recvlist(data)
+        else:
+            MPI.COMM_WORLD.Send(data, dest=0, tag=0)
+        mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
+        data = MPI.COMM_WORLD.Bcast(data, root=0)
+        MPI.COMM_WORLD.Barrier()
+        return data
+
     def _should_i_write(self):
         if not parallel_capable: return True
         return (MPI.COMM_WORLD == 0)

Modified: trunk/yt/lagos/__init__.py
==============================================================================
--- trunk/yt/lagos/__init__.py	(original)
+++ trunk/yt/lagos/__init__.py	Tue Sep 30 21:43:44 2008
@@ -74,9 +74,9 @@
 import PointCombine
 import HDF5LightReader
 from EnzoDefs import *
+from ParallelTools import *
 from DerivedFields import *
 from DerivedQuantities import DerivedQuantityCollection, GridChildMaskWrapper
-from ParallelTools import *
 from DataReadingFuncs import *
 from ClusterFiles import *
 from ContourFinder import *



More information about the yt-svn mailing list