[Yt-svn] commit/yt: 3 new changesets

Bitbucket commits-noreply at bitbucket.org
Wed Nov 9 06:29:07 PST 2011


3 new commits in yt:


https://bitbucket.org/yt_analysis/yt/changeset/87d3684c5448/
changeset:   87d3684c5448
branch:      yt
user:        MatthewTurk
date:        2011-11-08 21:32:18
summary:     Fixing tabs => spaces
affected #:  1 file

diff -r f376abaf6942d5050c42dc2a49f258fb60566ae5 -r 87d3684c54487d051b3cbadd1e7d9d280d7ca7d8 yt/visualization/volume_rendering/camera.py
--- a/yt/visualization/volume_rendering/camera.py
+++ b/yt/visualization/volume_rendering/camera.py
@@ -582,8 +582,8 @@
                  transfer_function = None, fields = None,
                  sub_samples = 5, log_fields = None, volume = None,
                  pf = None, use_kd=True, no_ghost=False):
-	ParallelAnalysisInterface.__init__(self)
-	if pf is not None: self.pf = pf
+        ParallelAnalysisInterface.__init__(self)
+        if pf is not None: self.pf = pf
         self.center = na.array(center, dtype='float64')
         self.radius = radius
         self.nside = nside
@@ -652,8 +652,8 @@
                  sub_samples = 5, log_fields = None, volume = None,
                  pf = None, use_kd=True, no_ghost=False,
                  rays_per_cell = 0.1, max_nside = 8192):
-	ParallelAnalysisInterface.__init__(self)
-	if pf is not None: self.pf = pf
+        ParallelAnalysisInterface.__init__(self)
+        if pf is not None: self.pf = pf
         self.center = na.array(center, dtype='float64')
         self.radius = radius
         self.use_kd = use_kd
@@ -706,8 +706,8 @@
 
 class StereoPairCamera(Camera):
     def __init__(self, original_camera, relative_separation = 0.005):
-	ParallelAnalysisInterface.__init__(self)
-	self.original_camera = original_camera
+        ParallelAnalysisInterface.__init__(self)
+        self.original_camera = original_camera
         self.relative_separation = relative_separation
 
     def split(self):



https://bitbucket.org/yt_analysis/yt/changeset/6de131ab4dbe/
changeset:   6de131ab4dbe
branch:      yt
user:        MatthewTurk
date:        2011-11-08 21:50:14
summary:     Implementing list/cat operation in par_combine_obj.
affected #:  1 file

diff -r 87d3684c54487d051b3cbadd1e7d9d280d7ca7d8 -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -496,9 +496,16 @@
             return data
         elif datatype == "list" and op == "cat":
             if self.comm.rank == 0:
-                data = self.__mpi_recvlist(data)
+                storage = {0:ensure_list(data)}
+                for i in xrange(self.comm.size - 1):
+                    st = MPI.Status()
+                    d = self.comm.recv(source = MPI.ANY_SOURCE, status = st)
+                    storage[st.source] = d
+                data = []
+                for i in xrange(self.comm.size):
+                    data.extend(storage.pop(i))
             else:
-                self.comm.send(data, dest=0, tag=0)
+                self.comm.send(data, dest=0)
             mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
             data = self.comm.bcast(data, root=0)
             return data



https://bitbucket.org/yt_analysis/yt/changeset/1996bb83bd9c/
changeset:   1996bb83bd9c
branch:      yt
user:        MatthewTurk
date:        2011-11-09 15:27:59
summary:     Merged
affected #:  3 files

diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/mods.py
--- a/yt/mods.py
+++ b/yt/mods.py
@@ -105,11 +105,15 @@
     ColorTransferFunction, PlanckTransferFunction, ProjectionTransferFunction, \
     HomogenizedVolume, Camera, off_axis_projection
 
+from yt.utilities.parallel_tools.parallel_analysis_interface import \
+    parallel_objects
+
 for name, cls in callback_registry.items():
     exec("%s = cls" % name)
 
 from yt.convenience import all_pfs, max_spheres, load, projload
 
+
 # We load plugins.  Keep in mind, this can be fairly dangerous -
 # the primary purpose is to allow people to have a set of functions
 # that get used every time that they don't have to *define* every time.


diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -49,7 +49,7 @@
     from mpi4py import MPI
     parallel_capable = (MPI.COMM_WORLD.size > 1)
     if parallel_capable:
-        mylog.info("Parallel computation enabled: %s / %s",
+        mylog.info("Global parallel computation enabled: %s / %s",
                    MPI.COMM_WORLD.rank, MPI.COMM_WORLD.size)
         ytcfg["yt","__global_parallel_rank"] = str(MPI.COMM_WORLD.rank)
         ytcfg["yt","__global_parallel_size"] = str(MPI.COMM_WORLD.size)
@@ -61,8 +61,6 @@
         #ytcfg["yt","StoreParameterFiles"] = "False"
         # Now let's make sure we have the right options set.
         if MPI.COMM_WORLD.rank > 0:
-            if ytcfg.getboolean("yt","serialize"):
-                ytcfg["yt","onlydeserialize"] = "True"
             if ytcfg.getboolean("yt","LogFile"):
                 ytcfg["yt","LogFile"] = "False"
                 yt.utilities.logger.disable_file_logging()
@@ -150,8 +148,10 @@
     def __init__(self, pobj, just_list = False, attr='_grids',
                  round_robin=False):
         ObjectIterator.__init__(self, pobj, just_list, attr=attr)
-        self._offset = MPI.COMM_WORLD.rank
-        self._skip = MPI.COMM_WORLD.size
+        # pobj has to be a ParallelAnalysisInterface, so it must have a .comm
+        # object.
+        self._offset = pobj.comm.rank
+        self._skip = pobj.comm.size
         # Note that we're doing this in advance, and with a simple means
         # of choosing them; more advanced methods will be explored later.
         if self._use_all:
@@ -182,11 +182,15 @@
         retval = None
         if self._processing or not self._distributed:
             return func(self, *args, **kwargs)
-        if self._owner == MPI.COMM_WORLD.rank:
+        comm = _get_comm((self,))
+        if self._owner == comm.rank:
             self._processing = True
             retval = func(self, *args, **kwargs)
             self._processing = False
-        retval = MPI.COMM_WORLD.bcast(retval, root=self._owner)
+        # To be sure we utilize the root= kwarg, we manually access the .comm
+        # attribute, which must be an instance of MPI.Intracomm, and call bcast
+        # on that.
+        retval = comm.comm.bcast(retval, root=self._owner)
         #MPI.COMM_WORLD.Barrier()
         return retval
     return single_proc_results
@@ -220,6 +224,13 @@
         return func(self, data, **kwargs)
     return passage
 
+def _get_comm(args):
+    if len(args) > 0 and hasattr(args[0], "comm"):
+        comm = args[0].comm
+    else:
+        comm = communication_system.communicators[-1]
+    return comm
+
 def parallel_blocking_call(func):
     """
     This decorator blocks on entry and exit of a function.
@@ -227,10 +238,11 @@
     @wraps(func)
     def barrierize(*args, **kwargs):
         mylog.debug("Entering barrier before %s", func.func_name)
-        MPI.COMM_WORLD.Barrier()
+        comm = _get_comm(args)
+        comm.barrier()
         retval = func(*args, **kwargs)
         mylog.debug("Entering barrier after %s", func.func_name)
-        MPI.COMM_WORLD.Barrier()
+        comm.barrier()
         return retval
     if parallel_capable:
         return barrierize
@@ -244,10 +256,11 @@
     """
     @wraps(f1)
     def in_order(*args, **kwargs):
-        if MPI.COMM_WORLD.rank == 0:
+        comm = _get_comm(args)
+        if comm.rank == 0:
             f1(*args, **kwargs)
-        MPI.COMM_WORLD.Barrier()
-        if MPI.COMM_WORLD.rank != 0:
+        comm.barrier()
+        if comm.rank != 0:
             f2(*args, **kwargs)
     if not parallel_capable: return f1
     return in_order
@@ -259,7 +272,8 @@
     """
     @wraps(func)
     def root_only(*args, **kwargs):
-        if MPI.COMM_WORLD.rank == 0:
+        comm = _get_comm(args)
+        if comm.rank == 0:
             try:
                 func(*args, **kwargs)
                 all_clear = 1
@@ -268,8 +282,7 @@
                 all_clear = 0
         else:
             all_clear = None
-        #MPI.COMM_WORLD.Barrier()
-        all_clear = MPI.COMM_WORLD.bcast(all_clear, root=0)
+        all_clear = comm.mpi_bcast_pickled(all_clear)
         if not all_clear: raise RuntimeError
     if parallel_capable: return root_only
     return func
@@ -334,6 +347,10 @@
     if not parallel_capable: raise RuntimeError
     my_communicator = communication_system.communicators[-1]
     my_size = my_communicator.size
+    if njobs > my_size:
+        mylog.error("You have asked for %s jobs, but you only have %s processors.",
+            njobs, my_size)
+        raise RuntimeError
     my_rank = my_communicator.rank
     all_new_comms = na.array_split(na.arange(my_size), njobs)
     for i,comm_set in enumerate(all_new_comms):
@@ -367,31 +384,29 @@
             self.communicators.append(Communicator(MPI.COMM_WORLD))
         else:
             self.communicators.append(Communicator(None))
-    def push(self, size=None, ranks=None):
-        raise NotImplementedError
-        if size is None:
-            size = len(available_ranks)
-        if len(available_ranks) < size:
-            raise RuntimeError
-        if ranks is None:
-            ranks = [available_ranks.pop() for i in range(size)]
-        
-        group = MPI.COMM_WORLD.Group.Incl(ranks)
-        new_comm = MPI.COMM_WORLD.Create(group)
-        self.communicators.append(Communicator(new_comm))
-        return new_comm
+
+    def push(self, new_comm):
+        if not isinstance(new_comm, Communicator):
+            new_comm = Communicator(new_comm)
+        self.communicators.append(new_comm)
+        self._update_parallel_state(new_comm)
 
     def push_with_ids(self, ids):
         group = self.communicators[-1].comm.Get_group().Incl(ids)
         new_comm = self.communicators[-1].comm.Create(group)
+        self.push(new_comm)
+        return new_comm
+
+    def _update_parallel_state(self, new_comm):
         from yt.config import ytcfg
         ytcfg["yt","__topcomm_parallel_size"] = str(new_comm.size)
         ytcfg["yt","__topcomm_parallel_rank"] = str(new_comm.rank)
-        self.communicators.append(Communicator(new_comm))
-        return new_comm
+        if MPI.COMM_WORLD.rank > 0 and ytcfg.getboolean("yt","serialize"):
+            ytcfg["yt","onlydeserialize"] = "True"
 
     def pop(self):
         self.communicators.pop()
+        self._update_parallel_state(self.communicators[-1])
 
 class Communicator(object):
     comm = None
@@ -495,19 +510,12 @@
             data = self.alltoallv_array(data, arr_size, offsets, sizes)
             return data
         elif datatype == "list" and op == "cat":
-            if self.comm.rank == 0:
-                storage = {0:ensure_list(data)}
-                for i in xrange(self.comm.size - 1):
-                    st = MPI.Status()
-                    d = self.comm.recv(source = MPI.ANY_SOURCE, status = st)
-                    storage[st.source] = d
-                data = []
-                for i in xrange(self.comm.size):
-                    data.extend(storage.pop(i))
-            else:
-                self.comm.send(data, dest=0)
-            mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
-            data = self.comm.bcast(data, root=0)
+            recv_data = self.comm.allgather(data)
+            # Now flatten into a single list, since this 
+            # returns us a list of lists.
+            data = []
+            while recv_data:
+                data.extend(recv_data.pop(0))
             return data
         raise NotImplementedError
 


diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/utilities/rpdb.py
--- a/yt/utilities/rpdb.py
+++ b/yt/utilities/rpdb.py
@@ -66,6 +66,10 @@
     server.server_close()
     if size > 1:
         from mpi4py import MPI
+        # This COMM_WORLD is okay.  We want to barrierize here, while waiting
+        # for shutdown from the rest of the parallel group.  If you are running
+        # with --rpdb it is assumed you know what you are doing and you won't
+        # let this get out of hand.
         MPI.COMM_WORLD.Barrier()
 
 class pdb_handler(object):

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because the commit notification service is enabled for this
repository and your address is listed as a recipient.



More information about the yt-svn mailing list