[Yt-svn] commit/yt: 3 new changesets
Bitbucket
commits-noreply at bitbucket.org
Tue Nov 8 12:59:03 PST 2011
3 new commits in yt:
https://bitbucket.org/yt_analysis/yt/changeset/d87a904b9273/
changeset: d87a904b9273
branch: yt
user: MatthewTurk
date: 2011-11-08 16:19:34
summary: First pass at removing all remaining unnecessary COMM_WORLD usages.
affected #: 2 files
diff -r 4e417d8c0176dc4c90548407558a7d9767971d04 -r d87a904b9273745c790d990e2e83d411ece2d00b yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -49,7 +49,7 @@
from mpi4py import MPI
parallel_capable = (MPI.COMM_WORLD.size > 1)
if parallel_capable:
- mylog.info("Parallel computation enabled: %s / %s",
+ mylog.info("Global parallel computation enabled: %s / %s",
MPI.COMM_WORLD.rank, MPI.COMM_WORLD.size)
ytcfg["yt","__global_parallel_rank"] = str(MPI.COMM_WORLD.rank)
ytcfg["yt","__global_parallel_size"] = str(MPI.COMM_WORLD.size)
@@ -61,8 +61,6 @@
#ytcfg["yt","StoreParameterFiles"] = "False"
# Now let's make sure we have the right options set.
if MPI.COMM_WORLD.rank > 0:
- if ytcfg.getboolean("yt","serialize"):
- ytcfg["yt","onlydeserialize"] = "True"
if ytcfg.getboolean("yt","LogFile"):
ytcfg["yt","LogFile"] = "False"
yt.utilities.logger.disable_file_logging()
@@ -150,8 +148,10 @@
def __init__(self, pobj, just_list = False, attr='_grids',
round_robin=False):
ObjectIterator.__init__(self, pobj, just_list, attr=attr)
- self._offset = MPI.COMM_WORLD.rank
- self._skip = MPI.COMM_WORLD.size
+ # pobj has to be a ParallelAnalysisInterface, so it must have a .comm
+ # object.
+ self._offset = pobj.comm.rank
+ self._skip = pobj.comm.size
# Note that we're doing this in advance, and with a simple means
# of choosing them; more advanced methods will be explored later.
if self._use_all:
@@ -182,11 +182,14 @@
retval = None
if self._processing or not self._distributed:
return func(self, *args, **kwargs)
- if self._owner == MPI.COMM_WORLD.rank:
+ if self._owner == self.comm.rank:
self._processing = True
retval = func(self, *args, **kwargs)
self._processing = False
- retval = MPI.COMM_WORLD.bcast(retval, root=self._owner)
+ # To be sure we utilize the root= kwarg, we manually access the .comm
+ # attribute, which must be an instance of MPI.Intracomm, and call bcast
+ # on that.
+ retval = self.comm.comm.bcast(retval, root=self._owner)
#MPI.COMM_WORLD.Barrier()
return retval
return single_proc_results
@@ -220,6 +223,12 @@
return func(self, data, **kwargs)
return passage
+def _get_comm(args):
+ if len(args) > 0 and hasattr(args[0], "comm"):
+ comm = args[0].comm
+ else:
+ comm = communication_system.communicators[-1]
+
def parallel_blocking_call(func):
"""
This decorator blocks on entry and exit of a function.
@@ -227,10 +236,11 @@
@wraps(func)
def barrierize(*args, **kwargs):
mylog.debug("Entering barrier before %s", func.func_name)
- MPI.COMM_WORLD.Barrier()
+ comm = _get_comm(args)
+ comm.barrier()
retval = func(*args, **kwargs)
mylog.debug("Entering barrier after %s", func.func_name)
- MPI.COMM_WORLD.Barrier()
+ comm.barrier()
return retval
if parallel_capable:
return barrierize
@@ -244,10 +254,11 @@
"""
@wraps(f1)
def in_order(*args, **kwargs):
- if MPI.COMM_WORLD.rank == 0:
+ comm = _get_comm(args)
+ if comm.rank == 0:
f1(*args, **kwargs)
- MPI.COMM_WORLD.Barrier()
- if MPI.COMM_WORLD.rank != 0:
+ comm.barrier()
+ if comm.rank != 0:
f2(*args, **kwargs)
if not parallel_capable: return f1
return in_order
@@ -259,7 +270,8 @@
"""
@wraps(func)
def root_only(*args, **kwargs):
- if MPI.COMM_WORLD.rank == 0:
+ comm = _get_comm(args)
+ if comm.rank == 0:
try:
func(*args, **kwargs)
all_clear = 1
@@ -268,8 +280,7 @@
all_clear = 0
else:
all_clear = None
- #MPI.COMM_WORLD.Barrier()
- all_clear = MPI.COMM_WORLD.bcast(all_clear, root=0)
+ all_clear = comm.mpi_bcast_pickled(all_clear, root=0)
if not all_clear: raise RuntimeError
if parallel_capable: return root_only
return func
@@ -367,31 +378,29 @@
self.communicators.append(Communicator(MPI.COMM_WORLD))
else:
self.communicators.append(Communicator(None))
- def push(self, size=None, ranks=None):
- raise NotImplementedError
- if size is None:
- size = len(available_ranks)
- if len(available_ranks) < size:
- raise RuntimeError
- if ranks is None:
- ranks = [available_ranks.pop() for i in range(size)]
-
- group = MPI.COMM_WORLD.Group.Incl(ranks)
- new_comm = MPI.COMM_WORLD.Create(group)
- self.communicators.append(Communicator(new_comm))
- return new_comm
+
+ def push(self, new_comm):
+ if not isinstance(new_comm, Communicator):
+ new_comm = Communicator(new_comm)
+ self.communicators.append(new_comm)
+ self._update_parallel_state(new_comm)
def push_with_ids(self, ids):
group = self.communicators[-1].comm.Get_group().Incl(ids)
new_comm = self.communicators[-1].comm.Create(group)
+ self.push(new_comm)
+ return new_comm
+
+ def _update_parallel_state(self, new_comm):
from yt.config import ytcfg
ytcfg["yt","__topcomm_parallel_size"] = str(new_comm.size)
ytcfg["yt","__topcomm_parallel_rank"] = str(new_comm.rank)
- self.communicators.append(Communicator(new_comm))
- return new_comm
+ if MPI.COMM_WORLD.rank > 0 and ytcfg.getboolean("yt","serialize"):
+ ytcfg["yt","onlydeserialize"] = "True"
def pop(self):
self.communicators.pop()
+ self._update_parallel_state(self.communicators[-1])
class Communicator(object):
comm = None
diff -r 4e417d8c0176dc4c90548407558a7d9767971d04 -r d87a904b9273745c790d990e2e83d411ece2d00b yt/utilities/rpdb.py
--- a/yt/utilities/rpdb.py
+++ b/yt/utilities/rpdb.py
@@ -66,6 +66,10 @@
server.server_close()
if size > 1:
from mpi4py import MPI
+ # This COMM_WORLD is okay. We want to barrierize here, while waiting
+ # for shutdown from the rest of the parallel group. If you are running
+ # with --rpdb it is assumed you know what you are doing and you won't
+ # let this get out of hand.
MPI.COMM_WORLD.Barrier()
class pdb_handler(object):
https://bitbucket.org/yt_analysis/yt/changeset/28d6044ac924/
changeset: 28d6044ac924
branch: yt
user: MatthewTurk
date: 2011-11-08 16:35:37
summary: A few more fixes, and an explicit import of parallel_objects into yt.mods.
affected #: 2 files
diff -r d87a904b9273745c790d990e2e83d411ece2d00b -r 28d6044ac924b8f051404ad5dc1cda778a8499a5 yt/mods.py
--- a/yt/mods.py
+++ b/yt/mods.py
@@ -105,11 +105,15 @@
ColorTransferFunction, PlanckTransferFunction, ProjectionTransferFunction, \
HomogenizedVolume, Camera, off_axis_projection
+from yt.utilities.parallel_tools.parallel_analysis_interface import \
+ parallel_objects
+
for name, cls in callback_registry.items():
exec("%s = cls" % name)
from yt.convenience import all_pfs, max_spheres, load, projload
+
# We load plugins. Keep in mind, this can be fairly dangerous -
# the primary purpose is to allow people to have a set of functions
# that get used every time that they don't have to *define* every time.
diff -r d87a904b9273745c790d990e2e83d411ece2d00b -r 28d6044ac924b8f051404ad5dc1cda778a8499a5 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -182,14 +182,15 @@
retval = None
if self._processing or not self._distributed:
return func(self, *args, **kwargs)
- if self._owner == self.comm.rank:
+ comm = _get_comm((self,))
+ if self._owner == comm.rank:
self._processing = True
retval = func(self, *args, **kwargs)
self._processing = False
# To be sure we utilize the root= kwarg, we manually access the .comm
# attribute, which must be an instance of MPI.Intracomm, and call bcast
# on that.
- retval = self.comm.comm.bcast(retval, root=self._owner)
+ retval = comm.comm.bcast(retval, root=self._owner)
#MPI.COMM_WORLD.Barrier()
return retval
return single_proc_results
@@ -228,6 +229,7 @@
comm = args[0].comm
else:
comm = communication_system.communicators[-1]
+ return comm
def parallel_blocking_call(func):
"""
@@ -280,7 +282,7 @@
all_clear = 0
else:
all_clear = None
- all_clear = comm.mpi_bcast_pickled(all_clear, root=0)
+ all_clear = comm.mpi_bcast_pickled(all_clear)
if not all_clear: raise RuntimeError
if parallel_capable: return root_only
return func
https://bitbucket.org/yt_analysis/yt/changeset/858192d3c8e7/
changeset: 858192d3c8e7
branch: yt
user: MatthewTurk
date: 2011-11-08 19:40:13
summary: Adding a check for the number of jobs versus the size.
affected #: 1 file
diff -r 28d6044ac924b8f051404ad5dc1cda778a8499a5 -r 858192d3c8e76333689c4aaa2059331364f03f21 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -347,6 +347,10 @@
if not parallel_capable: raise RuntimeError
my_communicator = communication_system.communicators[-1]
my_size = my_communicator.size
+ if njobs > my_size:
+ mylog.error("You have asked for %s jobs, but you only have %s processors.",
+ njobs, my_size)
+ raise RuntimeError
my_rank = my_communicator.rank
all_new_comms = na.array_split(na.arange(my_size), njobs)
for i,comm_set in enumerate(all_new_comms):
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this email because you are its addressed recipient and have the
commit-notification service enabled.
More information about the yt-svn
mailing list