[yt-svn] commit/yt: 10 new changesets

Bitbucket commits-noreply at bitbucket.org
Fri Jun 22 14:57:50 PDT 2012


10 new commits in yt:


https://bitbucket.org/yt_analysis/yt/changeset/ad54bd076d43/
changeset:   ad54bd076d43
branch:      yt
user:        MatthewTurk
date:        2012-06-22 17:37:49
summary:     I have forgotten why this was added to the repository.
affected #:  1 file

diff -r f2a1a5d384dec9014d12bccee925ae14194312e5 -r ad54bd076d432147923c28c738915e49e95c2b8b yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ /dev/null
@@ -1,149 +0,0 @@
-"""
-A task queue for distributing work to worker agents
-
-Author: Matthew Turk <matthewturk at gmail.com>
-Affiliation: Columbia University
-Homepage: http://yt-project.org/
-License:
-  Copyright (C) 2011 Matthew Turk.  All Rights Reserved.
-
-  This file is part of yt.
-
-  yt is free software; you can redistribute it and/or modify
-  it under the terms of the GNU General Public License as published by
-  the Free Software Foundation; either version 3 of the License, or
-  (at your option) any later version.
-
-  This program is distributed in the hope that it will be useful,
-  but WITHOUT ANY WARRANTY; without even the implied warranty of
-  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-  GNU General Public License for more details.
-
-  You should have received a copy of the GNU General Public License
-  along with this program.  If not, see <http://www.gnu.org/licenses/>.
-"""
-
-import threading
-from yt.funcs import *
-
-# The idea here is that we have a set of tasks, which we want to distribute.
-# We'll try to make this forward-compatible.  To do so, we want to support the
-# idea that there's a single, global set of tasks, as well as consumers that
-# receive tasks from the main controller.  These consumers then pass them out
-# to executors.
-#
-# The middle level, the "Consumer," is only really distinct from the executor
-# in the case that there is an MPI subcommunicator.  The reason for the
-# separation is so that the controller only communicates with a single member
-# of each subcommunicator, which then passes that info back out.
-
-def locked(func):
-    @wraps(func)
-    def exclusive(self, *args, **kwargs):
-        with self.lock:
-            return func(self, *args, **kwargs)
-    return exclusive
-
-class YTTaskCommunicator(object):
-    # This should carefully be checked for a race condition, particularly in
-    # the wait() function
-    def __init__(self, interval = 2.0):
-        self.interval = interval
-        self.task_id = None
-        self.waiting = False
-        self.lock = threading.Lock()
-
-    @locked
-    def send_task(self, task_id):
-        self.task_id = task_id
-
-    @locked
-    def query(self):
-        return (self.waiting and self.task_id is None)
-
-    def wait(self):
-        self.waiting = True
-        while self.task_id is None:
-            time.sleep(self.interval)
-        with self.lock:
-            self.waiting = False
-            new_task_id = self.task_id
-            self.task_id = None
-        return new_task_id
-
-class YTTaskQueueController(threading.Thread):
-    # There's only one of these for every instance of yt -- whether than
-    # instance be spread across processors or not.
-    # We assume that this will exist in the process space of a consumer, so it
-    # will be threading based.
-    def __init__(self, tasks, interval = 2.0, communicators = None):
-        self.assignments = []
-        self.interval = interval
-        # Communicators can be anything but they have to implement a mechanism
-        # for saying, "I'm ready" and "I'm done"
-        self.tasks = tasks
-        self.communicators = communicators
-        threading.Thread.__init__(self)
-
-    def run(self):
-        # Now we bootstrap
-        for i,c in enumerate(self.communicators):
-            self.assignments.append(i)
-            if i == len(self.tasks): break
-            c.send_task(i)
-        while len(self.assignments) < len(self.tasks):
-            time.sleep(self.interval)
-            for i,c in enumerate(self.communicators):
-                if not c.query(): continue
-                print "Sending assignment %s to %s" % (
-                    len(self.assignments), i)
-                c.send_task(len(self.assignments))
-                self.assignments.append(i)
-                if len(self.assignments) >= len(self.tasks): break
-        terminated = 0
-        while terminated != len(self.communicators):
-            for i,c in enumerate(self.communicators):
-                if not c.query(): continue
-                c.send_task(-1)
-                terminated += 1
-                print "Terminated %s" % (i)
-
-class YTTaskQueueConsumer(object):
-    # One of these will exist per individual MPI task or one per MPI
-    # subcommunicator, depending on the level of parallelism.  They serve to
-    # talk to the YTTaskQueueController on one side and possibly several
-    # YTTaskExecutors on the other.
-    #
-    # One potential setup for this, when using MPI, would be to have the
-    # Executors each have one of these, but only the head process of that
-    # subcommunicator possess an external communicator.  Then in next_task,
-    # if the external communicator exists, one would probe that; otherwise,
-    # accept a broadcast from the internal communicator's 0th task.
-    def __init__(self, external_communicator, internal_communicator):
-        self.external_communicator = external_communicator
-        self.internal_communicator = internal_communicator
-
-    def next_task(self):
-        next_task = self.external_communicator.wait()
-        #self.internal_communicator.notify(next_task)
-        return next_task
-
-class YTTaskExecutor(object):
-    _count = 0
-    # One of these will exist per computational actor
-    def __init__(self, tasks, communicator):
-        self.communicator = communicator
-        self.tasks = tasks
-        self.name = "Runner%03s" % (self.__class__._count)
-        self.__class__._count += 1
-
-    def run(self):
-        # Note that right now this only works for a 1:1 mapping of
-        # YTTaskQueueConsumer to YTTaskExecutor
-        next_task = None
-        while 1:
-            next_task = self.communicator.next_task()
-            if next_task == -1: break
-            print "Executing on %s" % (self.name),
-            self.tasks[next_task]()
-        print "Concluded on %s" % (self.name)



https://bitbucket.org/yt_analysis/yt/changeset/277cfcda9b19/
changeset:   277cfcda9b19
branch:      yt
user:        MatthewTurk
date:        2012-06-22 17:57:57
summary:     Initial import of testing task queue code
affected #:  1 file

diff -r ad54bd076d432147923c28c738915e49e95c2b8b -r 277cfcda9b1915307e2b894d447888562fc275f7 yt/utilities/parallel_tools/task_queue.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -0,0 +1,146 @@
+"""
+Task queue in yt
+
+Author: Britton Smith <matthewturk at gmail.com>
+Affiliation: Michigan State University
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia University
+Homepage: http://yt-project.org/
+License:
+  Copyright (C) 2012 Matthew Turk.  All Rights Reserved.
+
+  This file is part of yt.
+
+  yt is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 3 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from mpi4py import MPI
+import time, threading, random
+
+messages = dict(
+    task = dict(msg = 'next'),
+    result = dict(msg = 'result'),
+    task_req = dict(msg = 'task_req'),
+    end = dict(msg = 'no_more_tasks'),
+)
+
+class TaskQueueNonRoot(object):
+    def __init__(self, tasks):
+        self.tasks = tasks
+        self.results = None
+
+    def send_result(self, result):
+        new_msg = messages['result'].copy()
+        new_msg['value'] = result
+        MPI.COMM_WORLD.send(new_msg, dest = 0, tag=1)
+
+    def get_next(self):
+        msg = messages['task_req'].copy()
+        MPI.COMM_WORLD.send(msg, dest = 0, tag=1)
+        msg = MPI.COMM_WORLD.recv(source = 0, tag=2)
+        if msg['msg'] == messages['end']['msg']:
+            raise StopIteration
+        return msg['value']
+
+    def __iter__(self):
+        while 1:
+            yield self.get_next()
+
+    def run(self, callable):
+        for task in self:
+            self.send_result(callable(task))
+        return self.finalize()
+
+    def finalize(self):
+        return MPI.COMM_WORLD.bcast(None, root = 0)
+
+class TaskQueueRoot(TaskQueueNonRoot):
+    def __init__(self, tasks):
+        self.tasks = tasks
+        self.results = {}
+        self.assignments = {}
+        self._notified = 0
+        self._current = 0
+        self._remaining = len(self.tasks)
+        self.dist = threading.Thread(target=self.handle_assignments)
+        self.dist.daemon = True
+        self.dist.start()
+        # Set up threading here
+
+    def insert_result(self, source_id, result):
+        task_id = self.assignments[source_id]
+        self.results[task_id] = result
+
+    def assign_task(self, source_id):
+        if self._remaining == 0:
+            msg = messages['end'].copy()
+            self._notified += 1
+        else:
+            msg = messages['task'].copy()
+            task_id = self._current
+            task = self.tasks[task_id]
+            self.assignments[source_id] = task_id
+            self._current += 1
+            self._remaining -= 1
+            msg['value'] = task
+        MPI.COMM_WORLD.send(msg, dest = source_id, tag = 2)
+
+    def handle_assignments(self):
+        while 1:
+            st = MPI.Status()
+            MPI.COMM_WORLD.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
+            msg = MPI.COMM_WORLD.recv(source = st.source, tag = 1)
+            if msg['msg'] == messages['result']['msg']:
+                self.insert_result(st.source, msg['value'])
+            elif msg['msg'] == messages['task_req']['msg']:
+                self.assign_task(st.source)
+            else:
+                print "GOT AN UNKNOWN MESSAGE", msg
+                raise RuntimeError
+            if self._notified >= MPI.COMM_WORLD.size: break
+
+    def finalize(self):
+        self.dist.join()
+        rv = MPI.COMM_WORLD.bcast(self.results, root = 0)
+        return rv
+
+tasks = range(1000)
+if MPI.COMM_WORLD.rank == 0:
+    q = TaskQueueRoot(tasks)
+else:
+    q = TaskQueueNonRoot(None)
+
+count = 0
+ttot = 0.0
+def func(t):
+    print "Working", MPI.COMM_WORLD.rank, t
+    ts = random.random()
+    time.sleep(ts)
+    global count, ttot
+    ttot += ts
+    count += 1
+    return t
+
+t1 = time.time()
+vals = q.run(func)
+t2 = time.time()
+if MPI.COMM_WORLD.rank == 0:
+    for i in sorted(vals):
+        print i, vals[i]
+        assert(i == vals[i])
+MPI.COMM_WORLD.barrier()
+for i in range(MPI.COMM_WORLD.size):
+    if i == MPI.COMM_WORLD.rank:
+        print "On proc %s, %s tasks were executed (%0.3e eff)" % (i, count,
+        ttot/(t2-t1))



https://bitbucket.org/yt_analysis/yt/changeset/9fd815710bad/
changeset:   9fd815710bad
branch:      yt
user:        brittonsmith
date:        2012-06-22 18:22:25
summary:     Changing task queue to use self.comm instead of COMM_WORLD.
affected #:  1 file

diff -r 277cfcda9b1915307e2b894d447888562fc275f7 -r 9fd815710bade5b167edce23b6e66c4030a6a2cb yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -28,6 +28,9 @@
 from mpi4py import MPI
 import time, threading, random
 
+from .parallel_analysis_interface import \
+    _get_comm
+
 messages = dict(
     task = dict(msg = 'next'),
     result = dict(msg = 'result'),
@@ -39,16 +42,17 @@
     def __init__(self, tasks):
         self.tasks = tasks
         self.results = None
+        self.comm = _get_comm(())
 
     def send_result(self, result):
         new_msg = messages['result'].copy()
         new_msg['value'] = result
-        MPI.COMM_WORLD.send(new_msg, dest = 0, tag=1)
+        self.comm.comm.send(new_msg, dest = 0, tag=1)
 
     def get_next(self):
         msg = messages['task_req'].copy()
-        MPI.COMM_WORLD.send(msg, dest = 0, tag=1)
-        msg = MPI.COMM_WORLD.recv(source = 0, tag=2)
+        self.comm.comm.send(msg, dest = 0, tag=1)
+        msg = self.comm.comm.recv(source = 0, tag=2)
         if msg['msg'] == messages['end']['msg']:
             raise StopIteration
         return msg['value']
@@ -63,7 +67,7 @@
         return self.finalize()
 
     def finalize(self):
-        return MPI.COMM_WORLD.bcast(None, root = 0)
+        return self.comm.comm.bcast(None, root = 0)
 
 class TaskQueueRoot(TaskQueueNonRoot):
     def __init__(self, tasks):
@@ -73,10 +77,12 @@
         self._notified = 0
         self._current = 0
         self._remaining = len(self.tasks)
-        self.dist = threading.Thread(target=self.handle_assignments)
-        self.dist.daemon = True
-        self.dist.start()
+        self.comm = _get_comm(())
+        self.handle_assignments()
         # Set up threading here
+        # self.dist = threading.Thread(target=self.handle_assignments)
+        # self.dist.daemon = True
+        # self.dist.start()
 
     def insert_result(self, source_id, result):
         task_id = self.assignments[source_id]
@@ -94,13 +100,13 @@
             self._current += 1
             self._remaining -= 1
             msg['value'] = task
-        MPI.COMM_WORLD.send(msg, dest = source_id, tag = 2)
+        self.comm.comm.send(msg, dest = source_id, tag = 2)
 
     def handle_assignments(self):
         while 1:
             st = MPI.Status()
-            MPI.COMM_WORLD.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
-            msg = MPI.COMM_WORLD.recv(source = st.source, tag = 1)
+            self.comm.comm.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
+            msg = self.comm.comm.recv(source = st.source, tag = 1)
             if msg['msg'] == messages['result']['msg']:
                 self.insert_result(st.source, msg['value'])
             elif msg['msg'] == messages['task_req']['msg']:
@@ -108,39 +114,10 @@
             else:
                 print "GOT AN UNKNOWN MESSAGE", msg
                 raise RuntimeError
-            if self._notified >= MPI.COMM_WORLD.size: break
+            if self._notified >= self.comm.comm.size: break
 
     def finalize(self):
         self.dist.join()
-        rv = MPI.COMM_WORLD.bcast(self.results, root = 0)
+        rv = self.comm.comm.bcast(self.results, root = 0)
         return rv
 
-tasks = range(1000)
-if MPI.COMM_WORLD.rank == 0:
-    q = TaskQueueRoot(tasks)
-else:
-    q = TaskQueueNonRoot(None)
-
-count = 0
-ttot = 0.0
-def func(t):
-    print "Working", MPI.COMM_WORLD.rank, t
-    ts = random.random()
-    time.sleep(ts)
-    global count, ttot
-    ttot += ts
-    count += 1
-    return t
-
-t1 = time.time()
-vals = q.run(func)
-t2 = time.time()
-if MPI.COMM_WORLD.rank == 0:
-    for i in sorted(vals):
-        print i, vals[i]
-        assert(i == vals[i])
-MPI.COMM_WORLD.barrier()
-for i in range(MPI.COMM_WORLD.size):
-    if i == MPI.COMM_WORLD.rank:
-        print "On proc %s, %s tasks were executed (%0.3e eff)" % (i, count,
-        ttot/(t2-t1))



https://bitbucket.org/yt_analysis/yt/changeset/07ea72979b41/
changeset:   07ea72979b41
branch:      yt
user:        MatthewTurk
date:        2012-06-22 18:24:11
summary:     Moving probing into the parallel analysis interface and using a callback.
affected #:  2 files

diff -r 9fd815710bade5b167edce23b6e66c4030a6a2cb -r 07ea72979b41e602b821b5afcf4dfca7262d3902 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -782,6 +782,11 @@
                                   (tmp_recv, (rsize, roff), MPI.CHAR))
         return recv
 
+    def probe(self, tag, callback):
+        st = MPI.Status()
+        self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+        callback(st)
+
 communication_system = CommunicationSystem()
 if parallel_capable:
     ranks = na.arange(MPI.COMM_WORLD.size)


diff -r 9fd815710bade5b167edce23b6e66c4030a6a2cb -r 07ea72979b41e602b821b5afcf4dfca7262d3902 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -104,18 +104,19 @@
 
     def handle_assignments(self):
         while 1:
-            st = MPI.Status()
-            self.comm.comm.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
-            msg = self.comm.comm.recv(source = st.source, tag = 1)
-            if msg['msg'] == messages['result']['msg']:
-                self.insert_result(st.source, msg['value'])
-            elif msg['msg'] == messages['task_req']['msg']:
-                self.assign_task(st.source)
-            else:
-                print "GOT AN UNKNOWN MESSAGE", msg
-                raise RuntimeError
+            self.comm.probe(1, self.handshake)
             if self._notified >= self.comm.comm.size: break
 
+    def handshake(self, status)
+        msg = self.comm.recv(source = status.source, tag = 1)
+        if msg['msg'] == messages['result']['msg']:
+            self.insert_result(st.source, msg['value'])
+        elif msg['msg'] == messages['task_req']['msg']:
+            self.assign_task(st.source)
+        else:
+            print "GOT AN UNKNOWN MESSAGE", msg
+            raise RuntimeError
+
     def finalize(self):
         self.dist.join()
         rv = self.comm.comm.bcast(self.results, root = 0)



https://bitbucket.org/yt_analysis/yt/changeset/370924cab655/
changeset:   370924cab655
branch:      yt
user:        MatthewTurk
date:        2012-06-22 18:41:08
summary:     Moving the probe into a probe loop, and changing how the task queue works.
affected #:  2 files

diff -r 07ea72979b41e602b821b5afcf4dfca7262d3902 -r 370924cab655d31af7145a775b9a5f879325c94b yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -782,10 +782,15 @@
                                   (tmp_recv, (rsize, roff), MPI.CHAR))
         return recv
 
-    def probe(self, tag, callback):
-        st = MPI.Status()
-        self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
-        callback(st)
+    def probe_loop(self, tag, callback):
+        while 1:
+            st = MPI.Status()
+            self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+            try:
+                callback(st)
+            except StopIteration:
+                print "PROBE LOOP ENDING"
+                break
 
 communication_system = CommunicationSystem()
 if parallel_capable:


diff -r 07ea72979b41e602b821b5afcf4dfca7262d3902 -r 370924cab655d31af7145a775b9a5f879325c94b yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -54,6 +54,7 @@
         self.comm.comm.send(msg, dest = 0, tag=1)
         msg = self.comm.comm.recv(source = 0, tag=2)
         if msg['msg'] == messages['end']['msg']:
+            print "Notified to end"
             raise StopIteration
         return msg['value']
 
@@ -66,8 +67,8 @@
             self.send_result(callable(task))
         return self.finalize()
 
-    def finalize(self):
-        return self.comm.comm.bcast(None, root = 0)
+    def finalize(self, vals = None):
+        return self.comm.comm.bcast(vals, root = 0)
 
 class TaskQueueRoot(TaskQueueNonRoot):
     def __init__(self, tasks):
@@ -78,18 +79,23 @@
         self._current = 0
         self._remaining = len(self.tasks)
         self.comm = _get_comm(())
-        self.handle_assignments()
+        print "Done with probe loop"
         # Set up threading here
         # self.dist = threading.Thread(target=self.handle_assignments)
         # self.dist.daemon = True
         # self.dist.start()
 
+    def run(self, func = None):
+        self.comm.probe_loop(1, self.handle_assignment)
+        return self.finalize(self.results)
+
     def insert_result(self, source_id, result):
         task_id = self.assignments[source_id]
         self.results[task_id] = result
 
     def assign_task(self, source_id):
         if self._remaining == 0:
+            print "Notifying %s to end" % source_id
             msg = messages['end'].copy()
             self._notified += 1
         else:
@@ -102,23 +108,15 @@
             msg['value'] = task
         self.comm.comm.send(msg, dest = source_id, tag = 2)
 
-    def handle_assignments(self):
-        while 1:
-            self.comm.probe(1, self.handshake)
-            if self._notified >= self.comm.comm.size: break
-
-    def handshake(self, status)
-        msg = self.comm.recv(source = status.source, tag = 1)
+    def handle_assignment(self, status):
+        msg = self.comm.comm.recv(source = status.source, tag = 1)
         if msg['msg'] == messages['result']['msg']:
-            self.insert_result(st.source, msg['value'])
+            self.insert_result(status.source, msg['value'])
         elif msg['msg'] == messages['task_req']['msg']:
-            self.assign_task(st.source)
+            self.assign_task(status.source)
         else:
             print "GOT AN UNKNOWN MESSAGE", msg
             raise RuntimeError
-
-    def finalize(self):
-        self.dist.join()
-        rv = self.comm.comm.bcast(self.results, root = 0)
-        return rv
-
+        if self._notified >= self.comm.comm.size - 1:
+            print "NOTIFIED ENOUGH!"
+            raise StopIteration



https://bitbucket.org/yt_analysis/yt/changeset/18e28348e8a2/
changeset:   18e28348e8a2
branch:      yt
user:        brittonsmith
date:        2012-06-22 20:04:36
summary:     Added task_queue helper function.
affected #:  1 file

diff -r 370924cab655d31af7145a775b9a5f879325c94b -r 18e28348e8a22aa2c3ccd5a9b4361b7def3eb615 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -29,6 +29,7 @@
 import time, threading, random
 
 from .parallel_analysis_interface import \
+    communication_system, \
     _get_comm
 
 messages = dict(
@@ -39,10 +40,10 @@
 )
 
 class TaskQueueNonRoot(object):
-    def __init__(self, tasks):
+    def __init__(self, comm, tasks):
         self.tasks = tasks
         self.results = None
-        self.comm = _get_comm(())
+        self.comm = comm
 
     def send_result(self, result):
         new_msg = messages['result'].copy()
@@ -71,15 +72,14 @@
         return self.comm.comm.bcast(vals, root = 0)
 
 class TaskQueueRoot(TaskQueueNonRoot):
-    def __init__(self, tasks):
+    def __init__(self, comm, tasks):
         self.tasks = tasks
         self.results = {}
         self.assignments = {}
         self._notified = 0
         self._current = 0
         self._remaining = len(self.tasks)
-        self.comm = _get_comm(())
-        print "Done with probe loop"
+        self.comm = comm
         # Set up threading here
         # self.dist = threading.Thread(target=self.handle_assignments)
         # self.dist.daemon = True
@@ -120,3 +120,11 @@
         if self._notified >= self.comm.comm.size - 1:
             print "NOTIFIED ENOUGH!"
             raise StopIteration
+
+def task_queue(func, tasks):
+    comm = _get_comm(())
+    if comm.comm.rank == 0:
+        my_q = TaskQueueRoot(comm, tasks)
+    else:
+        my_q = TaskQueueNonRoot(comm, None)
+    my_q.run(func)



https://bitbucket.org/yt_analysis/yt/changeset/3f42ee171bc3/
changeset:   3f42ee171bc3
branch:      yt
user:        brittonsmith
date:        2012-06-22 21:39:36
summary:     task_queue helper function now accepts njobs keyword and creates
subcomms.
affected #:  1 file

diff -r 18e28348e8a22aa2c3ccd5a9b4361b7def3eb615 -r 3f42ee171bc3f40de37aa29c7c163395fc1c0654 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -26,11 +26,14 @@
 """
 
 from mpi4py import MPI
+import numpy as na
 import time, threading, random
 
+from yt.funcs import *
 from .parallel_analysis_interface import \
     communication_system, \
-    _get_comm
+    _get_comm, \
+    parallel_capable
 
 messages = dict(
     task = dict(msg = 'next'),
@@ -40,22 +43,27 @@
 )
 
 class TaskQueueNonRoot(object):
-    def __init__(self, comm, tasks):
+    def __init__(self, tasks, comm, subcomm):
         self.tasks = tasks
         self.results = None
         self.comm = comm
+        self.subcomm = subcomm
 
     def send_result(self, result):
         new_msg = messages['result'].copy()
         new_msg['value'] = result
-        self.comm.comm.send(new_msg, dest = 0, tag=1)
+        if self.subcomm.rank == 0:
+            self.comm.comm.send(new_msg, dest = 0, tag=1)
+        self.subcomm.barrier()
 
     def get_next(self):
         msg = messages['task_req'].copy()
-        self.comm.comm.send(msg, dest = 0, tag=1)
-        msg = self.comm.comm.recv(source = 0, tag=2)
+        if self.subcomm.rank == 0:
+            self.comm.comm.send(msg, dest = 0, tag=1)
+            msg = self.comm.comm.recv(source = 0, tag=2)
+        msg = self.subcomm.bcast(msg, root=0)
         if msg['msg'] == messages['end']['msg']:
-            print "Notified to end"
+            mylog.info("Notified to end")
             raise StopIteration
         return msg['value']
 
@@ -72,7 +80,8 @@
         return self.comm.comm.bcast(vals, root = 0)
 
 class TaskQueueRoot(TaskQueueNonRoot):
-    def __init__(self, comm, tasks):
+    def __init__(self, tasks, comm, njobs):
+        self.njobs = njobs
         self.tasks = tasks
         self.results = {}
         self.assignments = {}
@@ -117,14 +126,33 @@
         else:
             print "GOT AN UNKNOWN MESSAGE", msg
             raise RuntimeError
-        if self._notified >= self.comm.comm.size - 1:
+        if self._notified >= self.njobs:
             print "NOTIFIED ENOUGH!"
             raise StopIteration
 
-def task_queue(func, tasks):
+def task_queue(func, tasks, njobs=0):
     comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        raise RunTimeError
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RunTimeError
+    my_rank = comm.rank
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i,comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+    
     if comm.comm.rank == 0:
-        my_q = TaskQueueRoot(comm, tasks)
+        my_q = TaskQueueRoot(tasks, comm, njobs)
     else:
-        my_q = TaskQueueNonRoot(comm, None)
-    my_q.run(func)
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+    return my_q.run(func)



https://bitbucket.org/yt_analysis/yt/changeset/ee6b53e913e6/
changeset:   ee6b53e913e6
branch:      yt
user:        brittonsmith
date:        2012-06-22 22:04:58
summary:     Cleaned up debug prints in task queue.
affected #:  1 file

diff -r 3f42ee171bc3f40de37aa29c7c163395fc1c0654 -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -25,7 +25,6 @@
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 """
 
-from mpi4py import MPI
 import numpy as na
 import time, threading, random
 
@@ -104,7 +103,7 @@
 
     def assign_task(self, source_id):
         if self._remaining == 0:
-            print "Notifying %s to end" % source_id
+            mylog.debug("Notifying %s to end", source_id)
             msg = messages['end'].copy()
             self._notified += 1
         else:
@@ -124,10 +123,9 @@
         elif msg['msg'] == messages['task_req']['msg']:
             self.assign_task(status.source)
         else:
-            print "GOT AN UNKNOWN MESSAGE", msg
+            mylog.error("GOT AN UNKNOWN MESSAGE: %s", msg)
             raise RuntimeError
         if self._notified >= self.njobs:
-            print "NOTIFIED ENOUGH!"
             raise StopIteration
 
 def task_queue(func, tasks, njobs=0):



https://bitbucket.org/yt_analysis/yt/changeset/df2899c15f59/
changeset:   df2899c15f59
branch:      yt
user:        brittonsmith
date:        2012-06-22 23:38:01
summary:     Added dynamic_parallel_objects function and dynamic keyword to parallel_objects
to call dynamic_parallel_objects.
affected #:  2 files

diff -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 -r df2899c15f599537ca370351e9f6e6cafa89a17a yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -340,7 +340,13 @@
     result = None
     result_id = None
 
-def parallel_objects(objects, njobs = 0, storage = None, barrier = True):
+def parallel_objects(objects, njobs = 0, storage = None, barrier = True,
+                     dynamic = False):
+    if dynamic:
+        from .task_queue import dynamic_parallel_objects
+        dynamic_parallel_objects(objects, njobs=njobs,
+                                 storage=storage)
+    
     if not parallel_capable:
         njobs = 1
     my_communicator = communication_system.communicators[-1]
@@ -789,7 +795,7 @@
             try:
                 callback(st)
             except StopIteration:
-                print "PROBE LOOP ENDING"
+                mylog.debug("Probe loop ending.")
                 break
 
 communication_system = CommunicationSystem()


diff -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 -r df2899c15f599537ca370351e9f6e6cafa89a17a yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -32,7 +32,8 @@
 from .parallel_analysis_interface import \
     communication_system, \
     _get_comm, \
-    parallel_capable
+    parallel_capable, \
+    ResultsStorage
 
 messages = dict(
     task = dict(msg = 'next'),
@@ -44,7 +45,7 @@
 class TaskQueueNonRoot(object):
     def __init__(self, tasks, comm, subcomm):
         self.tasks = tasks
-        self.results = None
+        self.results = {}
         self.comm = comm
         self.subcomm = subcomm
 
@@ -153,4 +154,46 @@
         my_q = TaskQueueRoot(tasks, comm, njobs)
     else:
         my_q = TaskQueueNonRoot(None, comm, subcomm)
+    communication_system.pop()
     return my_q.run(func)
+
+def dynamic_parallel_objects(tasks, njobs=0, storage=None):
+    comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        raise RunTimeError
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RunTimeError
+    my_rank = comm.rank
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i,comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+    
+    if comm.comm.rank == 0:
+        my_q = TaskQueueRoot(tasks, comm, njobs)
+        my_q.comm.probe_loop(1, my_q.handle_assignment)
+    else:
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+        if storage is None:
+            for task in my_q:
+                yield task
+        else:
+            for task in my_q:
+                rstore = ResultsStorage()
+                yield rstore, task
+                my_q.send_result(rstore.result)
+
+    if storage is not None:
+        my_results = my_q.comm.comm.bcast(my_q.results, root=0)
+        storage.update(my_results)
+
+    communication_system.pop()



https://bitbucket.org/yt_analysis/yt/changeset/17225c236e6a/
changeset:   17225c236e6a
branch:      yt
user:        MatthewTurk
date:        2012-06-22 23:57:49
summary:     Merged in brittonsmith/post-office (pull request #176)
affected #:  2 files

diff -r f6cc8e9297e8b0a4250d5d975f8f0abf9ce62fcc -r 17225c236e6afc77beb9f5620dbc560e6ee26834 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -340,7 +340,13 @@
     result = None
     result_id = None
 
-def parallel_objects(objects, njobs = 0, storage = None, barrier = True):
+def parallel_objects(objects, njobs = 0, storage = None, barrier = True,
+                     dynamic = False):
+    if dynamic:
+        from .task_queue import dynamic_parallel_objects
+        dynamic_parallel_objects(objects, njobs=njobs,
+                                 storage=storage)
+    
     if not parallel_capable:
         njobs = 1
     my_communicator = communication_system.communicators[-1]
@@ -782,6 +788,16 @@
                                   (tmp_recv, (rsize, roff), MPI.CHAR))
         return recv
 
+    def probe_loop(self, tag, callback):
+        while 1:
+            st = MPI.Status()
+            self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+            try:
+                callback(st)
+            except StopIteration:
+                mylog.debug("Probe loop ending.")
+                break
+
 communication_system = CommunicationSystem()
 if parallel_capable:
     ranks = na.arange(MPI.COMM_WORLD.size)


diff -r f6cc8e9297e8b0a4250d5d975f8f0abf9ce62fcc -r 17225c236e6afc77beb9f5620dbc560e6ee26834 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -1,11 +1,13 @@
 """
-A task queue for distributing work to worker agents
+Task queue in yt
 
+Author: Britton Smith <matthewturk at gmail.com>
+Affiliation: Michigan State University
 Author: Matthew Turk <matthewturk at gmail.com>
 Affiliation: Columbia University
 Homepage: http://yt-project.org/
 License:
-  Copyright (C) 2011 Matthew Turk.  All Rights Reserved.
+  Copyright (C) 2012 Matthew Turk.  All Rights Reserved.
 
   This file is part of yt.
 
@@ -23,127 +25,175 @@
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 """
 
-import threading
+import numpy as na
+import time, threading, random
+
 from yt.funcs import *
+from .parallel_analysis_interface import \
+    communication_system, \
+    _get_comm, \
+    parallel_capable, \
+    ResultsStorage
 
-# The idea here is that we have a set of tasks, which we want to distribute.
-# We'll try to make this forward-compatible.  To do so, we want to support the
-# idea that there's a single, global set of tasks, as well as consumers that
-# receive tasks from the main controller.  These consumers then pass them out
-# to executors.
-#
-# The middle level, the "Consumer," is only really distinct from the executor
-# in the case that there is an MPI subcommunicator.  The reason for the
-# separation is so that the controller only communicates with a single member
-# of each subcommunicator, which then passes that info back out.
+messages = dict(
+    task = dict(msg = 'next'),
+    result = dict(msg = 'result'),
+    task_req = dict(msg = 'task_req'),
+    end = dict(msg = 'no_more_tasks'),
+)
 
-def locked(func):
-    @wraps(func)
-    def exclusive(self, *args, **kwargs):
-        with self.lock:
-            return func(self, *args, **kwargs)
-    return exclusive
+class TaskQueueNonRoot(object):
+    def __init__(self, tasks, comm, subcomm):
+        self.tasks = tasks
+        self.results = {}
+        self.comm = comm
+        self.subcomm = subcomm
 
-class YTTaskCommunicator(object):
-    # This should carefully be checked for a race condition, particularly in
-    # the wait() function
-    def __init__(self, interval = 2.0):
-        self.interval = interval
-        self.task_id = None
-        self.waiting = False
-        self.lock = threading.Lock()
+    def send_result(self, result):
+        new_msg = messages['result'].copy()
+        new_msg['value'] = result
+        if self.subcomm.rank == 0:
+            self.comm.comm.send(new_msg, dest = 0, tag=1)
+        self.subcomm.barrier()
 
-    @locked
-    def send_task(self, task_id):
-        self.task_id = task_id
+    def get_next(self):
+        msg = messages['task_req'].copy()
+        if self.subcomm.rank == 0:
+            self.comm.comm.send(msg, dest = 0, tag=1)
+            msg = self.comm.comm.recv(source = 0, tag=2)
+        msg = self.subcomm.bcast(msg, root=0)
+        if msg['msg'] == messages['end']['msg']:
+            mylog.info("Notified to end")
+            raise StopIteration
+        return msg['value']
 
-    @locked
-    def query(self):
-        return (self.waiting and self.task_id is None)
+    def __iter__(self):
+        while 1:
+            yield self.get_next()
 
-    def wait(self):
-        self.waiting = True
-        while self.task_id is None:
-            time.sleep(self.interval)
-        with self.lock:
-            self.waiting = False
-            new_task_id = self.task_id
-            self.task_id = None
-        return new_task_id
+    def run(self, callable):
+        for task in self:
+            self.send_result(callable(task))
+        return self.finalize()
 
-class YTTaskQueueController(threading.Thread):
-    # There's only one of these for every instance of yt -- whether than
-    # instance be spread across processors or not.
-    # We assume that this will exist in the process space of a consumer, so it
-    # will be threading based.
-    def __init__(self, tasks, interval = 2.0, communicators = None):
-        self.assignments = []
-        self.interval = interval
-        # Communicators can be anything but they have to implement a mechanism
-        # for saying, "I'm ready" and "I'm done"
+    def finalize(self, vals = None):
+        return self.comm.comm.bcast(vals, root = 0)
+
+class TaskQueueRoot(TaskQueueNonRoot):
+    def __init__(self, tasks, comm, njobs):
+        self.njobs = njobs
         self.tasks = tasks
-        self.communicators = communicators
-        threading.Thread.__init__(self)
+        self.results = {}
+        self.assignments = {}
+        self._notified = 0
+        self._current = 0
+        self._remaining = len(self.tasks)
+        self.comm = comm
+        # Set up threading here
+        # self.dist = threading.Thread(target=self.handle_assignments)
+        # self.dist.daemon = True
+        # self.dist.start()
 
-    def run(self):
-        # Now we bootstrap
-        for i,c in enumerate(self.communicators):
-            self.assignments.append(i)
-            if i == len(self.tasks): break
-            c.send_task(i)
-        while len(self.assignments) < len(self.tasks):
-            time.sleep(self.interval)
-            for i,c in enumerate(self.communicators):
-                if not c.query(): continue
-                print "Sending assignment %s to %s" % (
-                    len(self.assignments), i)
-                c.send_task(len(self.assignments))
-                self.assignments.append(i)
-                if len(self.assignments) >= len(self.tasks): break
-        terminated = 0
-        while terminated != len(self.communicators):
-            for i,c in enumerate(self.communicators):
-                if not c.query(): continue
-                c.send_task(-1)
-                terminated += 1
-                print "Terminated %s" % (i)
+    def run(self, func = None):
+        self.comm.probe_loop(1, self.handle_assignment)
+        return self.finalize(self.results)
 
-class YTTaskQueueConsumer(object):
-    # One of these will exist per individual MPI task or one per MPI
-    # subcommunicator, depending on the level of parallelism.  They serve to
-    # talk to the YTTaskQueueController on one side and possibly several
-    # YTTaskExecutors on the other.
-    #
-    # One potential setup for this, when using MPI, would be to have the
-    # Executors each have one of these, but only the head process of that
-    # subcommunicator possess an external communicator.  Then in next_task,
-    # if the external communicator exists, one would probe that; otherwise,
-    # accept a broadcast from the internal communicator's 0th task.
-    def __init__(self, external_communicator, internal_communicator):
-        self.external_communicator = external_communicator
-        self.internal_communicator = internal_communicator
+    def insert_result(self, source_id, result):
+        task_id = self.assignments[source_id]
+        self.results[task_id] = result
 
-    def next_task(self):
-        next_task = self.external_communicator.wait()
-        #self.internal_communicator.notify(next_task)
-        return next_task
+    def assign_task(self, source_id):
+        if self._remaining == 0:
+            mylog.debug("Notifying %s to end", source_id)
+            msg = messages['end'].copy()
+            self._notified += 1
+        else:
+            msg = messages['task'].copy()
+            task_id = self._current
+            task = self.tasks[task_id]
+            self.assignments[source_id] = task_id
+            self._current += 1
+            self._remaining -= 1
+            msg['value'] = task
+        self.comm.comm.send(msg, dest = source_id, tag = 2)
 
-class YTTaskExecutor(object):
-    _count = 0
-    # One of these will exist per computational actor
-    def __init__(self, tasks, communicator):
-        self.communicator = communicator
-        self.tasks = tasks
-        self.name = "Runner%03s" % (self.__class__._count)
-        self.__class__._count += 1
+    def handle_assignment(self, status):
+        msg = self.comm.comm.recv(source = status.source, tag = 1)
+        if msg['msg'] == messages['result']['msg']:
+            self.insert_result(status.source, msg['value'])
+        elif msg['msg'] == messages['task_req']['msg']:
+            self.assign_task(status.source)
+        else:
+            mylog.error("GOT AN UNKNOWN MESSAGE: %s", msg)
+            raise RuntimeError
+        if self._notified >= self.njobs:
+            raise StopIteration
 
-    def run(self):
-        # Note that right now this only works for a 1:1 mapping of
-        # YTTaskQueueConsumer to YTTaskExecutor
-        next_task = None
-        while 1:
-            next_task = self.communicator.next_task()
-            if next_task == -1: break
-            print "Executing on %s" % (self.name),
-            self.tasks[next_task]()
-        print "Concluded on %s" % (self.name)
+def task_queue(func, tasks, njobs=0):
+    comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        raise RunTimeError
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RunTimeError
+    my_rank = comm.rank
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i,comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+    
+    if comm.comm.rank == 0:
+        my_q = TaskQueueRoot(tasks, comm, njobs)
+    else:
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+    communication_system.pop()
+    return my_q.run(func)
+
+def dynamic_parallel_objects(tasks, njobs=0, storage=None):
+    comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        raise RunTimeError
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RunTimeError
+    my_rank = comm.rank
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i,comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+    
+    if comm.comm.rank == 0:
+        my_q = TaskQueueRoot(tasks, comm, njobs)
+        my_q.comm.probe_loop(1, my_q.handle_assignment)
+    else:
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+        if storage is None:
+            for task in my_q:
+                yield task
+        else:
+            for task in my_q:
+                rstore = ResultsStorage()
+                yield rstore, task
+                my_q.send_result(rstore.result)
+
+    if storage is not None:
+        my_results = my_q.comm.comm.bcast(my_q.results, root=0)
+        storage.update(my_results)
+
+    communication_system.pop()

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.



More information about the yt-svn mailing list