[Yt-svn] yt: Substantial speedups on DerivedQuantities run in parallel.

hg at spacepope.org hg at spacepope.org
Sat Sep 25 12:56:53 PDT 2010


hg Repository: yt
details:   yt/rev/d075b21170dd
changeset: 3413:d075b21170dd
user:      Matthew Turk <matthewturk at gmail.com>
date:      Sat Sep 25 12:56:27 2010 -0700
description:
Substantial speedups on DerivedQuantities run in parallel.

 * Now we use an alltoall rather than turn-by-turn communication.  This applies
   to anything using _mpi_catarray, which includes parallel slicing as well.
 * No more pickling of arrays for the passing of data sizes.
 * Derived quantities now preload by default if run in parallel.
 * Issues with (lazily-evaluated) derived quantities reading data multiple
   times have been fixed by using a local cache inside the
   GridChildMaskWrapper.

diffstat:

 yt/data_objects/data_containers.py                         |  24 ++++++-----
 yt/data_objects/derived_quantities.py                      |  30 ++++++++++++--
 yt/utilities/parallel_tools/parallel_analysis_interface.py |  27 ++++++++-----
 3 files changed, 55 insertions(+), 26 deletions(-)

diffs (148 lines):

diff -r 2f3e0a0d802f -r d075b21170dd yt/data_objects/data_containers.py
--- a/yt/data_objects/data_containers.py	Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/data_objects/data_containers.py	Sat Sep 25 12:56:27 2010 -0700
@@ -764,20 +764,22 @@
             points.append(self._generate_grid_coords(grid))
         if len(points) == 0: points = None
         else: points = na.concatenate(points)
-        t = self._mpi_catarray(points)
-        self['px'] = t[:,0]
-        self['py'] = t[:,1]
-        self['pz'] = t[:,2]
-        self['pdx'] = t[:,3]
-        self['pdy'] = t[:,4]
-        self['pdz'] = t[:,3] # Does not matter!
+        # We have to transpose here so that _mpi_catarray works properly, as
+        # it and the alltoall assume the long axis is the last one.
+        t = self._mpi_catarray(points.transpose())
+        self['px'] = t[0,:]
+        self['py'] = t[1,:]
+        self['pz'] = t[2,:]
+        self['pdx'] = t[3,:]
+        self['pdy'] = t[4,:]
+        self['pdz'] = t[3,:] # Does not matter!
 
         # Now we set the *actual* coordinates
-        self[axis_names[x_dict[self.axis]]] = t[:,0]
-        self[axis_names[y_dict[self.axis]]] = t[:,1]
-        self[axis_names[self.axis]] = t[:,2]
+        self[axis_names[x_dict[self.axis]]] = t[0,:]
+        self[axis_names[y_dict[self.axis]]] = t[1,:]
+        self[axis_names[self.axis]] = t[2,:]
 
-        self.ActiveDimensions = (t.shape[0], 1, 1)
+        self.ActiveDimensions = (t.shape[1], 1, 1)
 
     def _get_list_of_grids(self):
         goodI = ((self.pf.h.grid_right_edge[:,self.axis] > self.coord)
diff -r 2f3e0a0d802f -r d075b21170dd yt/data_objects/derived_quantities.py
--- a/yt/data_objects/derived_quantities.py	Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/data_objects/derived_quantities.py	Sat Sep 25 12:56:27 2010 -0700
@@ -29,11 +29,12 @@
 
 from yt.funcs import *
 
+from yt.config import ytcfg
+from yt.data_objects.field_info_container import \
+    FieldDetector
 from yt.utilities.data_point_utilities import FindBindingEnergy
 from yt.utilities.parallel_tools.parallel_analysis_interface import \
     ParallelAnalysisInterface
-from yt.funcs import \
-    get_pbar, wraps
 
 __CUDA_BLOCK_SIZE = 256
 
@@ -43,10 +44,21 @@
     def __init__(self, grid, data_source):
         self.grid = grid
         self.data_source = data_source
+        # We have a local cache so that *within* a call to the DerivedQuantity
+        # function, we only read each field once.  Otherwise, when preloading
+        # the field will be popped and removed and lost if the underlying data
+        # source's _get_data_from_grid method is wrapped by restore_state.
+        # This is common.  So, if data[something] is accessed multiple times by
+        # a single quantity, the second time will re-read the data the slow
+        # way.
+        self.local_cache = {}
     def __getattr__(self, attr):
         return getattr(self.grid, attr)
     def __getitem__(self, item):
-        return self.data_source._get_data_from_grid(self.grid, item)
+        if item not in self.local_cache:
+            data = self.data_source._get_data_from_grid(self.grid, item)
+            self.local_cache[item] = data
+        return self.local_cache[item]
 
 class DerivedQuantity(ParallelAnalysisInterface):
     def __init__(self, collection, name, function,
@@ -64,7 +76,7 @@
 
     def __call__(self, *args, **kwargs):
         lazy_reader = kwargs.pop('lazy_reader', True)
-        preload = kwargs.pop('preload', False)
+        preload = kwargs.pop('preload', ytcfg.getboolean("yt","__parallel"))
         if preload:
             if not lazy_reader: mylog.debug("Turning on lazy_reader because of preload")
             lazy_reader = True
@@ -89,7 +101,15 @@
         return self.c_func(self._data_source, *self.retvals)
 
     def _finalize_parallel(self):
-        self.retvals = [na.array(self._mpi_catlist(my_list)) for my_list in self.retvals]
+        # Note that we do some fancy footwork here.
+        # _mpi_catarray and its affiliated alltoall function
+        # assume that the *long* axis is the last one.  However,
+        # our long axis is the first one!
+        rv = []
+        for my_list in self.retvals:
+            data = na.array(my_list).transpose()
+            rv.append(self._mpi_catarray(data).transpose())
+        self.retvals = rv
         
     def _call_func_unlazy(self, args, kwargs):
         retval = self.func(self._data_source, *args, **kwargs)
diff -r 2f3e0a0d802f -r d075b21170dd yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py	Thu Sep 23 22:35:15 2010 -0700
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py	Sat Sep 25 12:56:27 2010 -0700
@@ -882,8 +882,13 @@
         field_keys.sort()
         size = data[field_keys[0]].shape[-1]
         # MPI_Scan is an inclusive scan
-        sizes = MPI.COMM_WORLD.alltoall( [size]*MPI.COMM_WORLD.size )
-        offsets = na.add.accumulate([0] + sizes)[:-1]
+        outsizes = na.array([size] * MPI.COMM_WORLD.size, dtype='int64')
+        sizes = outsizes.copy()
+        MPI.COMM_WORLD.Alltoall( [outsizes, MPI.LONG], [sizes, MPI.LONG] )
+        # This nested concatenate is to get the shapes to work out correctly;
+        # if we just add [0] to sizes, it will broadcast a summation, not a
+        # concatenation.
+        offsets = na.add.accumulate(na.concatenate([[0], sizes]))[:-1]
         arr_size = MPI.COMM_WORLD.allreduce(size, op=MPI.SUM)
         for key in field_keys:
             dd = data[key]
@@ -1299,14 +1304,16 @@
 
     @parallel_passthrough
     def _mpi_catarray(self, data):
-        self._barrier()
-        if MPI.COMM_WORLD.rank == 0:
-            data = self.__mpi_recvarrays(data)
-        else:
-            _send_array(data, dest=0, tag=0)
-        mylog.debug("Opening MPI Broadcast on %s", MPI.COMM_WORLD.rank)
-        data = _bcast_array(data, root=0)
-        self._barrier()
+        size = data.shape[-1]
+        outsizes = na.array([size] * MPI.COMM_WORLD.size, dtype='int64')
+        sizes = outsizes.copy()
+        MPI.COMM_WORLD.Alltoall( [outsizes, MPI.LONG], [sizes, MPI.LONG] )
+        # This nested concatenate is to get the shapes to work out correctly;
+        # if we just add [0] to sizes, it will broadcast a summation, not a
+        # concatenation.
+        offsets = na.add.accumulate(na.concatenate([[0], sizes]))[:-1]
+        arr_size = MPI.COMM_WORLD.allreduce(size, op=MPI.SUM)
+        data = _alltoallv_array(data, arr_size, offsets, sizes)
         return data
 
     @parallel_passthrough



More information about the yt-svn mailing list