[Yt-svn] yt-commit r782 - in trunk/yt: . lagos raven

mturk at wrangler.dreamhost.com
Wed Sep 17 07:01:17 PDT 2008


Author: mturk
Date: Wed Sep 17 07:01:16 2008
New Revision: 782
URL: http://yt.spacepope.org/changeset/782

Log:
Merging the parallel profiles branch back into trunk.

Now, when run via mpirun, profiles (1-, 2- and 3-D) will be automatically
distributed across processors.  The same goes for projections.  At the end,
the processors broadcast their results back to each other, so each one ends
up with the complete object.

Additionally, a tiny bit of (unimpressive) magic has been inserted into the
PlotCollection so that all of this is passed through automatically -- you can
now run the following script, for instance, both in parallel and in serial,
and it will parallelize on its own:

--
from yt.mods import *
pf = EnzoStaticOutput("galaxy1200.dir/galaxy1200")
pc = PlotCollection(pf)
pc.add_projection("Density", 0)
pc.add_profile_sphere(0.1, '1', ["Density","Temperature"],
                    lazy_reader=True, x_bounds=(1e-31,1e-24))
pc.add_phase_sphere(0.1, '1', ["Density","Temperature","VelocityMagnitude"],
                    lazy_reader=True, x_bounds=(1e-31,1e-24), y_bounds=(1e1,1e8))
pc.save("par")
--

Unfortunately, as of right now, it will not parallelize unless it knows the
boundaries of every object being profiled, and unless lazy_reader has been
set.  (Projections are automatically parallelized in all cases.)  This will
change when I parallelize the DerivedQuantities, at which point it will be
able to obtain that information in parallel.
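
For comparison with the PlotCollection route above, here is a rough sketch of
driving a profile directly.  This is illustrative only: the BinnedProfile1D
signature is assumed to mirror the 2-D one in the Profiles.py diff below, and
the sphere center and field bounds are made up.

--
from yt.mods import *
pf = EnzoStaticOutput("galaxy1200.dir/galaxy1200")
sphere = pf.hierarchy.sphere([0.5, 0.5, 0.5], 0.1)
# lazy_reader=True is what lets the grids be distributed across processors
prof = BinnedProfile1D(sphere, 64, "Density", 1e-31, 1e-24,
                       log_space=True, lazy_reader=True)
prof.add_fields(["Temperature"], weight="CellMassMsun")
--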

The IO system has been rewritten as well to take advantage of this.  Parallel
profiles and projections will load all the grids they need from a CPU file
before they do any processing, and they will do it with a single 'open'
operation per file.
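
Schematically, the new path looks like the following.  This is a sketch of
the internals rather than something you would normally call yourself; 'grids'
stands for whatever grid list the profile or projection has selected.

--
queue = pf.hierarchy.queue              # DataQueue* flavor chosen at startup
queue.preload(grids, ["Density"])       # one 'open' per CPU file
data = queue.pop(grids[0], "Density")   # hands back the preloaded array
--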



Added:
   trunk/yt/lagos/ParallelTools.py
      - copied, changed from r781, /branches/parallel_profiles/yt/lagos/ParallelTools.py
Modified:
   trunk/yt/config.py
   trunk/yt/funcs.py
   trunk/yt/lagos/BaseDataTypes.py
   trunk/yt/lagos/BaseGridType.py
   trunk/yt/lagos/DataReadingFuncs.py
   trunk/yt/lagos/EnzoDefs.py
   trunk/yt/lagos/HDF5LightReader.c
   trunk/yt/lagos/HierarchyType.py
   trunk/yt/lagos/Profiles.py
   trunk/yt/lagos/__init__.py
   trunk/yt/logger.py
   trunk/yt/raven/PlotTypes.py

Modified: trunk/yt/config.py
==============================================================================
--- trunk/yt/config.py	(original)
+++ trunk/yt/config.py	Wed Sep 17 07:01:16 2008
@@ -63,7 +63,9 @@
         'unifiedlogfile': '1',
         'timefunctions':'False',
         'inGui':'False',
-        'parallel':'False',
+        '__parallel':'False',
+        '__parallel_rank':'0',
+        '__parallel_size':'1',
          },
     "raven":{
         'ImagePath':".",

Modified: trunk/yt/funcs.py
==============================================================================
--- trunk/yt/funcs.py	(original)
+++ trunk/yt/funcs.py	Wed Sep 17 07:01:16 2008
@@ -154,3 +154,10 @@
     from collections import defaultdict
 except ImportError:
     defaultdict = __defaultdict
+
+def only_on_root(func, *args, **kwargs):
+    from yt.config import ytcfg
+    if not ytcfg.getboolean("yt","__parallel"):
+        return func(*args,**kwargs)
+    if ytcfg.getint("yt","__parallel_rank") > 0: return
+    return func(*args, **kwargs)
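
The new only_on_root helper wraps any callable so that it executes everywhere
when running in serial, but only on the root processor under MPI.  A minimal
usage sketch, assuming yt.funcs is importable on its own:

--
from yt.funcs import only_on_root

def announce(message):
    print message

# Prints everywhere in serial, only on rank 0 in parallel.
only_on_root(announce, "Saving figure...")
--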

Modified: trunk/yt/lagos/BaseDataTypes.py
==============================================================================
--- trunk/yt/lagos/BaseDataTypes.py	(original)
+++ trunk/yt/lagos/BaseDataTypes.py	Wed Sep 17 07:01:16 2008
@@ -706,7 +706,7 @@
         if use_child_mask: k = (k & grid.child_mask)
         return na.where(k)
 
-class EnzoProjBase(Enzo2DData):
+class EnzoProjBase(Enzo2DData, ParallelAnalysisInterface):
     _key_fields = ['px','py','pdx','pdy']
     def __init__(self, axis, field, weight_field = None,
                  max_level = None, center = None, pf = None,
@@ -718,22 +718,15 @@
         of that weight.
         """
         Enzo2DData.__init__(self, axis, field, pf, **kwargs)
-        if not source:
-            self._check_region = False
-            source = EnzoGridCollection(center, self.hierarchy.grids)
-            self._okay_to_serialize = True
-        else:
-            self._okay_to_serialize = False
-            self._check_region = True
-        self.source = source
+        self.center = center
+        self._initialize_source()
         self._grids = self.source._grids
         if max_level == None:
             max_level = self.hierarchy.maxLevel
-        if source is not None:
-            max_level = min(max_level, source.gridLevels.max())
+        if self.source is not None:
+            max_level = min(max_level, self.source.gridLevels.max())
         self._max_level = max_level
         self._weight = weight_field
-        self.center = center
         self.func = na.sum # for the future
         self.__retval_coords = {}
         self.__retval_fields = {}
@@ -747,6 +740,16 @@
             self._refresh_data()
             if self._okay_to_serialize: self._serialize()
 
+    def _initialize_source(self, source = None):
+        if source is None:
+            check, source = self._partition_hierarchy_2d(self.axis)
+            self._check_region = check
+            self._okay_to_serialize = (not check)
+        else:
+            self._okay_to_serialize = False
+            self._check_region = True
+        self.source = source
+
     #@time_execution
     def __cache_data(self):
         rdf = self.hierarchy.grid.readDataFast
@@ -783,6 +786,7 @@
         mylog.info("Finished calculating overlap.")
 
     def _serialize(self):
+        if not self._should_i_write(): return
         mylog.info("Serializing data...")
         node_name = "%s_%s_%s" % (self.fields[0], self._weight, self.axis)
         mylog.info("nodeName: %s", node_name)
@@ -830,6 +834,7 @@
             for fi in range(len(fields)): g_fields[fi] *= dls[fi]
             if self._weight is not None: g_coords[3] *= dls[-1]
             pbar.update(pi)
+            grid.clear_data()
         pbar.finish()
         self.__combine_grids_on_level(level) # In-place
         if level > 0 and level <= self._max_level:
@@ -930,6 +935,11 @@
         coord_data = []
         field_data = []
         dxs = []
+        # We do this here, but I am not convinced it should be done here
+        # It is probably faster, as it consolidates IO, but if we did it in
+        # _project_level, then it would be more memory conservative
+        self._preload(self.source._grids, self._get_dependencies(fields),
+                      self.hierarchy.queue)
         for level in range(0, self._max_level+1):
             my_coords, my_dx, my_fields = self.__project_level(level, fields)
             coord_data.append(my_coords)
@@ -948,14 +958,20 @@
         field_data = na.concatenate(field_data, axis=1)
         dxs = na.concatenate(dxs, axis=1)
         # We now convert to half-widths and center-points
-        self.data['pdx'] = dxs
-        self.data['px'] = (coord_data[0,:]+0.5) * self['pdx']
-        self.data['py'] = (coord_data[1,:]+0.5) * self['pdx']
-        self.data['pdx'] *= 0.5
-        self.data['pdy'] = self.data['pdx'].copy()
+        data = {}
+        data['pdx'] = dxs
+        data['px'] = (coord_data[0,:]+0.5) * data['pdx']
+        data['py'] = (coord_data[1,:]+0.5) * data['pdx']
+        data['pdx'] *= 0.5
+        data['pdy'] = data['pdx'].copy()
+        data['fields'] = field_data
+        data['weight_field'] = coord_data[3,:]
+        # Now we run the finalizer, which is ignored if we don't need it
+        data = self._mpi_catdict(data)
+        field_data = data.pop('fields')
         for fi, field in enumerate(fields):
             self[field] = field_data[fi,:]
-        self.data['weight_field'] = coord_data[3,:]
+        for i in data.keys(): self[i] = data.pop(i)
 
     def add_fields(self, fields, weight = "CellMassMsun"):
         pass

Modified: trunk/yt/lagos/BaseGridType.py
==============================================================================
--- trunk/yt/lagos/BaseGridType.py	(original)
+++ trunk/yt/lagos/BaseGridType.py	Wed Sep 17 07:01:16 2008
@@ -85,8 +85,13 @@
                 if fieldInfo.has_key(field):
                     conv_factor = fieldInfo[field]._convert_function(self)
                 try:
-                    self[field] = self.readDataFast(field) * conv_factor
-                except self._read_exception:
+                    if hasattr(self.hierarchy, 'queue'):
+                        temp = self.hierarchy.queue.pop(self, field)
+                    else:
+                        temp = self.readDataFast(field)
+                    self[field] = temp * conv_factor
+                except self._read_exception, exc:
+                    print exc
                     if field in fieldInfo:
                         if fieldInfo[field].particle_type:
                             self[field] = na.array([],dtype='int64')

Modified: trunk/yt/lagos/DataReadingFuncs.py
==============================================================================
--- trunk/yt/lagos/DataReadingFuncs.py	(original)
+++ trunk/yt/lagos/DataReadingFuncs.py	Wed Sep 17 07:01:16 2008
@@ -142,3 +142,64 @@
 
 def getExceptionHDF5():
     return (exceptions.KeyError, HDF5LightReader.ReadingError)
+
+class BaseDataQueue(object):
+
+    def __init__(self):
+        self.queue = defaultdict(lambda: {})
+
+    # We need a function for reading a list of sets
+    # and a function for *popping* from a queue all the appropriate sets
+
+    def preload(self, grids, sets):
+        pass
+
+    def pop(self, grid, field):
+        if grid.id in self.queue and field in self.queue[grid.id]:
+            return self.modify(self.queue[grid.id].pop(field))
+        else:
+            # We only read the one set and do not store it if it isn't pre-loaded
+            return self._read_set(grid, field)
+
+    def peek(self, grid, field):
+        return self.queue[grid.id].get(field, None)
+
+    def push(self, grid, field, data):
+        if grid.id in self.queue and field in self.queue[grid.id]:
+            raise ValueError
+        self.queue[grid.id][field] = data
+
+class DataQueueHDF4(BaseDataQueue):
+    def _read_set(self, grid, field):
+        return readDataHDF4(grid, field)
+
+    def modify(self, field):
+        return field.swapaxes(0,2)
+
+class DataQueueHDF5(BaseDataQueue):
+    def _read_set(self, grid, field):
+        return readDataHDF5(grid, field)
+
+    def modify(self, field):
+        return field.swapaxes(0,2)
+
+class DataQueuePackedHDF5(BaseDataQueue):
+    def _read_set(self, grid, field):
+        return readDataPacked(grid, field)
+
+    def modify(self, field):
+        return field.swapaxes(0,2)
+
+    def preload(self, grids, sets):
+        # We need to deal with files first
+        files_keys = defaultdict(lambda: [])
+        sets = list(sets)
+        for g in grids: files_keys[g.filename].append(g)
+        for file in files_keys:
+            mylog.debug("Starting read %s", file)
+            nodes = [g.id for g in files_keys[file]]
+            nodes.sort()
+            data = HDF5LightReader.ReadMultipleGrids(file, nodes, sets)
+            mylog.debug("Read %s items from %s", len(data), os.path.basename(file))
+            for gid in data: self.queue[gid].update(data[gid])
+        mylog.debug("Finished read of %s", sets)
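
Note that pop() falls back to a direct, uncached read when a set was never
preloaded, so the queue is safe to use unconditionally.  A sketch, with
'grid' standing in for any grid object:

--
# No preload() happened for this pair, so pop() reads the single set
# straight from disk and does not store it in the queue.
density = pf.hierarchy.queue.pop(grid, "Density")
--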

Modified: trunk/yt/lagos/EnzoDefs.py
==============================================================================
--- trunk/yt/lagos/EnzoDefs.py	(original)
+++ trunk/yt/lagos/EnzoDefs.py	Wed Sep 17 07:01:16 2008
@@ -69,6 +69,7 @@
                  "CurrentTimeIdentifier": int,
                  "BoundaryConditionName": str,
                  "TopGridRank": int,
+                 "TopGridDimensions": int,
                 }
 
 mpc_conversion = {'mpc'   : 1e0,

Modified: trunk/yt/lagos/HDF5LightReader.c
==============================================================================
--- trunk/yt/lagos/HDF5LightReader.c	(original)
+++ trunk/yt/lagos/HDF5LightReader.c	Wed Sep 17 07:01:16 2008
@@ -120,7 +120,7 @@
 
     if (file_id < 0) {
         PyErr_Format(_hdf5ReadError,
-                 "ReadHDF5DataSet: Unable to open %s", nodename);
+                 "ReadHDF5DataSet: Unable to open %s", filename);
         goto _fail;
     }
 
@@ -456,11 +456,182 @@
     return 0;
 };
 
+PyArrayObject* get_array_from_nodename(char *nodename, hid_t rootnode);
+
+static PyObject *
+Py_ReadMultipleGrids(PyObject *obj, PyObject *args)
+{
+    // Process:
+    //      - Create dict to hold data
+    //      - Open each top-level node in order
+    //      - For each top-level node create a dictionary
+    //      - Insert new dict in top-level dict
+    //      - Read each dataset, insert into dict
+
+    // Format arguments
+
+    char *filename = NULL;
+    char *format_string = NULL;
+    PyObject *grid_ids = NULL;
+    PyObject *set_names = NULL;
+    Py_ssize_t num_sets = 0;
+    Py_ssize_t num_grids = 0;
+
+    if (!PyArg_ParseTuple(args, "sOO",
+            &filename, &grid_ids, &set_names))
+        return PyErr_Format(_hdf5ReadError,
+               "ReadMultipleGrids: Invalid parameters.");
+
+    num_grids = PyList_Size(grid_ids);
+    num_sets = PyList_Size(set_names);
+    PyObject *grids_dict = PyDict_New(); // New reference
+    PyObject *grid_key = NULL;
+    PyObject *grid_data = NULL;
+    PyObject *oset_name = NULL;
+    PyArrayObject *cur_data = NULL;
+    char *set_name;
+    hid_t file_id, grid_node;
+    file_id = grid_node = 0;
+    int i, n;
+    long id;
+    char grid_node_name[13]; // Grid + 8 + \0
+
+    file_id = H5Fopen (filename, H5F_ACC_RDONLY, H5P_DEFAULT); 
+
+    if (file_id < 0) {
+        PyErr_Format(_hdf5ReadError,
+                 "ReadMultipleGrids: Unable to open %s", filename);
+        goto _fail;
+    }
+
+    for(i = 0; i < num_grids; i++) {
+        grid_key = PyList_GetItem(grid_ids, i);
+        id = PyInt_AsLong(grid_key);
+        sprintf(grid_node_name, "Grid%08li", id);
+        grid_data = PyDict_New(); // New reference
+        PyDict_SetItem(grids_dict, grid_key, grid_data);
+        grid_node = H5Gopen(file_id, grid_node_name);
+        if (grid_node < 0) {
+              PyErr_Format(_hdf5ReadError,
+                  "ReadHDF5DataSet: Error opening (%s, %s)",
+                  filename, grid_node_name);
+              goto _fail;
+        }
+        for(n = 0; n < num_sets; n++) {
+            // This points to the in-place internal char*
+            oset_name = PyList_GetItem(set_names, n);
+            set_name = PyString_AsString(oset_name);
+            cur_data = get_array_from_nodename(set_name, grid_node);
+            if (cur_data == NULL) {
+              PyErr_Format(_hdf5ReadError,
+                  "ReadHDF5DataSet: Error reading (%s, %s, %s)",
+                  filename, grid_node_name, set_name);
+              goto _fail;
+            }
+            PyDict_SetItem(grid_data, oset_name, (PyObject *) cur_data);
+            Py_DECREF(cur_data); // still one left
+        }
+        // We just want the one reference from the grids_dict value set
+        Py_DECREF(grid_data); 
+        H5Gclose(grid_node);
+    }
+
+    H5Fclose(file_id);
+    PyObject *return_value = Py_BuildValue("N", grids_dict);
+    return return_value;
+
+    _fail:
+
+      if(!(file_id <= 0)&&(H5Iget_ref(file_id))) H5Fclose(file_id);
+      if(!(grid_node <= 0)&&(H5Iget_ref(grid_node))) H5Gclose(grid_node);
+      Py_XDECREF(grid_data);
+      PyDict_Clear(grids_dict); // Should catch the sub-dictionaries
+      return NULL;
+
+}
+
+PyArrayObject* get_array_from_nodename(char *nodename, hid_t rootnode)
+{
+    
+    H5E_auto_t err_func;
+    void *err_datastream;
+    herr_t my_error;
+    hsize_t *my_dims = NULL;
+    hsize_t *my_max_dims = NULL;
+    npy_intp *dims = NULL;
+    int my_typenum, my_rank, i;
+    size_t type_size;
+    PyArrayObject *my_array = NULL;
+    hid_t datatype_id, native_type_id, dataset, dataspace;
+    datatype_id = native_type_id = dataset = dataspace = 0;
+
+    H5Eget_auto(&err_func, &err_datastream);
+    H5Eset_auto(NULL, NULL);
+    dataset = H5Dopen(rootnode, nodename);
+    H5Eset_auto(err_func, err_datastream);
+
+    if(dataset < 0) goto _fail;
+
+    dataspace = H5Dget_space(dataset);
+    if(dataspace < 0) goto _fail;
+
+    my_rank = H5Sget_simple_extent_ndims( dataspace );
+    if(my_rank < 0) goto _fail;
+
+    my_dims = malloc(sizeof(hsize_t) * my_rank);
+    my_max_dims = malloc(sizeof(hsize_t) * my_rank);
+    my_error = H5Sget_simple_extent_dims( dataspace, my_dims, my_max_dims );
+    if(my_error < 0) goto _fail;
+
+    dims = malloc(my_rank * sizeof(npy_intp));
+    for (i = 0; i < my_rank; i++) dims[i] = (npy_intp) my_dims[i];
+
+    datatype_id = H5Dget_type(dataset);
+    native_type_id = H5Tget_native_type(datatype_id, H5T_DIR_ASCEND);
+    type_size = H5Tget_size(native_type_id);
+
+    /* Behavior here is intentionally undefined for non-native types */
+
+    int my_desc_type = get_my_desc_type(native_type_id);
+    if (my_desc_type == -1) {
+          PyErr_Format(_hdf5ReadError,
+                       "ReadHDF5DataSet: Unrecognized datatype.  Use a more advanced reader.");
+          goto _fail;
+    }
+
+    // Increments the refcount
+    my_array = (PyArrayObject *) PyArray_SimpleNewFromDescr(my_rank, dims,
+                PyArray_DescrFromType(my_desc_type));
+    if (!my_array) goto _fail;
+
+    H5Dread(dataset, native_type_id, H5S_ALL, H5S_ALL, H5P_DEFAULT, my_array->data);
+    H5Sclose(dataspace);
+    H5Dclose(dataset);
+    H5Tclose(native_type_id);
+    H5Tclose(datatype_id);
+    free(my_dims);
+    free(my_max_dims);
+    free(dims);
+
+    PyArray_UpdateFlags(my_array, NPY_OWNDATA | my_array->flags);
+    return my_array;
+
+    _fail:
+      if(!(dataset <= 0)&&(H5Iget_ref(dataset))) H5Dclose(dataset);
+      if(!(dataspace <= 0)&&(H5Iget_ref(dataspace))) H5Sclose(dataspace);
+      if(!(native_type_id <= 0)&&(H5Iget_ref(native_type_id))) H5Tclose(native_type_id);
+      if(!(datatype_id <= 0)&&(H5Iget_ref(datatype_id))) H5Tclose(datatype_id);
+      if(my_dims != NULL) free(my_dims);
+      if(my_max_dims != NULL) free(my_max_dims);
+      if(dims != NULL) free(dims);
+      return NULL;
+}
 
 static PyMethodDef _hdf5LightReaderMethods[] = {
     {"ReadData", Py_ReadHDF5DataSet, METH_VARARGS},
     {"ReadDataSlice", Py_ReadHDF5DataSetSlice, METH_VARARGS},
     {"ReadListOfDatasets", Py_ReadListOfDatasets, METH_VARARGS},
+    {"ReadMultipleGrids", Py_ReadMultipleGrids, METH_VARARGS},
     {NULL, NULL} 
 };
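
From Python, the new entry point takes a filename, a list of grid ids, and a
list of set names, and returns a dict of dicts keyed first by grid id and
then by set name.  A sketch for packed-HDF5 data, where each grid lives in a
Grid%08i group; the CPU filename here is made up:

--
from yt.lagos import HDF5LightReader

data = HDF5LightReader.ReadMultipleGrids(
    "galaxy1200.dir/galaxy1200.cpu0000",   # hypothetical CPU file
    [1, 2, 3],                             # grid ids -> Grid00000001, ...
    ["Density", "Temperature"])
print data[1]["Density"].shape
--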
 

Modified: trunk/yt/lagos/HierarchyType.py
==============================================================================
--- trunk/yt/lagos/HierarchyType.py	(original)
+++ trunk/yt/lagos/HierarchyType.py	Wed Sep 17 07:01:16 2008
@@ -142,14 +142,17 @@
             a = SD.SD(testGrid)
             self.data_style = 4
             mylog.debug("Detected HDF4")
+            self.queue = DataQueueHDF4()
         except:
             list_of_sets = HDF5LightReader.ReadListOfDatasets(testGrid, "/")
             if len(list_of_sets) == 0:
                 mylog.debug("Detected packed HDF5")
                 self.data_style = 6
+                self.queue = DataQueuePackedHDF5()
             else:
                 mylog.debug("Detected unpacked HDF5")
                 self.data_style = 5
+                self.queue = DataQueueHDF5()
 
     def __setup_filemap(self, grid):
         if not self.data_style == 6:
@@ -178,6 +181,7 @@
         self.cutting = classobj("EnzoCuttingPlane",(EnzoCuttingPlaneBase,), dd)
         self.ray = classobj("EnzoOrthoRay",(EnzoOrthoRayBase,), dd)
         self.disk = classobj("EnzoCylinder",(EnzoCylinderBase,), dd)
+        self.grid_collection = classobj("EnzoGridCollection",(EnzoGridCollection,), dd)
 
     def __initialize_data_file(self):
         if not ytcfg.getboolean('lagos','serialize'): return
@@ -491,6 +495,7 @@
                 gf = grid.getFields()
                 mylog.debug("Grid %s has: %s", grid.id, gf)
                 field_list = field_list.union(sets.Set(gf))
+            self.save_data(list(field_list),"/","DataFields")
         self.field_list = list(field_list)
         for field in self.field_list:
             if field in fieldInfo: continue

Copied: trunk/yt/lagos/ParallelTools.py (from r781, /branches/parallel_profiles/yt/lagos/ParallelTools.py)
==============================================================================
--- /branches/parallel_profiles/yt/lagos/ParallelTools.py	(original)
+++ trunk/yt/lagos/ParallelTools.py	Wed Sep 17 07:01:16 2008
@@ -160,6 +160,12 @@
         if not parallel_capable: return True
         return (MPI.COMM_WORLD == 0)
 
+    def _preload(self, grids, fields, queue):
+        # This will preload if it detects we are parallel capable and
+        # if so, we load *everything* that we need.  Use with some care.
+        if not parallel_capable: return
+        queue.preload(grids, fields)
+
     def _mpi_allsum(self, data):
         MPI.COMM_WORLD.Barrier()
         return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
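
_mpi_allsum is what the profile finalizer below leans on: each processor
contributes its partial bins, and every processor receives the elementwise
sum.  A schematic of that reduction, mirroring the Allreduce call as written
here (which follows the mpi4py conventions of the time):

--
from mpi4py import MPI
import numpy as na

# Each processor binned only its own grids; summing the partial
# histograms gives every processor the identical full result.
partial_bins = na.zeros(64, dtype='float64')
# ... fill partial_bins from the local grids ...
total_bins = MPI.COMM_WORLD.Allreduce(partial_bins, op=MPI.SUM)
--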

Modified: trunk/yt/lagos/Profiles.py
==============================================================================
--- trunk/yt/lagos/Profiles.py	(original)
+++ trunk/yt/lagos/Profiles.py	Wed Sep 17 07:01:16 2008
@@ -48,22 +48,33 @@
 
 # Note we do not inherit from EnzoData.
 # We could, but I think we instead want to deal with the root datasource.
-class BinnedProfile:
+class BinnedProfile(ParallelAnalysisInterface):
     def __init__(self, data_source, lazy_reader):
         self._data_source = data_source
         self._data = {}
+        self._pdata = {}
         self._lazy_reader = lazy_reader
 
+    def _get_dependencies(self, fields):
+        return ParallelAnalysisInterface._get_dependencies(
+                    self, fields + self._get_bin_fields())
+
+    def _initialize_parallel(self, fields):
+        self._preload(self._get_grid_objs(), self._get_dependencies(fields),
+                      self._data_source.hierarchy.queue)
+
     def _lazy_add_fields(self, fields, weight, accumulation):
-        data = {}         # final results will go here
-        weight_data = {}  # we need to track the weights as we go
+        self._ngrids = 0
+        self.__data = {}         # final results will go here
+        self.__weight_data = {}  # we need to track the weights as we go
         for field in fields:
-            data[field] = self._get_empty_field()
-            weight_data[field] = self._get_empty_field()
-        used = self._get_empty_field().astype('bool')
-        pbar = get_pbar('Binning grids', len(self._data_source._grids))
-        for gi,grid in enumerate(self._data_source._grids):
-            pbar.update(gi)
+            self.__data[field] = self._get_empty_field()
+            self.__weight_data[field] = self._get_empty_field()
+        self.__used = self._get_empty_field().astype('bool')
+        #pbar = get_pbar('Binning grids', len(self._data_source._grids))
+        for gi,grid in enumerate(self._get_grids(fields)):
+            self._ngrids += 1
+            #pbar.update(gi)
             args = self._get_bins(grid, check_cut=True)
             if not args: # No bins returned for this grid, so forget it!
                 continue
@@ -71,17 +82,26 @@
                 # We get back field values, weight values, used bins
                 f, w, u = self._bin_field(grid, field, weight, accumulation,
                                           args=args, check_cut=True)
-                data[field] += f        # running total
-                weight_data[field] += w # running total
-                used = (used | u)       # running 'or'
+                self.__data[field] += f        # running total
+                self.__weight_data[field] += w # running total
+                self.__used = (self.__used | u)       # running 'or'
             grid.clear_data()
-        pbar.finish()
-        ub = na.where(used)
+        # When the loop completes the parallel finalizer gets called
+        #pbar.finish()
+        ub = na.where(self.__used)
         for field in fields:
             if weight: # Now, at the end, we divide out.
-                data[field][ub] /= weight_data[field][ub]
-            self[field] = data[field]
-        self["UsedBins"] = used
+                self.__data[field][ub] /= self.__weight_data[field][ub]
+            self[field] = self.__data[field]
+        self["UsedBins"] = self.__used
+        del self.__data, self.__weight_data, self.__used
+
+    def _finalize_parallel(self):
+        for key in self.__data:
+            self.__data[key] = self._mpi_allsum(self.__data[key])
+        for key in self.__weight_data:
+            self.__weight_data[key] = self._mpi_allsum(self.__weight_data[key])
+        self.__used = self._mpi_allsum(self.__used)
 
     def _unlazy_add_fields(self, fields, weight, accumulation):
         for field in fields:
@@ -226,6 +246,9 @@
             fid.write("\n")
         fid.close()
 
+    def _get_bin_fields(self):
+        return [self.bin_field]
+
 class BinnedProfile2D(BinnedProfile):
     def __init__(self, data_source,
                  x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -338,6 +361,9 @@
             fid.write("\n")
         fid.close()
 
+    def _get_bin_fields(self):
+        return [self.x_bin_field, self.y_bin_field]
+
 class BinnedProfile3D(BinnedProfile):
     def __init__(self, data_source,
                  x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -439,6 +465,9 @@
     def write_out(self, filename, format="%0.16e"):
         pass # Will eventually dump HDF5
 
+    def _get_bin_fields(self):
+        return [self.x_bin_field, self.y_bin_field, self.z_bin_field]
+
     def store_profile(self, name, force=False):
         """
         By identifying the profile with a fixed, user-input *name* we can

Modified: trunk/yt/lagos/__init__.py
==============================================================================
--- trunk/yt/lagos/__init__.py	(original)
+++ trunk/yt/lagos/__init__.py	Wed Sep 17 07:01:16 2008
@@ -63,8 +63,10 @@
         pass
 
 if ytcfg.getboolean("lagos","usefortran"):
-    pass
-    #import EnzoFortranRoutines
+    try:
+        import EnzoFortranRoutines
+    except ImportError:
+        mylog.warning("Told to import fortran, but unable!")
 
 # Now we import all the subfiles
 
@@ -74,6 +76,7 @@
 from EnzoDefs import *
 from DerivedFields import *
 from DerivedQuantities import DerivedQuantityCollection, GridChildMaskWrapper
+from ParallelTools import *
 from DataReadingFuncs import *
 from ClusterFiles import *
 from ContourFinder import *

Modified: trunk/yt/logger.py
==============================================================================
--- trunk/yt/logger.py	(original)
+++ trunk/yt/logger.py	Wed Sep 17 07:01:16 2008
@@ -55,45 +55,60 @@
 mb = 10*1024*1024
 bc = 10
 
+loggers = []
+file_handlers = []
+
 if ytcfg.getboolean("yt","logfile") and os.access(".", os.W_OK):
     if ytcfg.getboolean("yt","unifiedlogfile"):
         log_file_name = ytcfg.get("yt","LogFileName")
-        ytHandler = handlers.RotatingFileHandler(log_file_name,
+        ytFileHandler = handlers.RotatingFileHandler(log_file_name,
                                                  maxBytes=mb, backupCount=bc)
         k = logging.Formatter(fstring)
-        ytHandler.setFormatter(k)
-        ytLogger.addHandler(ytHandler)
+        ytFileHandler.setFormatter(k)
+        ytLogger.addHandler(ytFileHandler)
+        loggers.append(ytLogger)
+        file_handlers.append(ytFileHandler)
     else:
         # If we *don't* want a unified file handler (which is the default now!)
-        fidoHandler = handlers.RotatingFileHandler("fido.log",
+        fidoFileHandler = handlers.RotatingFileHandler("fido.log",
                                                    maxBytes=mb, backupCount=bc)
-        fidoHandler.setFormatter(f)
-        fidoLogger.addHandler(fidoHandler)
+        fidoFileHandler.setFormatter(f)
+        fidoLogger.addHandler(fidoFileHandler)
 
-        ravenHandler = handlers.RotatingFileHandler("raven.log",
+        ravenFileHandler = handlers.RotatingFileHandler("raven.log",
                                                     maxBytes=mb, backupCount=bc)
-        ravenHandler.setFormatter(f)
-        ravenLogger.addHandler(ravenHandler)
+        ravenFileHandler.setFormatter(f)
+        ravenLogger.addHandler(ravenFileHandler)
+        loggers.append(ravenLogger)
+        file_handlers.append(ravenFileHandler)
 
-        lagosHandler = handlers.RotatingFileHandler("lagos.log",
+        lagosFileHandler = handlers.RotatingFileHandler("lagos.log",
                                                     maxBytes=mb, backupCount=bc)
-        lagosHandler.setFormatter(f)
-        lagosLogger.addHandler(lagosHandler)
+        lagosFileHandler.setFormatter(f)
+        lagosLogger.addHandler(lagosFileHandler)
+        loggers.append(lagosLogger)
+        file_handlers.append(lagosFileHandler)
 
-        enkiHandler = handlers.RotatingFileHandler("enki.log",
+        enkiFileHandler = handlers.RotatingFileHandler("enki.log",
                                                    maxBytes=mb, backupCount=bc)
-        enkiHandler.setFormatter(f)
-        enkiLogger.addHandler(enkiHandler)
+        enkiFileHandler.setFormatter(f)
+        enkiLogger.addHandler(enkiFileHandler)
+        loggers.append(enkiLogger)
+        file_handlers.append(enkiFileHandler)
 
-        deliveratorHandler = handlers.RotatingFileHandler("deliverator.log",
+        deliveratorFileHandler = handlers.RotatingFileHandler("deliverator.log",
                                                 maxBytes=mb, backupCount=bc)
-        deliveratorHandler.setFormatter(f)
-        deliveratorLogger.addHandler(deliveratorHandler)
+        deliveratorFileHandler.setFormatter(f)
+        deliveratorLogger.addHandler(deliveratorFileHandler)
+        loggers.append(deliveratorLogger)
+        file_handlers.append(deliveratorFileHandler)
 
-        reasonHandler = handlers.RotatingFileHandler("reason.log",
+        reasonFileHandler = handlers.RotatingFileHandler("reason.log",
                                                     maxBytes=mb, backupCount=bc)
-        reasonHandler.setFormatter(f)
-        reasonLogger.addHandler(reasonHandler)
+        reasonFileHandler.setFormatter(f)
+        reasonLogger.addHandler(reasonFileHandler)
+        loggers.append(reasonLogger)
+        file_handlers.append(reasonFileHandler)
 
 def disable_stream_logging():
     # We just remove the root logger's handlers
@@ -101,5 +116,9 @@
         if isinstance(handler, logging.StreamHandler):
             rootLogger.removeHandler(handler)
 
+def disable_file_logging():
+    for logger, handler in zip(loggers, file_handlers):
+        logger.removeHandler(handler)
+
 if ytcfg.getboolean("yt","suppressStreamLogging"):
     disable_stream_logging()

Modified: trunk/yt/raven/PlotTypes.py
==============================================================================
--- trunk/yt/raven/PlotTypes.py	(original)
+++ trunk/yt/raven/PlotTypes.py	Wed Sep 17 07:01:16 2008
@@ -138,8 +138,7 @@
             my_prefix = prefix
         fn = ".".join([my_prefix, format])
         canvas = engineVals["canvas"](self._figure)
-        #self._figure.savefig(fn, format)
-        canvas.print_figure(fn)
+        only_on_root(canvas.print_figure, fn)
         self["Type"] = self._type_name
         self["GeneratedAt"] = self.data.hierarchy["CurrentTimeIdentifier"]
         return fn


