[Yt-svn] yt-commit r684 - in branches/parallel_profiles/yt: . lagos

mturk@wrangler.dreamhost.com
Fri Jul 18 11:05:39 PDT 2008


Author: mturk
Date: Fri Jul 18 11:05:39 2008
New Revision: 684
URL: http://yt.spacepope.org/changeset/684

Log:
Initial, mostly non-functional import of some code I've sketched out.



Added:
   branches/parallel_profiles/yt/parallel_tools.py
Modified:
   branches/parallel_profiles/yt/lagos/Profiles.py
   branches/parallel_profiles/yt/lagos/__init__.py

Modified: branches/parallel_profiles/yt/lagos/Profiles.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/Profiles.py	(original)
+++ branches/parallel_profiles/yt/lagos/Profiles.py	Fri Jul 18 11:05:39 2008
@@ -48,10 +48,11 @@
 
 # Note we do not inherit from EnzoData.
 # We could, but I think we instead want to deal with the root datasource.
-class BinnedProfile:
+class BinnedProfile(ParallelAnalysisInterface):
     def __init__(self, data_source, lazy_reader):
         self._data_source = data_source
         self._data = {}
+        self._pdata = {}
         self._lazy_reader = lazy_reader
 
     def _lazy_add_fields(self, fields, weight, accumulation):
@@ -62,7 +63,7 @@
             weight_data[field] = self._get_empty_field()
         used = self._get_empty_field().astype('bool')
         pbar = get_pbar('Binning grids', len(self._data_source._grids))
-        for gi,grid in enumerate(self._data_source._grids):
+        for gi,grid in enumerate(self._get_grids(data, weight_data, used)):
             pbar.update(gi)
             args = self._get_bins(grid, check_cut=True)
             if not args: # No bins returned for this grid, so forget it!
@@ -77,6 +78,9 @@
             grid.clear_data()
         pbar.finish()
         ub = na.where(used)
+        self._finalize(data, weight_data, fields, weight, ub, used)
+
+    def _finalize(self, data, weight_data, fields, weight, ub, used):
         for field in fields:
             if weight: # Now, at the end, we divide out.
                 data[field][ub] /= weight_data[field][ub]
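
The refactoring above splits the per-grid binning loop from the final weighting step so that, in a parallel run, the partially binned data from each processor could be combined before the division. That combination step is not part of this commit; a minimal sketch of what it might look like with mpi4py, assuming the per-field histograms returned by _get_empty_field() are NumPy float arrays, is:

    # Hypothetical sketch, not in this commit: sum the per-processor
    # partial histograms elementwise before dividing out the weights.
    from mpi4py import MPI
    import numpy as na

    def _combine_partial_profiles(data, weight_data, fields):
        for field in fields:
            combined = na.zeros_like(data[field])
            MPI.COMM_WORLD.Allreduce(data[field], combined, op=MPI.SUM)
            data[field] = combined
            combined_w = na.zeros_like(weight_data[field])
            MPI.COMM_WORLD.Allreduce(weight_data[field], combined_w, op=MPI.SUM)
            weight_data[field] = combined_w

The boolean 'used' field would need a similar reduction, with a logical OR rather than a sum.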

Modified: branches/parallel_profiles/yt/lagos/__init__.py
==============================================================================
--- branches/parallel_profiles/yt/lagos/__init__.py	(original)
+++ branches/parallel_profiles/yt/lagos/__init__.py	Fri Jul 18 11:05:39 2008
@@ -28,6 +28,7 @@
 
 from yt.config import ytcfg
 from yt.logger import lagosLogger as mylog
+from yt.parallel_tools import *
 
 try:
     from pyhdf_np import SD # NumPy
@@ -63,8 +64,7 @@
         pass
 
 if ytcfg.getboolean("lagos","usefortran"):
-    pass
-    #import EnzoFortranRoutines
+    import EnzoFortranRoutines
 
 # Now we import all the subfiles
 

Added: branches/parallel_profiles/yt/parallel_tools.py
==============================================================================
--- (empty file)
+++ branches/parallel_profiles/yt/parallel_tools.py	Fri Jul 18 11:05:39 2008
@@ -0,0 +1,98 @@
+"""
+Parallel data mapping techniques for yt
+
+@author: U{Matthew Turk<http://www.stanford.edu/~mturk/>}
+@organization: U{KIPAC<http://www-group.slac.stanford.edu/KIPAC/>}
+@contact: U{mturk@slac.stanford.edu<mailto:mturk@slac.stanford.edu>}
+@license:
+  Copyright (C) 2008 Matthew Turk.  All Rights Reserved.
+
+  This file is part of yt.
+
+  yt is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 3 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import itertools
+from yt.arraytypes import *
+from yt.config import ytcfg
+
+try:
+    from mpi4py import MPI
+    parallel_capable = True
+except ImportError:
+    parallel_capable = False
+
+class GridIterator(object):
+    def __init__(self, pobj):
+        self.pobj = pobj
+        if hasattr(pobj, '_grids') and pobj._grids is not None:
+            self._grids = pobj._grids
+        else:
+            self._grids = pobj._data_source._grids
+        self.ng = len(self._grids)
+
+    def __iter__(self):
+        self.pos = 0
+        return self
+
+    def next(self):
+        # We track the position manually in case
+        # something else asks for self.pos
+        if self.pos < len(self._grids):
+            self.pos += 1
+            return self._grids[self.pos - 1]
+        raise StopIteration
+
+class ParallelGridIterator(GridIterator):
+    """
+    This takes an object, pobj, that implements ParallelAnalysisInterface,
+    and distributes its grids round-robin across the available MPI processes.
+    """
+    def __init__(self, pobj):
+        GridIterator.__init__(self, pobj)
+        self._offset = MPI.COMM_WORLD.rank
+        self._skip = MPI.COMM_WORLD.size
+        # Note that we're doing this in advance, and with a simple means
+        # of choosing them; more advanced methods will be explored later.
+        self.my_grid_ids = na.mgrid[self._offset:self.ng:self._skip]
+        
+    def __iter__(self):
+        self.pobj._initialize_parallel()
+        self.pos = 0
+        return self
+
+    def next(self):
+        if self.pos < len(self.my_grid_ids):
+            gid = self.my_grid_ids[self.pos]
+            self.pos += 1
+            return self._grids[gid]
+        self.pobj._finalize_parallel()
+        raise StopIteration
+
+class ParallelAnalysisInterface(object):
+    _grids = None
+
+    def _get_grids(self, *args, **kwargs):
+        # Extra arguments are accepted so that callers (e.g. Profiles)
+        # can pass per-field data through; they are unused for now.
+        if parallel_capable and \
+           ytcfg.getboolean("yt","parallel"):
+            return ParallelGridIterator(self)
+        return GridIterator(self)
+
+    def _initialize_parallel(self):
+        pass
+
+    def _finalize_parallel(self):
+        pass


