[Yt-svn] yt-commit r700 - in branches/parallel_profiles/yt: . lagos
mturk at wrangler.dreamhost.com
Mon Jul 28 12:59:49 PDT 2008
Author: mturk
Date: Mon Jul 28 12:59:48 2008
New Revision: 700
URL: http://yt.spacepope.org/changeset/700
Log:
Okay, committing some changes from the other day. Right now you either do a
lazy_profile where it pre-loads everything, *thus making the laziness
completely irrelevant*, or you run it all in core, *which does the same thing
but doesn't pre-load*. It's a fine place we're in right now, but I'm thinking
about how to refactor this to bring it back to usability. For now, it is
surprisingly functional as a parallel analysis toolkit. Queueing of data and
preloading work great...
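
[Editor's note: the preload/pop pattern the diff below introduces works
roughly like this -- a minimal sketch, not the committed code; the class name
and the reader callable are illustrative stand-ins for BaseDataQueue and the
real HDF5 read path:]

    from collections import defaultdict

    class DataQueueSketch(object):
        # Illustrative stand-in for the BaseDataQueue pattern in the diff
        # below; 'reader' is a stub where the real class reads from HDF5.
        def __init__(self, reader):
            self.reader = reader             # callable(grid, field) -> array
            self.queue = defaultdict(dict)   # grid.id -> {field: array}

        def preload(self, grids, fields):
            # Batch every (grid, field) pair up front, ideally one file pass.
            for g in grids:
                for f in fields:
                    self.queue[g.id][f] = self.reader(g, f)

        def pop(self, grid, field):
            # Serve (and discard) a preloaded buffer, or read on demand.
            if field in self.queue[grid.id]:
                return self.queue[grid.id].pop(field)
            return self.reader(grid, field)
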
Modified:
branches/parallel_profiles/yt/lagos/BaseGridType.py
branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
branches/parallel_profiles/yt/lagos/HDF5LightReader.c
branches/parallel_profiles/yt/lagos/HierarchyType.py
branches/parallel_profiles/yt/lagos/Profiles.py
branches/parallel_profiles/yt/parallel_tools.py
Modified: branches/parallel_profiles/yt/lagos/BaseGridType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/BaseGridType.py (original)
+++ branches/parallel_profiles/yt/lagos/BaseGridType.py Mon Jul 28 12:59:48 2008
@@ -85,8 +85,13 @@
if fieldInfo.has_key(field):
conv_factor = fieldInfo[field]._convert_function(self)
try:
- self[field] = self.readDataFast(field) * conv_factor
- except self._read_exception:
+ if hasattr(self.hierarchy, 'queue'):
+ temp = self.hierarchy.queue.pop(self, field)
+ else:
+ temp = self.readDataFast(field)
+ self[field] = temp * conv_factor
+ except self._read_exception, exc:
+ print exc
if field in fieldInfo:
if fieldInfo[field].particle_type:
self[field] = na.array([],dtype='int64')
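
[Editor's note: with the hunk above, ordinary field access on a grid consults
the hierarchy-level queue first and only falls back to a direct readDataFast;
the conversion factor is applied either way. Hypothetical usage:]

    # Hypothetical: after hierarchy.queue.preload(...), this pop()s the
    # buffered array; without a queue it reads straight from disk.
    rho = grid["Density"]
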
Modified: branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/DataReadingFuncs.py (original)
+++ branches/parallel_profiles/yt/lagos/DataReadingFuncs.py Mon Jul 28 12:59:48 2008
@@ -156,7 +156,7 @@
def pop(self, grid, field):
if grid.id in self.queue and field in self.queue[grid.id]:
- return self.queue[grid.id].pop(field)
+ return self.modify(self.queue[grid.id].pop(field))
else:
# We only read the one set and do not store it if it isn't pre-loaded
return self._read_set(grid, field)
@@ -170,13 +170,22 @@
self.queue[grid][field] = data
class DataQueuePackedHDF5(BaseDataQueue):
- _read_set = readDataHDF5
+
+ def _read_set(self, grid, field):
+ return readDataPacked(grid, field)
+
+ def modify(self, field):
+ return field.swapaxes(0,2)
def preload(self, grids, sets):
# We need to deal with files first
files_keys = defaultdict(lambda: [])
- for g in sorted(grids): files_keys[g.filename].append(g)
+ sets = list(sets)
+ for g in grids: files_keys[g.filename].append(g)
for file in files_keys:
- nodes = [g.id for g in grids]
+ mylog.debug("Starting read %s", file)
+ nodes = [g.id for g in files_keys[file]]
+ nodes.sort()
data = HDF5LightReader.ReadMultipleGrids(file, nodes, sets)
+ mylog.debug("Read %s items from %s", len(data), os.path.basename(file))
for gid in data: self.queue[gid].update(data[gid])
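
[Editor's note: the preload above groups grids by their backing file so each
HDF5 file is opened and parsed once, rather than once per grid. A minimal
sketch of that batching; HDF5LightReader.ReadMultipleGrids is the C entry
point touched below, and grids/sets stand in for the real arguments:]

    from collections import defaultdict

    def preload_sketch(grids, sets):
        files_keys = defaultdict(list)
        for g in grids:                      # group grids by backing file
            files_keys[g.filename].append(g)
        for fn, gs in files_keys.items():
            node_ids = sorted(g.id for g in gs)
            # one open/parse per file, instead of one per (grid, field)
            yield fn, HDF5LightReader.ReadMultipleGrids(fn, node_ids,
                                                        list(sets))
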
Modified: branches/parallel_profiles/yt/lagos/HDF5LightReader.c
==============================================================================
--- branches/parallel_profiles/yt/lagos/HDF5LightReader.c (original)
+++ branches/parallel_profiles/yt/lagos/HDF5LightReader.c Mon Jul 28 12:59:48 2008
@@ -525,7 +525,7 @@
if (cur_data == NULL) {
PyErr_Format(_hdf5ReadError,
"ReadHDF5DataSet: Error reading (%s, %s, %s)",
- filename, grid_key, grid_node);
+ filename, grid_node_name, set_name);
goto _fail;
}
PyDict_SetItem(grid_data, oset_name, (PyObject *) cur_data);
Modified: branches/parallel_profiles/yt/lagos/HierarchyType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/HierarchyType.py (original)
+++ branches/parallel_profiles/yt/lagos/HierarchyType.py Mon Jul 28 12:59:48 2008
@@ -147,6 +147,7 @@
if len(list_of_sets) == 0:
mylog.debug("Detected packed HDF5")
self.data_style = 6
+ self.queue = DataQueuePackedHDF5()
else:
mylog.debug("Detected unpacked HDF5")
self.data_style = 5
Modified: branches/parallel_profiles/yt/lagos/Profiles.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/Profiles.py (original)
+++ branches/parallel_profiles/yt/lagos/Profiles.py Mon Jul 28 12:59:48 2008
@@ -55,18 +55,27 @@
self._pdata = {}
self._lazy_reader = lazy_reader
+ def _get_dependencies(self, fields):
+ deps = []
+ for field in fields + self._get_bin_fields():
+ deps += fieldInfo[field].get_dependencies().requested
+ return list(set(deps))
+
def _lazy_add_fields(self, fields, weight, accumulation):
self._ngrids = 0
self.__data = {} # final results will go here
self.__weight_data = {} # we need to track the weights as we go
+ if hasattr(self._data_source.hierarchy, 'queue'):
+ self._data_source.hierarchy.queue.preload(self._get_grid_objs(),
+ self._get_dependencies(fields))
for field in fields:
self.__data[field] = self._get_empty_field()
self.__weight_data[field] = self._get_empty_field()
self.__used = self._get_empty_field().astype('bool')
- pbar = get_pbar('Binning grids', len(self._data_source._grids))
+ #pbar = get_pbar('Binning grids', len(self._data_source._grids))
for gi,grid in enumerate(self._get_grids()):
self._ngrids += 1
- pbar.update(gi)
+ #pbar.update(gi)
args = self._get_bins(grid, check_cut=True)
if not args: # No bins returned for this grid, so forget it!
continue
@@ -79,7 +88,7 @@
self.__used = (self.__used | u) # running 'or'
grid.clear_data()
# When the loop completes the parallel finalizer gets called
- pbar.finish()
+ #pbar.finish()
ub = na.where(self.__used)
for field in fields:
if weight: # Now, at the end, we divide out.
@@ -90,16 +99,14 @@
def _finalize_parallel(self):
from mpi4py import MPI # I am displeased by importing here
- temp = self._get_empty_field()
+ MPI.COMM_WORLD.Barrier()
for key in self.__data:
- MPI.COMM_WORLD.Allreduce(self.__data[key], temp, MPI.SUM)
- self.__data[key] += temp
+ self.__data[key] = \
+ MPI.COMM_WORLD.Allreduce(self.__data[key], op=MPI.SUM)
for key in self.__weight_data:
- MPI.COMM_WORLD.Allreduce(self.__weight_data[key], temp, MPI.SUM)
- self.__weight_data[key] += temp
- temp = self.__used.copy().astype('int32')
- MPI.COMM_WORLD.Allreduce(self.__used.astype('int32'), temp, MPI.SUM)
- self.__used = (temp > 0)
+ self.__weight_data[key] = \
+ MPI.COMM_WORLD.Allreduce(self.__weight_data[key], op=MPI.SUM)
+ self.__used = MPI.COMM_WORLD.Allreduce(self.__used, op=MPI.SUM)
def _unlazy_add_fields(self, fields, weight, accumulation):
for field in fields:
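
[Editor's note: on the reduction above -- in present-day mpi4py the uppercase
Allreduce fills a receive buffer in place and returns None, while the
lowercase allreduce returns the reduced object; whichever API the 2008
version exposed, the operation being performed is an elementwise SUM across
ranks. A sketch against current mpi4py:]

    from mpi4py import MPI
    import numpy as np

    comm = MPI.COMM_WORLD
    local = np.zeros(64, dtype="float64")     # this rank's partial bins
    total = np.empty_like(local)
    comm.Allreduce(local, total, op=MPI.SUM)  # buffer form: fills 'total'
    used = comm.allreduce(1, op=MPI.SUM) > 0  # object form returns result
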
@@ -244,6 +251,9 @@
fid.write("\n")
fid.close()
+ def _get_bin_fields(self):
+ return [self.bin_field]
+
class BinnedProfile2D(BinnedProfile):
def __init__(self, data_source,
x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -356,6 +366,9 @@
fid.write("\n")
fid.close()
+ def _get_bin_fields(self):
+ return [self.x_bin_field, self.y_bin_field]
+
class BinnedProfile3D(BinnedProfile):
def __init__(self, data_source,
x_n_bins, x_bin_field, x_lower_bound, x_upper_bound, x_log,
@@ -457,6 +470,9 @@
def write_out(self, filename, format="%0.16e"):
pass # Will eventually dump HDF5
+ def _get_bin_fields(self):
+ return [self.x_bin_field, self.y_bin_field, self.z_bin_field]
+
def store_profile(self, name, force=False):
"""
By identifying the profile with a fixed, user-input *name* we can
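
[Editor's note: the _get_bin_fields hooks added to each profile class feed
_get_dependencies, which walks each requested field's dependency chain and
returns a deduplicated list of on-disk fields, so preload can batch exactly
the reads the binning will need. Hypothetically, with illustrative field
names:]

    # Hypothetical: profile binned by Density, adding CellMassMsun.
    deps = prof._get_dependencies(["CellMassMsun"])
    # -> e.g. ["Density", "CellVolume", ...], deduplicated raw fields
    prof._data_source.hierarchy.queue.preload(prof._get_grid_objs(), deps)
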
Modified: branches/parallel_profiles/yt/parallel_tools.py
==============================================================================
--- branches/parallel_profiles/yt/parallel_tools.py (original)
+++ branches/parallel_profiles/yt/parallel_tools.py Mon Jul 28 12:59:48 2008
@@ -37,7 +37,7 @@
print "PARALLEL COMPATIBLE:", parallel_capable
class GridIterator(object):
- def __init__(self, pobj):
+ def __init__(self, pobj, just_list = False):
self.pobj = pobj
if hasattr(pobj, '_grids') and pobj._grids is not None:
gs = pobj._grids
@@ -45,6 +45,7 @@
gs = pobj._data_source._grids
self._grids = sorted(gs, key = lambda g: g.filename)
self.ng = len(self._grids)
+ self.just_list = just_list
def __iter__(self):
self.pos = 0
@@ -63,16 +64,17 @@
This takes an object, pobj, that implements ParallelAnalysisInterface,
and then does its thing.
"""
- def __init__(self, pobj):
- GridIterator.__init__(self, pobj)
+ def __init__(self, pobj, just_list = False):
+ GridIterator.__init__(self, pobj, just_list)
self._offset = MPI.COMM_WORLD.rank
self._skip = MPI.COMM_WORLD.size
# Note that we're doing this in advance, and with a simple means
# of choosing them; more advanced methods will be explored later.
- self.my_grid_ids = na.mgrid[self._offset:self.ng:self._skip]
+ upper, lower = na.mgrid[0:self.ng:(self._skip+1)*1j][self._offset:self._offset+2]
+ self.my_grid_ids = na.mgrid[upper:lower-1].astype("int64")
def __iter__(self):
- self.pobj._initialize_parallel()
+ if not self.just_list: self.pobj._initialize_parallel()
self.pos = 0
return self
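
[Editor's note: the mgrid arithmetic above swaps the old strided, round-robin
assignment for contiguous blocks of grid indices per rank; because
GridIterator sorts the grids by filename, each block mostly touches the same
files, which is what the per-file preload wants. A sketch of the intent,
using np.array_split rather than the committed mgrid expression:]

    import numpy as np

    ng, size = 100, 4                 # illustrative: 100 grids, 4 ranks
    blocks = np.array_split(np.arange(ng, dtype="int64"), size)
    for rank in range(size):
        my_grid_ids = blocks[rank]    # contiguous, filename-ordered ids
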
@@ -81,7 +83,7 @@
gid = self.my_grid_ids[self.pos]
self.pos += 1
return self._grids[gid]
- self.pobj._finalize_parallel()
+ if not self.just_list: self.pobj._finalize_parallel()
raise StopIteration
class ParallelAnalysisInterface(object):
@@ -93,6 +95,12 @@
return ParallelGridIterator(self)
return GridIterator(self)
+ def _get_grid_objs(self):
+ if parallel_capable and \
+ ytcfg.getboolean("yt","parallel"):
+ return ParallelGridIterator(self, True)
+ return GridIterator(self, True)
+
def _initialize_parallel(self):
pass
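
[Editor's note: _get_grid_objs yields the same per-rank grid decomposition as
_get_grids but with just_list=True, so iterating it (as preload does in
Profiles.py above) skips the _initialize_parallel/_finalize_parallel hooks;
the MPI collectives then fire only once, around the real binning loop.
Hypothetical use:]

    # Hypothetical: enumerate this rank's grids with no parallel side effects.
    filenames = set(g.filename for g in pobj._get_grid_objs())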