[Yt-svn] yt: Substantial speedups on DerivedQuantities run in parallel.
hg at spacepope.org
Sat Sep 25 12:56:53 PDT 2010
hg Repository: yt
details: yt/rev/d075b21170dd
changeset: 3413:d075b21170dd
user: Matthew Turk <matthewturk at gmail.com>
date: Sat Sep 25 12:56:27 2010 -0700
description:
Substantial speedups on DerivedQuantities run in parallel.
* Now we use an alltoall rather than turn-by-turn communication. This applies
  to anything using _mpi_catarray, which includes parallel slicing as well.
  (A standalone sketch of the size-exchange pattern follows the description.)
* Data sizes are no longer passed by pickling arrays; they are exchanged as
  raw int64 buffers.
* Derived quantities now preload their fields by default when run in parallel.
* Lazily-evaluated derived quantities no longer read the same data multiple
  times: a local cache inside the GridChildMaskWrapper holds each field for
  the duration of a single call (see the second sketch below).
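A standalone sketch of the communication pattern both modified routines now
share: each rank announces its chunk size as a raw int64 buffer via Alltoall
(no pickling), a cumulative sum of those sizes yields per-rank offsets, and a
single variable-length collective then moves the data. This is not part of the
changeset; the helper name cat_1d, the 1D/float64 restriction, and the use of
Allgatherv in place of yt's _alltoallv_array are assumptions for illustration
(mpi4py and NumPy assumed, run under mpirun).

import numpy as np
from mpi4py import MPI

def cat_1d(comm, local):
    # Return the concatenation of every rank's 1D float64 `local` array,
    # available on all ranks -- conceptually what _mpi_catarray provides.
    local = np.ascontiguousarray(local, dtype='float64')
    # Step 1: exchange chunk sizes as fixed-size integers rather than
    # pickled Python objects.  MPI.LONG matches int64 on the platforms
    # targeted here, mirroring the patch.
    outsizes = np.array([local.size] * comm.size, dtype='int64')
    sizes = np.empty(comm.size, dtype='int64')
    comm.Alltoall([outsizes, MPI.LONG], [sizes, MPI.LONG])
    # Step 2: a cumulative sum of the sizes gives each rank's offset into
    # the concatenated result.
    offsets = np.concatenate(([0], np.cumsum(sizes)[:-1]))
    # Step 3: one collective call gathers every chunk into place.
    recv = np.empty(int(sizes.sum()), dtype='float64')
    comm.Allgatherv([local, MPI.DOUBLE],
                    [recv, sizes.tolist(), offsets.tolist(), MPI.DOUBLE])
    return recv

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    # Each rank contributes a chunk of a different length.
    chunk = np.arange(comm.rank + 1, dtype='float64') + 10 * comm.rank
    print("%d %s" % (comm.rank, cat_1d(comm, chunk)))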
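And a second sketch, for the caching change: the per-grid wrapper remembers
every field it reads so that repeated data[field] accesses within one derived
quantity call hit memory instead of re-reading from disk. This is a generic
illustration, not the changeset's code; the class name FieldCacheWrapper and
the read_field callable are hypothetical stand-ins for GridChildMaskWrapper
and _get_data_from_grid.

class FieldCacheWrapper(object):
    # Proxy a grid, but remember every field read during one quantity call.
    def __init__(self, grid, read_field):
        self.grid = grid
        self.read_field = read_field   # hypothetical stand-in for
                                       # data_source._get_data_from_grid
        self.local_cache = {}          # field name -> array, per instance
    def __getattr__(self, attr):
        # Anything that is not a field access falls through to the grid.
        return getattr(self.grid, attr)
    def __getitem__(self, field):
        # The first access reads the (possibly preloaded, pop-once) buffer;
        # later accesses within the same quantity call come from the cache.
        if field not in self.local_cache:
            self.local_cache[field] = self.read_field(self.grid, field)
        return self.local_cache[field]

Because a fresh wrapper is built for each quantity evaluation, the cache lives
exactly as long as one call, which is what keeps preloaded fields from being
read a second time the slow way.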
diffstat:
yt/data_objects/data_containers.py | 24 ++++++-----
yt/data_objects/derived_quantities.py | 30 ++++++++++++--
yt/utilities/parallel_tools/parallel_analysis_interface.py | 27 ++++++++-----
3 files changed, 55 insertions(+), 26 deletions(-)
diffs (148 lines):
diff -r 2f3e0a0d802f -r d075b21170dd yt/data_objects/data_containers.py
--- a/yt/data_objects/data_containers.py Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/data_objects/data_containers.py Sat Sep 25 12:56:27 2010 -0700
@@ -764,20 +764,22 @@
             points.append(self._generate_grid_coords(grid))
         if len(points) == 0: points = None
         else: points = na.concatenate(points)
-        t = self._mpi_catarray(points)
-        self['px'] = t[:,0]
-        self['py'] = t[:,1]
-        self['pz'] = t[:,2]
-        self['pdx'] = t[:,3]
-        self['pdy'] = t[:,4]
-        self['pdz'] = t[:,3] # Does not matter!
+        # We have to transpose here so that _mpi_catarray works properly, as
+        # it and the alltoall assume the long axis is the last one.
+        t = self._mpi_catarray(points.transpose())
+        self['px'] = t[0,:]
+        self['py'] = t[1,:]
+        self['pz'] = t[2,:]
+        self['pdx'] = t[3,:]
+        self['pdy'] = t[4,:]
+        self['pdz'] = t[3,:] # Does not matter!
         # Now we set the *actual* coordinates
-        self[axis_names[x_dict[self.axis]]] = t[:,0]
-        self[axis_names[y_dict[self.axis]]] = t[:,1]
-        self[axis_names[self.axis]] = t[:,2]
+        self[axis_names[x_dict[self.axis]]] = t[0,:]
+        self[axis_names[y_dict[self.axis]]] = t[1,:]
+        self[axis_names[self.axis]] = t[2,:]
-        self.ActiveDimensions = (t.shape[0], 1, 1)
+        self.ActiveDimensions = (t.shape[1], 1, 1)
     def _get_list_of_grids(self):
         goodI = ((self.pf.h.grid_right_edge[:,self.axis] > self.coord)
diff -r 2f3e0a0d802f -r d075b21170dd yt/data_objects/derived_quantities.py
--- a/yt/data_objects/derived_quantities.py Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/data_objects/derived_quantities.py Sat Sep 25 12:56:27 2010 -0700
@@ -29,11 +29,12 @@
 from yt.funcs import *
+from yt.config import ytcfg
+from yt.data_objects.field_info_container import \
+    FieldDetector
 from yt.utilities.data_point_utilities import FindBindingEnergy
 from yt.utilities.parallel_tools.parallel_analysis_interface import \
     ParallelAnalysisInterface
-from yt.funcs import \
-    get_pbar, wraps
 __CUDA_BLOCK_SIZE = 256
@@ -43,10 +44,21 @@
     def __init__(self, grid, data_source):
         self.grid = grid
         self.data_source = data_source
+        # We have a local cache so that *within* a call to the DerivedQuantity
+        # function, we only read each field once. Otherwise, when preloading
+        # the field will be popped and removed and lost if the underlying data
+        # source's _get_data_from_grid method is wrapped by restore_state.
+        # This is common. So, if data[something] is accessed multiple times by
+        # a single quantity, the second time will re-read the data the slow
+        # way.
+        self.local_cache = {}
     def __getattr__(self, attr):
         return getattr(self.grid, attr)
     def __getitem__(self, item):
-        return self.data_source._get_data_from_grid(self.grid, item)
+        if item not in self.local_cache:
+            data = self.data_source._get_data_from_grid(self.grid, item)
+            self.local_cache[item] = data
+        return self.local_cache[item]
 class DerivedQuantity(ParallelAnalysisInterface):
     def __init__(self, collection, name, function,
@@ -64,7 +76,7 @@
     def __call__(self, *args, **kwargs):
         lazy_reader = kwargs.pop('lazy_reader', True)
-        preload = kwargs.pop('preload', False)
+        preload = kwargs.pop('preload', ytcfg.getboolean("yt","__parallel"))
         if preload:
             if not lazy_reader: mylog.debug("Turning on lazy_reader because of preload")
             lazy_reader = True
@@ -89,7 +101,15 @@
         return self.c_func(self._data_source, *self.retvals)
     def _finalize_parallel(self):
-        self.retvals = [na.array(self._mpi_catlist(my_list)) for my_list in self.retvals]
+        # Note that we do some fancy footwork here.
+        # _mpi_catarray and its affiliated alltoall function
+        # assume that the *long* axis is the last one. However,
+        # our long axis is the first one!
+        rv = []
+        for my_list in self.retvals:
+            data = na.array(my_list).transpose()
+            rv.append(self._mpi_catarray(data).transpose())
+        self.retvals = rv
     def _call_func_unlazy(self, args, kwargs):
         retval = self.func(self._data_source, *args, **kwargs)
diff -r 2f3e0a0d802f -r d075b21170dd yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py Sat Sep 25 12:56:27 2010 -0700
@@ -882,8 +882,13 @@
         field_keys.sort()
         size = data[field_keys[0]].shape[-1]
         # MPI_Scan is an inclusive scan
-        sizes = MPI.COMM_WORLD.alltoall( [size]*MPI.COMM_WORLD.size )
-        offsets = na.add.accumulate([0] + sizes)[:-1]
+        outsizes = na.array([size] * MPI.COMM_WORLD.size, dtype='int64')
+        sizes = outsizes.copy()
+        MPI.COMM_WORLD.Alltoall( [outsizes, MPI.LONG], [sizes, MPI.LONG] )
+        # This nested concatenate is to get the shapes to work out correctly;
+        # if we just add [0] to sizes, it will broadcast a summation, not a
+        # concatenation.
+        offsets = na.add.accumulate(na.concatenate([[0], sizes]))[:-1]
         arr_size = MPI.COMM_WORLD.allreduce(size, op=MPI.SUM)
         for key in field_keys:
             dd = data[key]
@@ -1299,14 +1304,16 @@
     @parallel_passthrough
     def _mpi_catarray(self, data):
-        self._barrier()
-        if MPI.COMM_WORLD.rank == 0:
-            data = self.__mpi_recvarrays(data)
-        else:
-            _send_array(data, dest=0, tag=0)
-        mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
-        data = _bcast_array(data, root=0)
-        self._barrier()
+        size = data.shape[-1]
+        outsizes = na.array([size] * MPI.COMM_WORLD.size, dtype='int64')
+        sizes = outsizes.copy()
+        MPI.COMM_WORLD.Alltoall( [outsizes, MPI.LONG], [sizes, MPI.LONG] )
+        # This nested concatenate is to get the shapes to work out correctly;
+        # if we just add [0] to sizes, it will broadcast a summation, not a
+        # concatenation.
+        offsets = na.add.accumulate(na.concatenate([[0], sizes]))[:-1]
+        arr_size = MPI.COMM_WORLD.allreduce(size, op=MPI.SUM)
+        data = _alltoallv_array(data, arr_size, offsets, sizes)
         return data
     @parallel_passthrough