[yt-svn] commit/yt: 2 new changesets
Bitbucket
commits-noreply at bitbucket.org
Mon Nov 14 12:53:53 PST 2011
2 new commits in yt:
https://bitbucket.org/yt_analysis/yt/changeset/4fa25db6e675/
changeset: 4fa25db6e675
branch: yt
user: sskory
date: 2011-11-14 19:32:53
summary: Allowing parallel_objects() to work in serial.
affected #: 1 file
diff -r e9d4dba7c151996557b7c77a9d805d9ee863b365 -r 4fa25db6e675ad7ddd385fdc37b62bfb3b13f74e yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -344,37 +344,51 @@
result_id = None
def parallel_objects(objects, njobs, storage = None):
- if not parallel_capable: raise RuntimeError
- my_communicator = communication_system.communicators[-1]
- my_size = my_communicator.size
- if njobs > my_size:
- mylog.error("You have asked for %s jobs, but you only have %s processors.",
- njobs, my_size)
- raise RuntimeError
- my_rank = my_communicator.rank
- all_new_comms = na.array_split(na.arange(my_size), njobs)
- for i,comm_set in enumerate(all_new_comms):
- if my_rank in comm_set:
- my_new_id = i
- break
- communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
- obj_ids = na.arange(len(objects))
-
- to_share = {}
- for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
+ if not parallel_capable:
+ mylog.warn("parallel_objects() is being used when parallel_capable is false. The loop is not being run in parallel. This may not be what was expected.")
+ to_share = {}
+ obj_ids = na.arange(len(objects))
+ for result_id, obj in zip(obj_ids, objects):
+ if storage is not None:
+ rstore = ResultsStorage()
+ rstore.result_id = result_id
+ yield rstore, obj
+ to_share[rstore.result_id] = rstore.result
+ else:
+ yield obj
if storage is not None:
- rstore = ResultsStorage()
- rstore.result_id = result_id
- yield rstore, obj
- to_share[rstore.result_id] = rstore.result
- else:
- yield obj
- communication_system.communicators.pop()
- if storage is not None:
- # Now we have to broadcast it
- new_storage = my_communicator.par_combine_object(
- to_share, datatype = 'dict', op = 'join')
- storage.update(new_storage)
+ storage.update(to_share)
+ else:
+ my_communicator = communication_system.communicators[-1]
+ my_size = my_communicator.size
+ if njobs > my_size:
+ mylog.error("You have asked for %s jobs, but you only have %s processors.",
+ njobs, my_size)
+ raise RuntimeError
+ my_rank = my_communicator.rank
+ all_new_comms = na.array_split(na.arange(my_size), njobs)
+ for i,comm_set in enumerate(all_new_comms):
+ if my_rank in comm_set:
+ my_new_id = i
+ break
+ communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+ obj_ids = na.arange(len(objects))
+
+ to_share = {}
+ for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
+ if storage is not None:
+ rstore = ResultsStorage()
+ rstore.result_id = result_id
+ yield rstore, obj
+ to_share[rstore.result_id] = rstore.result
+ else:
+ yield obj
+ communication_system.communicators.pop()
+ if storage is not None:
+ # Now we have to broadcast it
+ new_storage = my_communicator.par_combine_object(
+ to_share, datatype = 'dict', op = 'join')
+ storage.update(new_storage)
class CommunicationSystem(object):
communicators = []
https://bitbucket.org/yt_analysis/yt/changeset/d73dc0541b19/
changeset: d73dc0541b19
branch: yt
user: sskory
date: 2011-11-14 21:03:05
summary: A simpler way of making parallel_objects work in serial.
affected #: 1 file
diff -r 4fa25db6e675ad7ddd385fdc37b62bfb3b13f74e -r d73dc0541b193f00e758c77f5fd0b977025acfb5 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -345,50 +345,39 @@
def parallel_objects(objects, njobs, storage = None):
if not parallel_capable:
+ njobs = 1
mylog.warn("parallel_objects() is being used when parallel_capable is false. The loop is not being run in parallel. This may not be what was expected.")
- to_share = {}
- obj_ids = na.arange(len(objects))
- for result_id, obj in zip(obj_ids, objects):
- if storage is not None:
- rstore = ResultsStorage()
- rstore.result_id = result_id
- yield rstore, obj
- to_share[rstore.result_id] = rstore.result
- else:
- yield obj
+ my_communicator = communication_system.communicators[-1]
+ my_size = my_communicator.size
+ if njobs > my_size:
+ mylog.error("You have asked for %s jobs, but you only have %s processors.",
+ njobs, my_size)
+ raise RuntimeError
+ my_rank = my_communicator.rank
+ all_new_comms = na.array_split(na.arange(my_size), njobs)
+ for i,comm_set in enumerate(all_new_comms):
+ if my_rank in comm_set:
+ my_new_id = i
+ break
+ if parallel_capable:
+ communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+ obj_ids = na.arange(len(objects))
+
+ to_share = {}
+ for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
if storage is not None:
- storage.update(to_share)
- else:
- my_communicator = communication_system.communicators[-1]
- my_size = my_communicator.size
- if njobs > my_size:
- mylog.error("You have asked for %s jobs, but you only have %s processors.",
- njobs, my_size)
- raise RuntimeError
- my_rank = my_communicator.rank
- all_new_comms = na.array_split(na.arange(my_size), njobs)
- for i,comm_set in enumerate(all_new_comms):
- if my_rank in comm_set:
- my_new_id = i
- break
- communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
- obj_ids = na.arange(len(objects))
-
- to_share = {}
- for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
- if storage is not None:
- rstore = ResultsStorage()
- rstore.result_id = result_id
- yield rstore, obj
- to_share[rstore.result_id] = rstore.result
- else:
- yield obj
- communication_system.communicators.pop()
- if storage is not None:
- # Now we have to broadcast it
- new_storage = my_communicator.par_combine_object(
- to_share, datatype = 'dict', op = 'join')
- storage.update(new_storage)
+ rstore = ResultsStorage()
+ rstore.result_id = result_id
+ yield rstore, obj
+ to_share[rstore.result_id] = rstore.result
+ else:
+ yield obj
+ communication_system.communicators.pop()
+ if storage is not None:
+ # Now we have to broadcast it
+ new_storage = my_communicator.par_combine_object(
+ to_share, datatype = 'dict', op = 'join')
+ storage.update(new_storage)
class CommunicationSystem(object):
communicators = []
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because the commit notification service is enabled for the
address that received this email.
More information about the yt-svn mailing list