[Yt-svn] commit/yt: 3 new changesets
Bitbucket
commits-noreply at bitbucket.org
Wed Nov 9 06:29:07 PST 2011
3 new commits in yt:
https://bitbucket.org/yt_analysis/yt/changeset/87d3684c5448/
changeset: 87d3684c5448
branch: yt
user: MatthewTurk
date: 2011-11-08 21:32:18
summary: Fixing tabs => spaces
affected #: 1 file
diff -r f376abaf6942d5050c42dc2a49f258fb60566ae5 -r 87d3684c54487d051b3cbadd1e7d9d280d7ca7d8 yt/visualization/volume_rendering/camera.py
--- a/yt/visualization/volume_rendering/camera.py
+++ b/yt/visualization/volume_rendering/camera.py
@@ -582,8 +582,8 @@
transfer_function = None, fields = None,
sub_samples = 5, log_fields = None, volume = None,
pf = None, use_kd=True, no_ghost=False):
- ParallelAnalysisInterface.__init__(self)
- if pf is not None: self.pf = pf
+ ParallelAnalysisInterface.__init__(self)
+ if pf is not None: self.pf = pf
self.center = na.array(center, dtype='float64')
self.radius = radius
self.nside = nside
@@ -652,8 +652,8 @@
sub_samples = 5, log_fields = None, volume = None,
pf = None, use_kd=True, no_ghost=False,
rays_per_cell = 0.1, max_nside = 8192):
- ParallelAnalysisInterface.__init__(self)
- if pf is not None: self.pf = pf
+ ParallelAnalysisInterface.__init__(self)
+ if pf is not None: self.pf = pf
self.center = na.array(center, dtype='float64')
self.radius = radius
self.use_kd = use_kd
@@ -706,8 +706,8 @@
class StereoPairCamera(Camera):
def __init__(self, original_camera, relative_separation = 0.005):
- ParallelAnalysisInterface.__init__(self)
- self.original_camera = original_camera
+ ParallelAnalysisInterface.__init__(self)
+ self.original_camera = original_camera
self.relative_separation = relative_separation
def split(self):
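A quick, hypothetical way to verify this kind of cleanup is a small standalone check for any literal tab characters left in the touched file (illustrative only, not part of the commit):

    # Hypothetical check: list any lines in camera.py that still contain a
    # literal tab character after the tabs => spaces cleanup.
    path = "yt/visualization/volume_rendering/camera.py"
    for lineno, line in enumerate(open(path), 1):
        if "\t" in line:
            print("line %d still has a tab: %r" % (lineno, line))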
https://bitbucket.org/yt_analysis/yt/changeset/6de131ab4dbe/
changeset: 6de131ab4dbe
branch: yt
user: MatthewTurk
date: 2011-11-08 21:50:14
summary: Implementing list/cat operation in par_combine_obj.
affected #: 1 file
diff -r 87d3684c54487d051b3cbadd1e7d9d280d7ca7d8 -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -496,9 +496,16 @@
return data
elif datatype == "list" and op == "cat":
if self.comm.rank == 0:
- data = self.__mpi_recvlist(data)
+ storage = {0:ensure_list(data)}
+ for i in xrange(self.comm.size - 1):
+ st = MPI.Status()
+ d = self.comm.recv(source = MPI.ANY_SOURCE, status = st)
+ storage[st.source] = d
+ data = []
+ for i in xrange(self.comm.size):
+ data.extend(storage.pop(i))
else:
- self.comm.send(data, dest=0, tag=0)
+ self.comm.send(data, dest=0)
mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
data = self.comm.bcast(data, root=0)
return data
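For reference, the new list/cat branch above can be reproduced outside of yt as a standalone mpi4py sketch (assumptions: mpi4py is installed and the script is launched under mpirun; variable names are illustrative). Rank 0 receives one list per rank, keys each by the actual sender via MPI.Status so arrival order does not matter, reassembles a single list in rank order, and broadcasts the result:

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    data = [comm.rank * 10 + i for i in range(3)]  # each rank's local list

    if comm.rank == 0:
        storage = {0: data}
        for _ in range(comm.size - 1):
            st = MPI.Status()
            d = comm.recv(source=MPI.ANY_SOURCE, status=st)
            storage[st.source] = d  # key by sender rank, not arrival order
        data = []
        for i in range(comm.size):
            data.extend(storage.pop(i))  # rebuild in rank order
    else:
        comm.send(data, dest=0)
    data = comm.bcast(data, root=0)  # every rank ends with the full list
    print("rank %d: %s" % (comm.rank, data))

Run with, e.g., mpirun -np 4 python sketch.py, assuming an MPI launcher is available.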
https://bitbucket.org/yt_analysis/yt/changeset/1996bb83bd9c/
changeset: 1996bb83bd9c
branch: yt
user: MatthewTurk
date: 2011-11-09 15:27:59
summary: Merged
affected #: 3 files
diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/mods.py
--- a/yt/mods.py
+++ b/yt/mods.py
@@ -105,11 +105,15 @@
ColorTransferFunction, PlanckTransferFunction, ProjectionTransferFunction, \
HomogenizedVolume, Camera, off_axis_projection
+from yt.utilities.parallel_tools.parallel_analysis_interface import \
+ parallel_objects
+
for name, cls in callback_registry.items():
exec("%s = cls" % name)
from yt.convenience import all_pfs, max_spheres, load, projload
+
# We load plugins. Keep in mind, this can be fairly dangerous -
# the primary purpose is to allow people to have a set of functions
# that get used every time that they don't have to *define* every time.
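The mods.py hunk above makes parallel_objects importable straight from yt.mods. A rough usage sketch follows (hypothetical script; the exact signature should be checked against the yt documentation of this era, and njobs is assumed to split the global communicator into that many job groups, matching the njobs handling in the next diff):

    # Hypothetical sketch: fan a list of work items out over MPI ranks via
    # the newly exported parallel_objects. Assumes the signature
    # parallel_objects(iterable, njobs).
    from yt.mods import *

    for item in parallel_objects(range(8), 4):
        # each of the 4 job groups receives a share of the items
        print("working on item %s" % item)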
diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -49,7 +49,7 @@
from mpi4py import MPI
parallel_capable = (MPI.COMM_WORLD.size > 1)
if parallel_capable:
- mylog.info("Parallel computation enabled: %s / %s",
+ mylog.info("Global parallel computation enabled: %s / %s",
MPI.COMM_WORLD.rank, MPI.COMM_WORLD.size)
ytcfg["yt","__global_parallel_rank"] = str(MPI.COMM_WORLD.rank)
ytcfg["yt","__global_parallel_size"] = str(MPI.COMM_WORLD.size)
@@ -61,8 +61,6 @@
#ytcfg["yt","StoreParameterFiles"] = "False"
# Now let's make sure we have the right options set.
if MPI.COMM_WORLD.rank > 0:
- if ytcfg.getboolean("yt","serialize"):
- ytcfg["yt","onlydeserialize"] = "True"
if ytcfg.getboolean("yt","LogFile"):
ytcfg["yt","LogFile"] = "False"
yt.utilities.logger.disable_file_logging()
@@ -150,8 +148,10 @@
def __init__(self, pobj, just_list = False, attr='_grids',
round_robin=False):
ObjectIterator.__init__(self, pobj, just_list, attr=attr)
- self._offset = MPI.COMM_WORLD.rank
- self._skip = MPI.COMM_WORLD.size
+ # pobj has to be a ParallelAnalysisInterface, so it must have a .comm
+ # object.
+ self._offset = pobj.comm.rank
+ self._skip = pobj.comm.size
# Note that we're doing this in advance, and with a simple means
# of choosing them; more advanced methods will be explored later.
if self._use_all:
@@ -182,11 +182,15 @@
retval = None
if self._processing or not self._distributed:
return func(self, *args, **kwargs)
- if self._owner == MPI.COMM_WORLD.rank:
+ comm = _get_comm((self,))
+ if self._owner == comm.rank:
self._processing = True
retval = func(self, *args, **kwargs)
self._processing = False
- retval = MPI.COMM_WORLD.bcast(retval, root=self._owner)
+ # To be sure we utilize the root= kwarg, we manually access the .comm
+ # attribute, which must be an instance of MPI.Intracomm, and call bcast
+ # on that.
+ retval = comm.comm.bcast(retval, root=self._owner)
#MPI.COMM_WORLD.Barrier()
return retval
return single_proc_results
@@ -220,6 +224,13 @@
return func(self, data, **kwargs)
return passage
+def _get_comm(args):
+ if len(args) > 0 and hasattr(args[0], "comm"):
+ comm = args[0].comm
+ else:
+ comm = communication_system.communicators[-1]
+ return comm
+
def parallel_blocking_call(func):
"""
This decorator blocks on entry and exit of a function.
@@ -227,10 +238,11 @@
@wraps(func)
def barrierize(*args, **kwargs):
mylog.debug("Entering barrier before %s", func.func_name)
- MPI.COMM_WORLD.Barrier()
+ comm = _get_comm(args)
+ comm.barrier()
retval = func(*args, **kwargs)
mylog.debug("Entering barrier after %s", func.func_name)
- MPI.COMM_WORLD.Barrier()
+ comm.barrier()
return retval
if parallel_capable:
return barrierize
@@ -244,10 +256,11 @@
"""
@wraps(f1)
def in_order(*args, **kwargs):
- if MPI.COMM_WORLD.rank == 0:
+ comm = _get_comm(args)
+ if comm.rank == 0:
f1(*args, **kwargs)
- MPI.COMM_WORLD.Barrier()
- if MPI.COMM_WORLD.rank != 0:
+ comm.barrier()
+ if comm.rank != 0:
f2(*args, **kwargs)
if not parallel_capable: return f1
return in_order
@@ -259,7 +272,8 @@
"""
@wraps(func)
def root_only(*args, **kwargs):
- if MPI.COMM_WORLD.rank == 0:
+ comm = _get_comm(args)
+ if comm.rank == 0:
try:
func(*args, **kwargs)
all_clear = 1
@@ -268,8 +282,7 @@
all_clear = 0
else:
all_clear = None
- #MPI.COMM_WORLD.Barrier()
- all_clear = MPI.COMM_WORLD.bcast(all_clear, root=0)
+ all_clear = comm.mpi_bcast_pickled(all_clear)
if not all_clear: raise RuntimeError
if parallel_capable: return root_only
return func
@@ -334,6 +347,10 @@
if not parallel_capable: raise RuntimeError
my_communicator = communication_system.communicators[-1]
my_size = my_communicator.size
+ if njobs > my_size:
+ mylog.error("You have asked for %s jobs, but you only have %s processors.",
+ njobs, my_size)
+ raise RuntimeError
my_rank = my_communicator.rank
all_new_comms = na.array_split(na.arange(my_size), njobs)
for i,comm_set in enumerate(all_new_comms):
@@ -367,31 +384,29 @@
self.communicators.append(Communicator(MPI.COMM_WORLD))
else:
self.communicators.append(Communicator(None))
- def push(self, size=None, ranks=None):
- raise NotImplementedError
- if size is None:
- size = len(available_ranks)
- if len(available_ranks) < size:
- raise RuntimeError
- if ranks is None:
- ranks = [available_ranks.pop() for i in range(size)]
-
- group = MPI.COMM_WORLD.Group.Incl(ranks)
- new_comm = MPI.COMM_WORLD.Create(group)
- self.communicators.append(Communicator(new_comm))
- return new_comm
+
+ def push(self, new_comm):
+ if not isinstance(new_comm, Communicator):
+ new_comm = Communicator(new_comm)
+ self.communicators.append(new_comm)
+ self._update_parallel_state(new_comm)
def push_with_ids(self, ids):
group = self.communicators[-1].comm.Get_group().Incl(ids)
new_comm = self.communicators[-1].comm.Create(group)
+ self.push(new_comm)
+ return new_comm
+
+ def _update_parallel_state(self, new_comm):
from yt.config import ytcfg
ytcfg["yt","__topcomm_parallel_size"] = str(new_comm.size)
ytcfg["yt","__topcomm_parallel_rank"] = str(new_comm.rank)
- self.communicators.append(Communicator(new_comm))
- return new_comm
+ if MPI.COMM_WORLD.rank > 0 and ytcfg.getboolean("yt","serialize"):
+ ytcfg["yt","onlydeserialize"] = "True"
def pop(self):
self.communicators.pop()
+ self._update_parallel_state(self.communicators[-1])
class Communicator(object):
comm = None
@@ -495,19 +510,12 @@
data = self.alltoallv_array(data, arr_size, offsets, sizes)
return data
elif datatype == "list" and op == "cat":
- if self.comm.rank == 0:
- storage = {0:ensure_list(data)}
- for i in xrange(self.comm.size - 1):
- st = MPI.Status()
- d = self.comm.recv(source = MPI.ANY_SOURCE, status = st)
- storage[st.source] = d
- data = []
- for i in xrange(self.comm.size):
- data.extend(storage.pop(i))
- else:
- self.comm.send(data, dest=0)
- mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
- data = self.comm.bcast(data, root=0)
+ recv_data = self.comm.allgather(data)
+ # Now flatten into a single list, since this
+ # returns us a list of lists.
+ data = []
+ while recv_data:
+ data.extend(recv_data.pop(0))
return data
raise NotImplementedError
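Two things stand out in the hunks above: the decorators now resolve their communicator through the new _get_comm helper (falling back to the top of the communication_system stack when the first argument has no .comm attribute), and the hand-rolled recv/send/bcast round-trip from the previous changeset collapses into a single collective. A standalone mpi4py sketch of that collective (illustrative names; assumes mpi4py and an mpirun launch):

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    data = [comm.rank * 10 + i for i in range(3)]  # each rank's local list

    recv_data = comm.allgather(data)  # list of per-rank lists, in rank order
    data = []
    while recv_data:
        data.extend(recv_data.pop(0))  # flatten into one list
    print("rank %d: %s" % (comm.rank, data))

Because allgather is a collective, every rank ends up with the concatenated list directly, so the explicit broadcast becomes unnecessary.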
diff -r 6de131ab4dbe80293181ce5e6fed2e5161ff00c8 -r 1996bb83bd9c72a7d178462bfe61fa9b0f341941 yt/utilities/rpdb.py
--- a/yt/utilities/rpdb.py
+++ b/yt/utilities/rpdb.py
@@ -66,6 +66,10 @@
server.server_close()
if size > 1:
from mpi4py import MPI
+ # This COMM_WORLD is okay. We want to barrierize here, while waiting
+ # for shutdown from the rest of the parallel group. If you are running
+ # with --rpdb it is assumed you know what you are doing and you won't
+ # let this get out of hand.
MPI.COMM_WORLD.Barrier()
class pdb_handler(object):
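The comment added to rpdb.py documents a deliberate use of COMM_WORLD. A minimal sketch of that shutdown pattern (illustrative, assuming mpi4py):

    from mpi4py import MPI

    # No rank may exit, and thereby tear down the MPI runtime, before the
    # debugger server has shut down everywhere; a global barrier enforces it.
    if MPI.COMM_WORLD.size > 1:
        MPI.COMM_WORLD.Barrier()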
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.