[Yt-svn] yt-commit r727 - branches/parallel_profiles/yt/lagos
mturk at wrangler.dreamhost.com
Fri Aug 8 15:41:10 PDT 2008
Author: mturk
Date: Fri Aug 8 15:41:08 2008
New Revision: 727
URL: http://yt.spacepope.org/changeset/727
Log:
Projections now work transparently in parallel. There's a chance some of this
broke parallel profiles; I'm looking into it.
PlotCollections will be parallel-enabled next.
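For context, "transparently" means the same projection script should run
unchanged in serial or under mpirun. A minimal sketch of the user-facing side,
assuming the usual lagos entry points (the dataset path and call names below
are illustrative, not part of this commit):

    # Run as: python project.py  *or*  mpirun -np 4 python project.py
    import yt.lagos as lagos
    pf = lagos.EnzoStaticOutput("DD0010/DD0010")  # hypothetical dataset
    # Each rank projects only its own slab of the image plane
    # (via _partition_hierarchy_2d); _mpi_catdict then reassembles the
    # pieces, so every rank ends up holding the complete projection.
    proj = pf.h.proj(0, "Density", weight_field=None)
    print(proj["Density"].shape)  # identical on every rank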
Modified:
branches/parallel_profiles/yt/lagos/BaseDataTypes.py
branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
branches/parallel_profiles/yt/lagos/HierarchyType.py
branches/parallel_profiles/yt/lagos/ParallelTools.py
branches/parallel_profiles/yt/lagos/Profiles.py
Modified: branches/parallel_profiles/yt/lagos/BaseDataTypes.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/BaseDataTypes.py (original)
+++ branches/parallel_profiles/yt/lagos/BaseDataTypes.py Fri Aug 8 15:41:08 2008
@@ -718,22 +718,15 @@
of that weight.
"""
Enzo2DData.__init__(self, axis, field, pf, **kwargs)
- if not source:
- self._check_region = False
- source = EnzoGridCollection(center, self.hierarchy.grids)
- self._okay_to_serialize = True
- else:
- self._okay_to_serialize = False
- self._check_region = True
- self.source = source
+ self.center = center
+ self._initialize_source()
self._grids = self.source._grids
if max_level == None:
max_level = self.hierarchy.maxLevel
- if source is not None:
- max_level = min(max_level, source.gridLevels.max())
+ if self.source is not None:
+ max_level = min(max_level, self.source.gridLevels.max())
self._max_level = max_level
self._weight = weight_field
- self.center = center
self.func = na.sum # for the future
self.__retval_coords = {}
self.__retval_fields = {}
@@ -747,6 +740,16 @@
self._refresh_data()
if self._okay_to_serialize: self._serialize()
+ def _initialize_source(self, source = None):
+ if source is None:
+ check, source = self._partition_hierarchy_2d(self.axis)
+ self._check_region = check
+ self._okay_to_serialize = (not check)
+ else:
+ self._okay_to_serialize = False
+ self._check_region = True
+ self.source = source
+
#@time_execution
def __cache_data(self):
rdf = self.hierarchy.grid.readDataFast
@@ -783,6 +786,7 @@
mylog.info("Finished calculating overlap.")
def _serialize(self):
+ if not self._should_i_write(): return
mylog.info("Serializing data...")
node_name = "%s_%s_%s" % (self.fields[0], self._weight, self.axis)
mylog.info("nodeName: %s", node_name)
@@ -819,8 +823,6 @@
def __project_level(self, level, fields):
grids_to_project = self.source.select_grids(level)
- self.hierarchy.queue.preload(grids_to_project,
- self._get_dependencies(fields))
dls, convs = self.__get_dls(grids_to_project[0], fields)
zero_out = (level != self._max_level)
pbar = get_pbar('Projecting level % 2i / % 2i ' \
@@ -932,6 +934,8 @@
coord_data = []
field_data = []
dxs = []
+ self.hierarchy.queue.preload(self.source._grids,
+ self._get_dependencies(fields))
for level in range(0, self._max_level+1):
my_coords, my_dx, my_fields = self.__project_level(level, fields)
coord_data.append(my_coords)
@@ -950,14 +954,20 @@
field_data = na.concatenate(field_data, axis=1)
dxs = na.concatenate(dxs, axis=1)
# We now convert to half-widths and center-points
- self.data['pdx'] = dxs
- self.data['px'] = (coord_data[0,:]+0.5) * self['pdx']
- self.data['py'] = (coord_data[1,:]+0.5) * self['pdx']
- self.data['pdx'] *= 0.5
- self.data['pdy'] = self.data['pdx'].copy()
+ data = {}
+ data['pdx'] = dxs
+ data['px'] = (coord_data[0,:]+0.5) * data['pdx']
+ data['py'] = (coord_data[1,:]+0.5) * data['pdx']
+ data['pdx'] *= 0.5
+ data['pdy'] = data['pdx'].copy()
+ data['fields'] = field_data
+ data['weight_field'] = coord_data[3,:]
+ # Now we run the finalizer, which is ignored if we don't need it
+ data = self._mpi_catdict(data)
+ field_data = data.pop('fields')
for fi, field in enumerate(fields):
self[field] = field_data[fi,:]
- self.data['weight_field'] = coord_data[3,:]
+ for i in data.keys(): self[i] = data.pop(i)
def add_fields(self, fields, weight = "CellMassMsun"):
pass
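The reworked finalizer above stages everything into a plain dict precisely so
_mpi_catdict can concatenate it across ranks; the coordinate bookkeeping
itself is easy to check in isolation. A standalone sketch, assuming
coord_data rows 0 and 1 hold integer image-plane cell indices and row 3 the
weight column:

    import numpy as na  # yt's numpy alias of this era

    coord_data = na.array([[0, 1, 2],        # ix indices
                           [0, 0, 1],        # iy indices
                           [0, 0, 0],        # (unused row in this sketch)
                           [1., 1., 1.]])    # weight values
    dxs = na.array([0.25, 0.25, 0.125])      # cell width per projected cell

    data = {'pdx': dxs}
    data['px'] = (coord_data[0,:] + 0.5) * data['pdx']  # index -> center
    data['py'] = (coord_data[1,:] + 0.5) * data['pdx']
    data['pdx'] *= 0.5                # full width -> half-width, in place
    data['pdy'] = data['pdx'].copy()
    # px = [0.125, 0.375, 0.3125]; pdx = [0.125, 0.125, 0.0625]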
Modified: branches/parallel_profiles/yt/lagos/DataReadingFuncs.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/DataReadingFuncs.py (original)
+++ branches/parallel_profiles/yt/lagos/DataReadingFuncs.py Fri Aug 8 15:41:08 2008
@@ -189,3 +189,4 @@
data = HDF5LightReader.ReadMultipleGrids(file, nodes, sets)
mylog.debug("Read %s items from %s", len(data), os.path.basename(file))
for gid in data: self.queue[gid].update(data[gid])
+ mylog.debug("Finished read of %s", sets)
Modified: branches/parallel_profiles/yt/lagos/HierarchyType.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/HierarchyType.py (original)
+++ branches/parallel_profiles/yt/lagos/HierarchyType.py Fri Aug 8 15:41:08 2008
@@ -179,6 +179,7 @@
self.cutting = classobj("EnzoCuttingPlane",(EnzoCuttingPlaneBase,), dd)
self.ray = classobj("EnzoOrthoRay",(EnzoOrthoRayBase,), dd)
self.disk = classobj("EnzoCylinder",(EnzoCylinderBase,), dd)
+ self.grid_collection = classobj("EnzoGridCollection",(EnzoGridCollection,), dd)
def __initialize_data_file(self):
if not ytcfg.getboolean('lagos','serialize'): return
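classobj here (Python 2's dynamic class constructor) builds a
hierarchy-bound subclass at runtime, injecting the shared attribute dict dd
into the class body. The same pattern with the built-in type(), as a minimal
sketch (the payload in dd is a placeholder):

    # Dynamically create a subclass whose class dict carries shared state.
    class GridCollectionBase(object):
        def __init__(self, center, grids):
            self.center, self.grids = center, grids

    dd = {"hierarchy": "<the owning hierarchy>"}   # placeholder payload
    GridCollection = type("EnzoGridCollection", (GridCollectionBase,), dd)
    gc = GridCollection(center=None, grids=[])
    print(gc.hierarchy)  # attribute injected via dd, visible on instances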
Modified: branches/parallel_profiles/yt/lagos/ParallelTools.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/ParallelTools.py (original)
+++ branches/parallel_profiles/yt/lagos/ParallelTools.py Fri Aug 8 15:41:08 2008
@@ -106,8 +106,55 @@
def _finalize_parallel(self):
pass
+ def _partition_hierarchy_2d(self, axis):
+ if not parallel_capable:
+ return False, self.hierarchy.grid_collection(self.center, self.hierarchy.grids)
+
+ cc = MPI.Compute_dims(MPI.COMM_WORLD.size, 2)
+ mi = MPI.COMM_WORLD.rank
+ cx, cy = na.unravel_index(mi, cc)
+ x = na.mgrid[0:1:(cc[0]+1)*1j][cx:cx+2]
+ y = na.mgrid[0:1:(cc[1]+1)*1j][cy:cy+2]
+
+ LE = na.zeros(3, dtype='float64')
+ RE = na.ones(3, dtype='float64')
+ LE[x_dict[axis]] = x[0] # It actually doesn't matter if this is x or y
+ RE[x_dict[axis]] = x[1]
+ LE[y_dict[axis]] = y[0]
+ RE[y_dict[axis]] = y[1]
+
+ return True, self.hierarchy.region(self.center, LE, RE)
+
+ def _mpi_catdict(self, data):
+ if not parallel_capable: return data
+ mylog.debug("Opening MPI Barrier on %s", MPI.COMM_WORLD.rank)
+ MPI.COMM_WORLD.Barrier()
+ if MPI.COMM_WORLD.rank == 0:
+ data = self.__mpi_recvdict(data)
+ else:
+ MPI.COMM_WORLD.Send(data, dest=0, tag=0)
+ mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
+ data = MPI.COMM_WORLD.Bcast(data, root=0)
+ MPI.COMM_WORLD.Barrier()
+ return data
+
+ def __mpi_recvdict(self, data):
+ # First we receive, then we make a new dict.
+ for i in range(1,MPI.COMM_WORLD.size):
+ buf = MPI.COMM_WORLD.Recv(source=i, tag=0)
+ for j in buf: data[j] = na.concatenate([data[j],buf[j]], axis=-1)
+ return data
+
+ def _should_i_write(self):
+ if not parallel_capable: return True
+ return (MPI.COMM_WORLD.rank == 0)
+
+ def _mpi_allsum(self, data):
+ MPI.COMM_WORLD.Barrier()
+ return MPI.COMM_WORLD.Allreduce(data, op=MPI.SUM)
+
def _get_dependencies(self, fields):
deps = []
for field in fields:
- deps += fieldInfo[field].get_dependencies().requested
+ deps += ensure_list(fieldInfo[field].get_dependencies().requested)
return list(set(deps))
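The slab arithmetic in _partition_hierarchy_2d can be checked without MPI at
all. A sketch that fakes the rank and size (x_dict/y_dict map a projection
axis to its two in-plane axes in lagos):

    import numpy as na

    size, rank = 4, 3            # stand-ins for COMM_WORLD.size / .rank
    cc = (2, 2)                  # MPI.Compute_dims(4, 2) -> balanced 2x2 grid
    cx, cy = na.unravel_index(rank, cc)          # this rank's cell: (1, 1)
    x = na.mgrid[0:1:(cc[0]+1)*1j][cx:cx+2]      # fenceposts, then our pair
    y = na.mgrid[0:1:(cc[1]+1)*1j][cy:cy+2]

    axis, x_dict, y_dict = 0, {0: 1}, {0: 2}     # e.g. project along x
    LE, RE = na.zeros(3), na.ones(3)             # full extent along axis
    LE[x_dict[axis]], RE[x_dict[axis]] = x[0], x[1]
    LE[y_dict[axis]], RE[y_dict[axis]] = y[0], y[1]
    # rank 3 owns the [0.5,1] x [0.5,1] quadrant of the image plane

Likewise, _mpi_catdict's receive loop reduces, on rank 0, to concatenating
each key along the last axis before the broadcast:

    chunks = [{'px': na.array([0.1, 0.2])},      # rank 0's own dict
              {'px': na.array([0.6, 0.7])}]      # as received from rank 1
    data = chunks[0]
    for buf in chunks[1:]:
        for j in buf:
            data[j] = na.concatenate([data[j], buf[j]], axis=-1)
    # data['px'] -> [0.1, 0.2, 0.6, 0.7]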
Modified: branches/parallel_profiles/yt/lagos/Profiles.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/Profiles.py (original)
+++ branches/parallel_profiles/yt/lagos/Profiles.py Fri Aug 8 15:41:08 2008
@@ -98,15 +98,11 @@
del self.__data, self.__weight_data, self.__used
def _finalize_parallel(self):
- from mpi4py import MPI # I am displeased by importing here
- MPI.COMM_WORLD.Barrier()
for key in self.__data:
- self.__data[key] = \
- MPI.COMM_WORLD.Allreduce(self.__data[key], op=MPI.SUM)
+ self.__data[key] = self._mpi_allsum(self.__data[key])
for key in self.__weight_data:
- self.__weight_data[key] = \
- MPI.COMM_WORLD.Allreduce(self.__weight_data[key], op=MPI.SUM)
- self.__used = MPI.COMM_WORLD.Allreduce(self.__used, op=MPI.SUM)
+ self.__weight_data[key] = self._mpi_allsum(self.__weight_data[key])
+ self.__used = self._mpi_allsum(self.__used)
def _unlazy_add_fields(self, fields, weight, accumulation):
for field in fields:
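For the profile side, the new _mpi_allsum helper is just a barrier plus an
element-wise sum-reduce of each bin array across ranks. A sketch of the same
pattern against today's mpi4py, whose buffer-based uppercase Allreduce takes
explicit send and receive buffers (unlike the 2008 API used above):

    # Run as: mpirun -np 2 python allsum.py
    from mpi4py import MPI
    import numpy as np

    comm = MPI.COMM_WORLD
    # Each rank has binned only its own grids; summing the bins across
    # ranks yields the profile of the full region on every rank.
    local = {"CellMassMsun": np.arange(8, dtype="float64") * comm.rank,
             "weight":       np.ones(8, dtype="float64")}
    comm.Barrier()
    for key in local:
        total = np.empty_like(local[key])
        comm.Allreduce(local[key], total, op=MPI.SUM)
        local[key] = total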