[yt-svn] commit/yt: 2 new changesets

From: Bitbucket <commits-noreply at bitbucket.org>
Date: Mon Nov 14 12:53:53 PST 2011


2 new commits in yt:


https://bitbucket.org/yt_analysis/yt/changeset/4fa25db6e675/
changeset:   4fa25db6e675
branch:      yt
user:        sskory
date:        2011-11-14 19:32:53
summary:     Allowing parallel_objects() to work in serial.
affected #:  1 file

diff -r e9d4dba7c151996557b7c77a9d805d9ee863b365 -r 4fa25db6e675ad7ddd385fdc37b62bfb3b13f74e yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -344,37 +344,51 @@
     result_id = None
 
 def parallel_objects(objects, njobs, storage = None):
-    if not parallel_capable: raise RuntimeError
-    my_communicator = communication_system.communicators[-1]
-    my_size = my_communicator.size
-    if njobs > my_size:
-        mylog.error("You have asked for %s jobs, but you only have %s processors.",
-            njobs, my_size)
-        raise RuntimeError
-    my_rank = my_communicator.rank
-    all_new_comms = na.array_split(na.arange(my_size), njobs)
-    for i,comm_set in enumerate(all_new_comms):
-        if my_rank in comm_set:
-            my_new_id = i
-            break
-    communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
-    obj_ids = na.arange(len(objects))
-
-    to_share = {}
-    for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
+    if not parallel_capable:
+        mylog.warn("parallel_objects() is being used when parallel_capable is false. The loop is not being run in parallel. This may not be what was expected.")
+        to_share = {}
+        obj_ids = na.arange(len(objects))
+        for result_id, obj in zip(obj_ids, objects):
+            if storage is not None:
+                rstore = ResultsStorage()
+                rstore.result_id = result_id
+                yield rstore, obj
+                to_share[rstore.result_id] = rstore.result
+            else:
+                yield obj
         if storage is not None:
-            rstore = ResultsStorage()
-            rstore.result_id = result_id
-            yield rstore, obj
-            to_share[rstore.result_id] = rstore.result
-        else:
-            yield obj
-    communication_system.communicators.pop()
-    if storage is not None:
-        # Now we have to broadcast it
-        new_storage = my_communicator.par_combine_object(
-                to_share, datatype = 'dict', op = 'join')
-        storage.update(new_storage)
+            storage.update(to_share)
+    else:
+        my_communicator = communication_system.communicators[-1]
+        my_size = my_communicator.size
+        if njobs > my_size:
+            mylog.error("You have asked for %s jobs, but you only have %s processors.",
+                njobs, my_size)
+            raise RuntimeError
+        my_rank = my_communicator.rank
+        all_new_comms = na.array_split(na.arange(my_size), njobs)
+        for i,comm_set in enumerate(all_new_comms):
+            if my_rank in comm_set:
+                my_new_id = i
+                break
+        communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+        obj_ids = na.arange(len(objects))
+    
+        to_share = {}
+        for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
+            if storage is not None:
+                rstore = ResultsStorage()
+                rstore.result_id = result_id
+                yield rstore, obj
+                to_share[rstore.result_id] = rstore.result
+            else:
+                yield obj
+        communication_system.communicators.pop()
+        if storage is not None:
+            # Now we have to broadcast it
+            new_storage = my_communicator.par_combine_object(
+                    to_share, datatype = 'dict', op = 'join')
+            storage.update(new_storage)
 
 class CommunicationSystem(object):
     communicators = []

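For reference, the work-distribution scheme used in the loop above can be illustrated outside of yt with a short standalone sketch. This is not part of the commit: the rank count, job count, and object list below are made up, and numpy is used directly in place of the `na` alias. Ranks are split into `njobs` contiguous groups with `array_split()`, and each group then takes every njobs-th object starting at its group index, exactly as in the `[my_new_id::njobs]` slice in the patched function.

    import numpy as np

    my_size = 6                      # hypothetical number of MPI ranks
    njobs = 2                        # hypothetical number of job groups
    objects = list("ABCDEFGH")       # hypothetical work items

    # Split the ranks into njobs contiguous groups, as in parallel_objects().
    all_new_comms = np.array_split(np.arange(my_size), njobs)

    for my_rank in range(my_size):
        # Find which group this rank belongs to.
        for i, comm_set in enumerate(all_new_comms):
            if my_rank in comm_set:
                my_new_id = i
                break
        # Each group handles every njobs-th (result_id, object) pair,
        # offset by its group index -- the [my_new_id::njobs] slice above.
        my_objs = list(zip(range(len(objects)), objects))[my_new_id::njobs]
        print(my_rank, my_new_id, my_objs)

With six ranks and two jobs, ranks 0-2 form group 0 and process objects 0, 2, 4, 6, while ranks 3-5 form group 1 and process objects 1, 3, 5, 7.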


https://bitbucket.org/yt_analysis/yt/changeset/d73dc0541b19/
changeset:   d73dc0541b19
branch:      yt
user:        sskory
date:        2011-11-14 21:03:05
summary:     A simpler way of making parallel_objects work in serial.
affected #:  1 file

diff -r 4fa25db6e675ad7ddd385fdc37b62bfb3b13f74e -r d73dc0541b193f00e758c77f5fd0b977025acfb5 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -345,50 +345,39 @@
 
 def parallel_objects(objects, njobs, storage = None):
     if not parallel_capable:
+        njobs = 1
         mylog.warn("parallel_objects() is being used when parallel_capable is false. The loop is not being run in parallel. This may not be what was expected.")
-        to_share = {}
-        obj_ids = na.arange(len(objects))
-        for result_id, obj in zip(obj_ids, objects):
-            if storage is not None:
-                rstore = ResultsStorage()
-                rstore.result_id = result_id
-                yield rstore, obj
-                to_share[rstore.result_id] = rstore.result
-            else:
-                yield obj
+    my_communicator = communication_system.communicators[-1]
+    my_size = my_communicator.size
+    if njobs > my_size:
+        mylog.error("You have asked for %s jobs, but you only have %s processors.",
+            njobs, my_size)
+        raise RuntimeError
+    my_rank = my_communicator.rank
+    all_new_comms = na.array_split(na.arange(my_size), njobs)
+    for i,comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    if parallel_capable:
+        communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+    obj_ids = na.arange(len(objects))
+
+    to_share = {}
+    for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
         if storage is not None:
-            storage.update(to_share)
-    else:
-        my_communicator = communication_system.communicators[-1]
-        my_size = my_communicator.size
-        if njobs > my_size:
-            mylog.error("You have asked for %s jobs, but you only have %s processors.",
-                njobs, my_size)
-            raise RuntimeError
-        my_rank = my_communicator.rank
-        all_new_comms = na.array_split(na.arange(my_size), njobs)
-        for i,comm_set in enumerate(all_new_comms):
-            if my_rank in comm_set:
-                my_new_id = i
-                break
-        communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
-        obj_ids = na.arange(len(objects))
-    
-        to_share = {}
-        for result_id, obj in zip(obj_ids, objects)[my_new_id::njobs]:
-            if storage is not None:
-                rstore = ResultsStorage()
-                rstore.result_id = result_id
-                yield rstore, obj
-                to_share[rstore.result_id] = rstore.result
-            else:
-                yield obj
-        communication_system.communicators.pop()
-        if storage is not None:
-            # Now we have to broadcast it
-            new_storage = my_communicator.par_combine_object(
-                    to_share, datatype = 'dict', op = 'join')
-            storage.update(new_storage)
+            rstore = ResultsStorage()
+            rstore.result_id = result_id
+            yield rstore, obj
+            to_share[rstore.result_id] = rstore.result
+        else:
+            yield obj
+    communication_system.communicators.pop()
+    if storage is not None:
+        # Now we have to broadcast it
+        new_storage = my_communicator.par_combine_object(
+                to_share, datatype = 'dict', op = 'join')
+        storage.update(new_storage)
 
 class CommunicationSystem(object):
     communicators = []

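The net effect of the two changesets is that a loop written with parallel_objects() now runs unchanged whether or not MPI is available: in serial, njobs is forced to 1, the warning above is logged, and the single process iterates over every object. Below is a minimal usage sketch, not taken from the repository; the work list and the squaring step are made up, and it assumes parallel_objects is imported from the module patched above.

    from yt.utilities.parallel_tools.parallel_analysis_interface import \
        parallel_objects

    work_items = range(8)     # hypothetical list of tasks
    results = {}              # filled via the storage keyword

    # With storage given, each iteration yields a ResultsStorage object and
    # one work item; whatever is assigned to rstore.result is collected and
    # combined into `results` once the loop finishes.
    for rstore, item in parallel_objects(work_items, 2, storage=results):
        rstore.result = item ** 2

    # results maps each object's result_id to its result.
    print(sorted(results.items()))

Without the storage keyword the generator simply yields each object, so the same loop can also be written as a plain for-loop over parallel_objects(work_items, 2).
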
Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving it
because the commit notification service is enabled for this repository and
this address is its configured recipient.


