[yt-svn] commit/yt: MatthewTurk: Time series now operates, by default, in parallel.

Bitbucket commits-noreply at bitbucket.org
Sat Mar 31 01:45:44 PDT 2012


1 new commit in yt:


https://bitbucket.org/yt_analysis/yt/changeset/8fe96bd100a7/
changeset:   8fe96bd100a7
branch:      yt
user:        MatthewTurk
date:        2012-03-29 22:53:24
summary:     Time series now operates, by default, in parallel.
affected #:  1 file

diff -r f9d3b8d60a61f198d21df52f245e4831a4e7f3ca -r 8fe96bd100a7d6ae6a8b79543cc13a401ed36f10 yt/data_objects/time_series.py
--- a/yt/data_objects/time_series.py
+++ b/yt/data_objects/time_series.py
@@ -32,6 +32,8 @@
     analysis_task_registry, AnalysisTask
 from .derived_quantities import quantity_info
 from yt.utilities.exceptions import YTException
+from yt.utilities.parallel_tools.parallel_analysis_interface \
+    import parallel_objects
 
 class AnalysisTaskProxy(object):
     def __init__(self, time_series):
@@ -74,7 +76,7 @@
         raise AttributeError(attr)
 
 class TimeSeriesData(object):
-    def __init__(self, outputs = None):
+    def __init__(self, outputs = None, parallel = True):
         if outputs is None: outputs = []
         self.outputs = outputs
         self.tasks = AnalysisTaskProxy(self)
@@ -82,6 +84,7 @@
         for type_name in data_object_registry:
             setattr(self, type_name, functools.partial(
                 TimeSeriesDataObject, self, type_name))
+        self.parallel = parallel
 
     def __iter__(self):
         # We can make this fancier, but this works
@@ -101,9 +104,14 @@
         
     def eval(self, tasks, obj=None):
         tasks = ensure_list(tasks)
-        return_values = []
-        for pf in self:
-            return_values.append([])
+        return_values = {}
+        if self.parallel == False:
+            njobs = 1
+        else:
+            if self.parallel == True: njobs = -1
+            else: njobs = self.parallel
+        for store, pf in parallel_objects(self.outputs, njobs, return_values):
+            store.result = []
             for task in tasks:
                 try:
                     style = inspect.getargspec(task.eval)[0][1]
@@ -119,27 +127,28 @@
                 # small.
                 except YTException as rv:
                     pass
-                return_values[-1].append(rv)
-        return return_values
+                store.result.append(rv)
+        return [v for k, v in sorted(return_values.items())]
 
     @classmethod
-    def from_filenames(cls, filename_list):
+    def from_filenames(cls, filename_list, parallel = True):
         outputs = []
         for fn in filename_list:
             outputs.append(load(fn))
-        obj = cls(outputs)
+        obj = cls(outputs, parallel = parallel)
         return obj
 
     @classmethod
     def from_output_log(cls, output_log,
-                        line_prefix = "DATASET WRITTEN"):
+                        line_prefix = "DATASET WRITTEN",
+                        parallel = True):
         outputs = []
         for line in open(output_log):
             if not line.startswith(line_prefix): continue
             cut_line = line[len(line_prefix):].strip()
             fn = cut_line.split()[0]
             outputs.append(load(fn))
-        obj = cls(outputs)
+        obj = cls(outputs, parallel = parallel)
         return obj
 
 class TimeSeriesQuantitiesContainer(object):

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.



More information about the yt-svn mailing list