[Yt-svn] commit/yt: 5 new changesets

Bitbucket commits-noreply at bitbucket.org
Tue Nov 8 14:48:17 PST 2011


5 new commits in yt:


https://bitbucket.org/yt_analysis/yt/changeset/397c5cdd41d7/
changeset:   397c5cdd41d7
branch:      yt
user:        MatthewTurk
date:        2011-11-08 16:19:34
summary:     First pass at removing all remaining unnecessary COMM_WORLD usages.
affected #:  2 files

diff -r f376abaf6942d5050c42dc2a49f258fb60566ae5 -r 397c5cdd41d722f47494415c465f0dff617d83f8 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -49,7 +49,7 @@
     from mpi4py import MPI
     parallel_capable = (MPI.COMM_WORLD.size > 1)
     if parallel_capable:
-        mylog.info("Parallel computation enabled: %s / %s",
+        mylog.info("Global parallel computation enabled: %s / %s",
                    MPI.COMM_WORLD.rank, MPI.COMM_WORLD.size)
         ytcfg["yt","__global_parallel_rank"] = str(MPI.COMM_WORLD.rank)
         ytcfg["yt","__global_parallel_size"] = str(MPI.COMM_WORLD.size)
@@ -61,8 +61,6 @@
         #ytcfg["yt","StoreParameterFiles"] = "False"
         # Now let's make sure we have the right options set.
         if MPI.COMM_WORLD.rank > 0:
-            if ytcfg.getboolean("yt","serialize"):
-                ytcfg["yt","onlydeserialize"] = "True"
             if ytcfg.getboolean("yt","LogFile"):
                 ytcfg["yt","LogFile"] = "False"
                 yt.utilities.logger.disable_file_logging()
@@ -150,8 +148,10 @@
     def __init__(self, pobj, just_list = False, attr='_grids',
                  round_robin=False):
         ObjectIterator.__init__(self, pobj, just_list, attr=attr)
-        self._offset = MPI.COMM_WORLD.rank
-        self._skip = MPI.COMM_WORLD.size
+        # pobj has to be a ParallelAnalysisInterface, so it must have a .comm
+        # object.
+        self._offset = pobj.comm.rank
+        self._skip = pobj.comm.size
         # Note that we're doing this in advance, and with a simple means
         # of choosing them; more advanced methods will be explored later.
         if self._use_all:
@@ -182,11 +182,14 @@
         retval = None
         if self._processing or not self._distributed:
             return func(self, *args, **kwargs)
-        if self._owner == MPI.COMM_WORLD.rank:
+        if self._owner == self.comm.rank:
             self._processing = True
             retval = func(self, *args, **kwargs)
             self._processing = False
-        retval = MPI.COMM_WORLD.bcast(retval, root=self._owner)
+        # To be sure we utilize the root= kwarg, we manually access the .comm
+        # attribute, which must be an instance of MPI.Intracomm, and call bcast
+        # on that.
+        retval = self.comm.comm.bcast(retval, root=self._owner)
         #MPI.COMM_WORLD.Barrier()
         return retval
     return single_proc_results
@@ -220,6 +223,12 @@
         return func(self, data, **kwargs)
     return passage
 
+def _get_comm(args):
+    if len(args) > 0 and hasattr(args[0], "comm"):
+        comm = args[0].comm
+    else:
+        comm = communication_system.communicators[-1]
+
 def parallel_blocking_call(func):
     """
     This decorator blocks on entry and exit of a function.
@@ -227,10 +236,11 @@
     @wraps(func)
     def barrierize(*args, **kwargs):
         mylog.debug("Entering barrier before %s", func.func_name)
-        MPI.COMM_WORLD.Barrier()
+        comm = _get_comm(args)
+        comm.barrier()
         retval = func(*args, **kwargs)
         mylog.debug("Entering barrier after %s", func.func_name)
-        MPI.COMM_WORLD.Barrier()
+        comm.barrier()
         return retval
     if parallel_capable:
         return barrierize
@@ -244,10 +254,11 @@
     """
     @wraps(f1)
     def in_order(*args, **kwargs):
-        if MPI.COMM_WORLD.rank == 0:
+        comm = _get_comm(args)
+        if comm.rank == 0:
             f1(*args, **kwargs)
-        MPI.COMM_WORLD.Barrier()
-        if MPI.COMM_WORLD.rank != 0:
+        comm.barrier()
+        if comm.rank != 0:
             f2(*args, **kwargs)
     if not parallel_capable: return f1
     return in_order
@@ -259,7 +270,8 @@
     """
     @wraps(func)
     def root_only(*args, **kwargs):
-        if MPI.COMM_WORLD.rank == 0:
+        comm = _get_comm(args)
+        if comm.rank == 0:
             try:
                 func(*args, **kwargs)
                 all_clear = 1
@@ -268,8 +280,7 @@
                 all_clear = 0
         else:
             all_clear = None
-        #MPI.COMM_WORLD.Barrier()
-        all_clear = MPI.COMM_WORLD.bcast(all_clear, root=0)
+        all_clear = comm.mpi_bcast_pickled(all_clear, root=0)
         if not all_clear: raise RuntimeError
     if parallel_capable: return root_only
     return func
@@ -367,31 +378,29 @@
             self.communicators.append(Communicator(MPI.COMM_WORLD))
         else:
             self.communicators.append(Communicator(None))
-    def push(self, size=None, ranks=None):
-        raise NotImplementedError
-        if size is None:
-            size = len(available_ranks)
-        if len(available_ranks) < size:
-            raise RuntimeError
-        if ranks is None:
-            ranks = [available_ranks.pop() for i in range(size)]
-        
-        group = MPI.COMM_WORLD.Group.Incl(ranks)
-        new_comm = MPI.COMM_WORLD.Create(group)
-        self.communicators.append(Communicator(new_comm))
-        return new_comm
+
+    def push(self, new_comm):
+        if not isinstance(new_comm, Communicator):
+            new_comm = Communicator(new_comm)
+        self.communicators.append(new_comm)
+        self._update_parallel_state(new_comm)
 
     def push_with_ids(self, ids):
         group = self.communicators[-1].comm.Get_group().Incl(ids)
         new_comm = self.communicators[-1].comm.Create(group)
+        self.push(new_comm)
+        return new_comm
+
+    def _update_parallel_state(self, new_comm):
         from yt.config import ytcfg
         ytcfg["yt","__topcomm_parallel_size"] = str(new_comm.size)
         ytcfg["yt","__topcomm_parallel_rank"] = str(new_comm.rank)
-        self.communicators.append(Communicator(new_comm))
-        return new_comm
+        if MPI.COMM_WORLD.rank > 0 and ytcfg.getboolean("yt","serialize"):
+            ytcfg["yt","onlydeserialize"] = "True"
 
     def pop(self):
         self.communicators.pop()
+        self._update_parallel_state(self.communicators[-1])
 
 class Communicator(object):
     comm = None


diff -r f376abaf6942d5050c42dc2a49f258fb60566ae5 -r 397c5cdd41d722f47494415c465f0dff617d83f8 yt/utilities/rpdb.py
--- a/yt/utilities/rpdb.py
+++ b/yt/utilities/rpdb.py
@@ -66,6 +66,10 @@
     server.server_close()
     if size > 1:
         from mpi4py import MPI
+        # This COMM_WORLD is okay.  We want to barrierize here, while waiting
+        # for shutdown from the rest of the parallel group.  If you are running
+        # with --rpdb it is assumed you know what you are doing and you won't
+        # let this get out of hand.
         MPI.COMM_WORLD.Barrier()
 
 class pdb_handler(object):
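
For context, a minimal sketch of the communicator-stack pattern these changes
move toward, using the push_with_ids/pop methods shown above (the rank ids are
illustrative, and this assumes the module-level communication_system instance
referenced in the diff):

    from yt.utilities.parallel_tools.parallel_analysis_interface import \
        communication_system

    # Carve ranks 0 and 1 out of the current communicator; the subgroup
    # becomes the innermost (active) communicator on the stack.
    new_comm = communication_system.push_with_ids([0, 1])
    try:
        pass  # parallel-decorated calls in here resolve communicators[-1]
    finally:
        # Restore the enclosing communicator; pop() also re-syncs the
        # __topcomm_* config values via _update_parallel_state.
        communication_system.pop()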



https://bitbucket.org/yt_analysis/yt/changeset/ff8eddc95c0a/
changeset:   ff8eddc95c0a
branch:      yt
user:        MatthewTurk
date:        2011-11-08 16:35:37
summary:     A few more fixes, and an explicit import of parallel_objects into yt.mods.
affected #:  2 files

diff -r 397c5cdd41d722f47494415c465f0dff617d83f8 -r ff8eddc95c0abe6d72835397b9bad72d3c2afb2f yt/mods.py
--- a/yt/mods.py
+++ b/yt/mods.py
@@ -105,11 +105,15 @@
     ColorTransferFunction, PlanckTransferFunction, ProjectionTransferFunction, \
     HomogenizedVolume, Camera, off_axis_projection
 
+from yt.utilities.parallel_tools.parallel_analysis_interface import \
+    parallel_objects
+
 for name, cls in callback_registry.items():
     exec("%s = cls" % name)
 
 from yt.convenience import all_pfs, max_spheres, load, projload
 
+
 # We load plugins.  Keep in mind, this can be fairly dangerous -
 # the primary purpose is to allow people to have a set of functions
 # that get used every time that they don't have to *define* every time.
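
With parallel_objects exported from yt.mods, a typical use looks something
like the sketch below (the output names are hypothetical; njobs splits the
MPI ranks into that many work groups, each iterating over a disjoint subset
of the objects):

    from yt.mods import parallel_objects, load

    fns = ["DD0010/DD0010", "DD0020/DD0020", "DD0030/DD0030"]

    # With njobs=2, the available ranks form two groups and the three
    # outputs are divided between them.
    for fn in parallel_objects(fns, njobs=2):
        pf = load(fn)
        print fn, pf.current_time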


diff -r 397c5cdd41d722f47494415c465f0dff617d83f8 -r ff8eddc95c0abe6d72835397b9bad72d3c2afb2f yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -182,14 +182,15 @@
         retval = None
         if self._processing or not self._distributed:
             return func(self, *args, **kwargs)
-        if self._owner == self.comm.rank:
+        comm = _get_comm((self,))
+        if self._owner == comm.rank:
             self._processing = True
             retval = func(self, *args, **kwargs)
             self._processing = False
         # To be sure we utilize the root= kwarg, we manually access the .comm
         # attribute, which must be an instance of MPI.Intracomm, and call bcast
         # on that.
-        retval = self.comm.comm.bcast(retval, root=self._owner)
+        retval = comm.comm.bcast(retval, root=self._owner)
         #MPI.COMM_WORLD.Barrier()
         return retval
     return single_proc_results
@@ -228,6 +229,7 @@
         comm = args[0].comm
     else:
         comm = communication_system.communicators[-1]
+    return comm
 
 def parallel_blocking_call(func):
     """
@@ -280,7 +282,7 @@
                 all_clear = 0
         else:
             all_clear = None
-        all_clear = comm.mpi_bcast_pickled(all_clear, root=0)
+        all_clear = comm.mpi_bcast_pickled(all_clear)
         if not all_clear: raise RuntimeError
     if parallel_capable: return root_only
     return func
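
The net effect of the _get_comm fix above is a two-step resolution order,
sketched here with stand-in objects ("stack" plays the role of
communication_system.communicators):

    def resolve_comm(args, stack):
        # Prefer a communicator bound to the first argument, as
        # _get_comm does for ParallelAnalysisInterface methods.
        if len(args) > 0 and hasattr(args[0], "comm"):
            return args[0].comm
        # Otherwise fall back to the innermost communicator.
        return stack[-1]

    class Bound(object):
        comm = "per-object communicator"

    print resolve_comm((Bound(),), ["innermost"])  # per-object communicator
    print resolve_comm((), ["innermost"])          # innermost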



https://bitbucket.org/yt_analysis/yt/changeset/0d9b8909abbe/
changeset:   0d9b8909abbe
branch:      yt
user:        MatthewTurk
date:        2011-11-08 19:40:13
summary:     Adding a check for the number of jobs versus the size.
affected #:  1 file

diff -r ff8eddc95c0abe6d72835397b9bad72d3c2afb2f -r 0d9b8909abbe499355a0daf61cca73af17fa70df yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -347,6 +347,10 @@
     if not parallel_capable: raise RuntimeError
     my_communicator = communication_system.communicators[-1]
     my_size = my_communicator.size
+    if njobs > my_size:
+        mylog.error("You have asked for %s jobs, but you only have %s processors.",
+            njobs, my_size)
+        raise RuntimeError
     my_rank = my_communicator.rank
     all_new_comms = na.array_split(na.arange(my_size), njobs)
     for i,comm_set in enumerate(all_new_comms):
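
The check guards against na.array_split handing empty rank sets to some jobs,
which is what happens once njobs exceeds the communicator size:

    import numpy as na  # yt aliases numpy as "na"

    # 4 ranks split across 3 jobs: every job gets at least one rank.
    print na.array_split(na.arange(4), 3)
    # -> [array([0, 1]), array([2]), array([3])]

    # 2 ranks split across 3 jobs: the last job gets an empty rank set,
    # hence the new RuntimeError above.
    print na.array_split(na.arange(2), 3)
    # -> [array([0]), array([1]), array([], dtype=...)]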



https://bitbucket.org/yt_analysis/yt/changeset/fdedc447ab74/
changeset:   fdedc447ab74
branch:      yt
user:        brittonsmith
date:        2011-11-08 23:19:47
summary:     Replaced the list/cat functionality in par_combine_object that
used the now-clobbered __mpi_recvlist with an allgather.
affected #:  1 file

diff -r 0d9b8909abbe499355a0daf61cca73af17fa70df -r fdedc447ab741923db466d0fd749bee8fcfae28d yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -510,12 +510,12 @@
             data = self.alltoallv_array(data, arr_size, offsets, sizes)
             return data
         elif datatype == "list" and op == "cat":
-            if self.comm.rank == 0:
-                data = self.__mpi_recvlist(data)
-            else:
-                self.comm.send(data, dest=0, tag=0)
-            mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
-            data = self.comm.bcast(data, root=0)
+            recv_data = self.comm.allgather(data)
+            # allgather returns a list of lists (one per rank), so
+            # flatten it back into a single list.
+            data = []
+            while recv_data:
+                data.extend(recv_data.pop(0))
             return data
         raise NotImplementedError
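
A minimal sketch of the new flatten step, with a stand-in for the allgather
result (allgather returns one sub-list per rank):

    # Stand-in for self.comm.allgather(data) on a three-rank run.
    recv_data = [[1, 2], [3], [4, 5]]

    # Consume front-to-back so the concatenation follows rank order.
    data = []
    while recv_data:
        data.extend(recv_data.pop(0))
    print data
    # -> [1, 2, 3, 4, 5]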
 



https://bitbucket.org/yt_analysis/yt/changeset/c8283ef86376/
changeset:   c8283ef86376
branch:      yt
user:        brittonsmith
date:        2011-11-08 23:20:06
summary:     Merged.
affected #:  0 files

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.


