[yt-svn] commit/yt: 10 new changesets
Bitbucket
commits-noreply at bitbucket.org
Fri Jun 22 14:57:50 PDT 2012
10 new commits in yt:
https://bitbucket.org/yt_analysis/yt/changeset/ad54bd076d43/
changeset: ad54bd076d43
branch: yt
user: MatthewTurk
date: 2012-06-22 17:37:49
summary: I have forgotten why this was added to the repository.
affected #: 1 file
diff -r f2a1a5d384dec9014d12bccee925ae14194312e5 -r ad54bd076d432147923c28c738915e49e95c2b8b yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ /dev/null
@@ -1,149 +0,0 @@
-"""
-A task queue for distributing work to worker agents
-
-Author: Matthew Turk <matthewturk at gmail.com>
-Affiliation: Columbia University
-Homepage: http://yt-project.org/
-License:
- Copyright (C) 2011 Matthew Turk. All Rights Reserved.
-
- This file is part of yt.
-
- yt is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-
-import threading
-from yt.funcs import *
-
-# The idea here is that we have a set of tasks, which we want to distribute.
-# We'll try to make this forward-compatible. To do so, we want to support the
-# idea that there's a single, global set of tasks, as well as consumers that
-# receive tasks from the main controller. These consumers then pass them out
-# to executors.
-#
-# The middle level, the "Consumer," is only really distinct from the executor
-# in the case that there is an MPI subcommunicator. The reason for the
-# separation is so that the controller only communicates with a single member
-# of each subcommunicator, which then passes that info back out.
-
-def locked(func):
- @wraps(func)
- def exclusive(self, *args, **kwargs):
- with self.lock:
- return func(self, *args, **kwargs)
- return exclusive
-
-class YTTaskCommunicator(object):
- # This should carefully be checked for a race condition, particularly in
- # the wait() function
- def __init__(self, interval = 2.0):
- self.interval = interval
- self.task_id = None
- self.waiting = False
- self.lock = threading.Lock()
-
- @locked
- def send_task(self, task_id):
- self.task_id = task_id
-
- @locked
- def query(self):
- return (self.waiting and self.task_id is None)
-
- def wait(self):
- self.waiting = True
- while self.task_id is None:
- time.sleep(self.interval)
- with self.lock:
- self.waiting = False
- new_task_id = self.task_id
- self.task_id = None
- return new_task_id
-
-class YTTaskQueueController(threading.Thread):
- # There's only one of these for every instance of yt -- whether than
- # instance be spread across processors or not.
- # We assume that this will exist in the process space of a consumer, so it
- # will be threading based.
- def __init__(self, tasks, interval = 2.0, communicators = None):
- self.assignments = []
- self.interval = interval
- # Communicators can be anything but they have to implement a mechanism
- # for saying, "I'm ready" and "I'm done"
- self.tasks = tasks
- self.communicators = communicators
- threading.Thread.__init__(self)
-
- def run(self):
- # Now we bootstrap
- for i,c in enumerate(self.communicators):
- self.assignments.append(i)
- if i == len(self.tasks): break
- c.send_task(i)
- while len(self.assignments) < len(self.tasks):
- time.sleep(self.interval)
- for i,c in enumerate(self.communicators):
- if not c.query(): continue
- print "Sending assignment %s to %s" % (
- len(self.assignments), i)
- c.send_task(len(self.assignments))
- self.assignments.append(i)
- if len(self.assignments) >= len(self.tasks): break
- terminated = 0
- while terminated != len(self.communicators):
- for i,c in enumerate(self.communicators):
- if not c.query(): continue
- c.send_task(-1)
- terminated += 1
- print "Terminated %s" % (i)
-
-class YTTaskQueueConsumer(object):
- # One of these will exist per individual MPI task or one per MPI
- # subcommunicator, depending on the level of parallelism. They serve to
- # talk to the YTTaskQueueController on one side and possibly several
- # YTTaskExecutors on the other.
- #
- # One potential setup for this, when using MPI, would be to have the
- # Executors each have one of these, but only the head process of that
- # subcommunicator possess an external communicator. Then in next_task,
- # if the external communicator exists, one would probe that; otherwise,
- # accept a broadcast from the internal communicator's 0th task.
- def __init__(self, external_communicator, internal_communicator):
- self.external_communicator = external_communicator
- self.internal_communicator = internal_communicator
-
- def next_task(self):
- next_task = self.external_communicator.wait()
- #self.internal_communicator.notify(next_task)
- return next_task
-
-class YTTaskExecutor(object):
- _count = 0
- # One of these will exist per computational actor
- def __init__(self, tasks, communicator):
- self.communicator = communicator
- self.tasks = tasks
- self.name = "Runner%03s" % (self.__class__._count)
- self.__class__._count += 1
-
- def run(self):
- # Note that right now this only works for a 1:1 mapping of
- # YTTaskQueueConsumer to YTTaskExecutor
- next_task = None
- while 1:
- next_task = self.communicator.next_task()
- if next_task == -1: break
- print "Executing on %s" % (self.name),
- self.tasks[next_task]()
- print "Concluded on %s" % (self.name)
https://bitbucket.org/yt_analysis/yt/changeset/277cfcda9b19/
changeset: 277cfcda9b19
branch: yt
user: MatthewTurk
date: 2012-06-22 17:57:57
summary: Initial import of testing task queue code
affected #: 1 file
diff -r ad54bd076d432147923c28c738915e49e95c2b8b -r 277cfcda9b1915307e2b894d447888562fc275f7 yt/utilities/parallel_tools/task_queue.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -0,0 +1,146 @@
+"""
+Task queue in yt
+
+Author: Britton Smith <matthewturk at gmail.com>
+Affiliation: Michigan State University
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia University
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from mpi4py import MPI
+import time, threading, random
+
+messages = dict(
+ task = dict(msg = 'next'),
+ result = dict(msg = 'result'),
+ task_req = dict(msg = 'task_req'),
+ end = dict(msg = 'no_more_tasks'),
+)
+
+class TaskQueueNonRoot(object):
+ def __init__(self, tasks):
+ self.tasks = tasks
+ self.results = None
+
+ def send_result(self, result):
+ new_msg = messages['result'].copy()
+ new_msg['value'] = result
+ MPI.COMM_WORLD.send(new_msg, dest = 0, tag=1)
+
+ def get_next(self):
+ msg = messages['task_req'].copy()
+ MPI.COMM_WORLD.send(msg, dest = 0, tag=1)
+ msg = MPI.COMM_WORLD.recv(source = 0, tag=2)
+ if msg['msg'] == messages['end']['msg']:
+ raise StopIteration
+ return msg['value']
+
+ def __iter__(self):
+ while 1:
+ yield self.get_next()
+
+ def run(self, callable):
+ for task in self:
+ self.send_result(callable(task))
+ return self.finalize()
+
+ def finalize(self):
+ return MPI.COMM_WORLD.bcast(None, root = 0)
+
+class TaskQueueRoot(TaskQueueNonRoot):
+ def __init__(self, tasks):
+ self.tasks = tasks
+ self.results = {}
+ self.assignments = {}
+ self._notified = 0
+ self._current = 0
+ self._remaining = len(self.tasks)
+ self.dist = threading.Thread(target=self.handle_assignments)
+ self.dist.daemon = True
+ self.dist.start()
+ # Set up threading here
+
+ def insert_result(self, source_id, result):
+ task_id = self.assignments[source_id]
+ self.results[task_id] = result
+
+ def assign_task(self, source_id):
+ if self._remaining == 0:
+ msg = messages['end'].copy()
+ self._notified += 1
+ else:
+ msg = messages['task'].copy()
+ task_id = self._current
+ task = self.tasks[task_id]
+ self.assignments[source_id] = task_id
+ self._current += 1
+ self._remaining -= 1
+ msg['value'] = task
+ MPI.COMM_WORLD.send(msg, dest = source_id, tag = 2)
+
+ def handle_assignments(self):
+ while 1:
+ st = MPI.Status()
+ MPI.COMM_WORLD.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
+ msg = MPI.COMM_WORLD.recv(source = st.source, tag = 1)
+ if msg['msg'] == messages['result']['msg']:
+ self.insert_result(st.source, msg['value'])
+ elif msg['msg'] == messages['task_req']['msg']:
+ self.assign_task(st.source)
+ else:
+ print "GOT AN UNKNOWN MESSAGE", msg
+ raise RuntimeError
+ if self._notified >= MPI.COMM_WORLD.size: break
+
+ def finalize(self):
+ self.dist.join()
+ rv = MPI.COMM_WORLD.bcast(self.results, root = 0)
+ return rv
+
+tasks = range(1000)
+if MPI.COMM_WORLD.rank == 0:
+ q = TaskQueueRoot(tasks)
+else:
+ q = TaskQueueNonRoot(None)
+
+count = 0
+ttot = 0.0
+def func(t):
+ print "Working", MPI.COMM_WORLD.rank, t
+ ts = random.random()
+ time.sleep(ts)
+ global count, ttot
+ ttot += ts
+ count += 1
+ return t
+
+t1 = time.time()
+vals = q.run(func)
+t2 = time.time()
+if MPI.COMM_WORLD.rank == 0:
+ for i in sorted(vals):
+ print i, vals[i]
+ assert(i == vals[i])
+MPI.COMM_WORLD.barrier()
+for i in range(MPI.COMM_WORLD.size):
+ if i == MPI.COMM_WORLD.rank:
+ print "On proc %s, %s tasks were executed (%0.3e eff)" % (i, count,
+ ttot/(t2-t1))
https://bitbucket.org/yt_analysis/yt/changeset/9fd815710bad/
changeset: 9fd815710bad
branch: yt
user: brittonsmith
date: 2012-06-22 18:22:25
summary: Changing task queue to use self.comm instead of COMM_WORLD.
affected #: 1 file
diff -r 277cfcda9b1915307e2b894d447888562fc275f7 -r 9fd815710bade5b167edce23b6e66c4030a6a2cb yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -28,6 +28,9 @@
from mpi4py import MPI
import time, threading, random
+from .parallel_analysis_interface import \
+ _get_comm
+
messages = dict(
task = dict(msg = 'next'),
result = dict(msg = 'result'),
@@ -39,16 +42,17 @@
def __init__(self, tasks):
self.tasks = tasks
self.results = None
+ self.comm = _get_comm(())
def send_result(self, result):
new_msg = messages['result'].copy()
new_msg['value'] = result
- MPI.COMM_WORLD.send(new_msg, dest = 0, tag=1)
+ self.comm.comm.send(new_msg, dest = 0, tag=1)
def get_next(self):
msg = messages['task_req'].copy()
- MPI.COMM_WORLD.send(msg, dest = 0, tag=1)
- msg = MPI.COMM_WORLD.recv(source = 0, tag=2)
+ self.comm.comm.send(msg, dest = 0, tag=1)
+ msg = self.comm.comm.recv(source = 0, tag=2)
if msg['msg'] == messages['end']['msg']:
raise StopIteration
return msg['value']
@@ -63,7 +67,7 @@
return self.finalize()
def finalize(self):
- return MPI.COMM_WORLD.bcast(None, root = 0)
+ return self.comm.comm.bcast(None, root = 0)
class TaskQueueRoot(TaskQueueNonRoot):
def __init__(self, tasks):
@@ -73,10 +77,12 @@
self._notified = 0
self._current = 0
self._remaining = len(self.tasks)
- self.dist = threading.Thread(target=self.handle_assignments)
- self.dist.daemon = True
- self.dist.start()
+ self.comm = _get_comm(())
+ self.handle_assignments()
# Set up threading here
+ # self.dist = threading.Thread(target=self.handle_assignments)
+ # self.dist.daemon = True
+ # self.dist.start()
def insert_result(self, source_id, result):
task_id = self.assignments[source_id]
@@ -94,13 +100,13 @@
self._current += 1
self._remaining -= 1
msg['value'] = task
- MPI.COMM_WORLD.send(msg, dest = source_id, tag = 2)
+ self.comm.comm.send(msg, dest = source_id, tag = 2)
def handle_assignments(self):
while 1:
st = MPI.Status()
- MPI.COMM_WORLD.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
- msg = MPI.COMM_WORLD.recv(source = st.source, tag = 1)
+ self.comm.comm.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
+ msg = self.comm.comm.recv(source = st.source, tag = 1)
if msg['msg'] == messages['result']['msg']:
self.insert_result(st.source, msg['value'])
elif msg['msg'] == messages['task_req']['msg']:
@@ -108,39 +114,10 @@
else:
print "GOT AN UNKNOWN MESSAGE", msg
raise RuntimeError
- if self._notified >= MPI.COMM_WORLD.size: break
+ if self._notified >= self.comm.comm.size: break
def finalize(self):
self.dist.join()
- rv = MPI.COMM_WORLD.bcast(self.results, root = 0)
+ rv = self.comm.comm.bcast(self.results, root = 0)
return rv
-tasks = range(1000)
-if MPI.COMM_WORLD.rank == 0:
- q = TaskQueueRoot(tasks)
-else:
- q = TaskQueueNonRoot(None)
-
-count = 0
-ttot = 0.0
-def func(t):
- print "Working", MPI.COMM_WORLD.rank, t
- ts = random.random()
- time.sleep(ts)
- global count, ttot
- ttot += ts
- count += 1
- return t
-
-t1 = time.time()
-vals = q.run(func)
-t2 = time.time()
-if MPI.COMM_WORLD.rank == 0:
- for i in sorted(vals):
- print i, vals[i]
- assert(i == vals[i])
-MPI.COMM_WORLD.barrier()
-for i in range(MPI.COMM_WORLD.size):
- if i == MPI.COMM_WORLD.rank:
- print "On proc %s, %s tasks were executed (%0.3e eff)" % (i, count,
- ttot/(t2-t1))
https://bitbucket.org/yt_analysis/yt/changeset/07ea72979b41/
changeset: 07ea72979b41
branch: yt
user: MatthewTurk
date: 2012-06-22 18:24:11
summary: Moving probing into the parallel analysis interface and using a callback.
affected #: 2 files
diff -r 9fd815710bade5b167edce23b6e66c4030a6a2cb -r 07ea72979b41e602b821b5afcf4dfca7262d3902 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -782,6 +782,11 @@
(tmp_recv, (rsize, roff), MPI.CHAR))
return recv
+ def probe(self, tag, callback):
+ st = MPI.Status()
+ self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+ callback(st)
+
communication_system = CommunicationSystem()
if parallel_capable:
ranks = na.arange(MPI.COMM_WORLD.size)
diff -r 9fd815710bade5b167edce23b6e66c4030a6a2cb -r 07ea72979b41e602b821b5afcf4dfca7262d3902 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -104,18 +104,19 @@
def handle_assignments(self):
while 1:
- st = MPI.Status()
- self.comm.comm.Probe(MPI.ANY_SOURCE, tag = 1, status = st)
- msg = self.comm.comm.recv(source = st.source, tag = 1)
- if msg['msg'] == messages['result']['msg']:
- self.insert_result(st.source, msg['value'])
- elif msg['msg'] == messages['task_req']['msg']:
- self.assign_task(st.source)
- else:
- print "GOT AN UNKNOWN MESSAGE", msg
- raise RuntimeError
+ self.comm.probe(1, self.handshake)
if self._notified >= self.comm.comm.size: break
+ def handshake(self, status)
+ msg = self.comm.recv(source = status.source, tag = 1)
+ if msg['msg'] == messages['result']['msg']:
+ self.insert_result(st.source, msg['value'])
+ elif msg['msg'] == messages['task_req']['msg']:
+ self.assign_task(st.source)
+ else:
+ print "GOT AN UNKNOWN MESSAGE", msg
+ raise RuntimeError
+
def finalize(self):
self.dist.join()
rv = self.comm.comm.bcast(self.results, root = 0)
https://bitbucket.org/yt_analysis/yt/changeset/370924cab655/
changeset: 370924cab655
branch: yt
user: MatthewTurk
date: 2012-06-22 18:41:08
summary: Moving the probe into a probe loop, and changing how the task queue works.
affected #: 2 files
diff -r 07ea72979b41e602b821b5afcf4dfca7262d3902 -r 370924cab655d31af7145a775b9a5f879325c94b yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -782,10 +782,15 @@
(tmp_recv, (rsize, roff), MPI.CHAR))
return recv
- def probe(self, tag, callback):
- st = MPI.Status()
- self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
- callback(st)
+ def probe_loop(self, tag, callback):
+ while 1:
+ st = MPI.Status()
+ self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+ try:
+ callback(st)
+ except StopIteration:
+ print "PROBE LOOP ENDING"
+ break
communication_system = CommunicationSystem()
if parallel_capable:
diff -r 07ea72979b41e602b821b5afcf4dfca7262d3902 -r 370924cab655d31af7145a775b9a5f879325c94b yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -54,6 +54,7 @@
self.comm.comm.send(msg, dest = 0, tag=1)
msg = self.comm.comm.recv(source = 0, tag=2)
if msg['msg'] == messages['end']['msg']:
+ print "Notified to end"
raise StopIteration
return msg['value']
@@ -66,8 +67,8 @@
self.send_result(callable(task))
return self.finalize()
- def finalize(self):
- return self.comm.comm.bcast(None, root = 0)
+ def finalize(self, vals = None):
+ return self.comm.comm.bcast(vals, root = 0)
class TaskQueueRoot(TaskQueueNonRoot):
def __init__(self, tasks):
@@ -78,18 +79,23 @@
self._current = 0
self._remaining = len(self.tasks)
self.comm = _get_comm(())
- self.handle_assignments()
+ print "Done with probe loop"
# Set up threading here
# self.dist = threading.Thread(target=self.handle_assignments)
# self.dist.daemon = True
# self.dist.start()
+ def run(self, func = None):
+ self.comm.probe_loop(1, self.handle_assignment)
+ return self.finalize(self.results)
+
def insert_result(self, source_id, result):
task_id = self.assignments[source_id]
self.results[task_id] = result
def assign_task(self, source_id):
if self._remaining == 0:
+ print "Notifying %s to end" % source_id
msg = messages['end'].copy()
self._notified += 1
else:
@@ -102,23 +108,15 @@
msg['value'] = task
self.comm.comm.send(msg, dest = source_id, tag = 2)
- def handle_assignments(self):
- while 1:
- self.comm.probe(1, self.handshake)
- if self._notified >= self.comm.comm.size: break
-
- def handshake(self, status)
- msg = self.comm.recv(source = status.source, tag = 1)
+ def handle_assignment(self, status):
+ msg = self.comm.comm.recv(source = status.source, tag = 1)
if msg['msg'] == messages['result']['msg']:
- self.insert_result(st.source, msg['value'])
+ self.insert_result(status.source, msg['value'])
elif msg['msg'] == messages['task_req']['msg']:
- self.assign_task(st.source)
+ self.assign_task(status.source)
else:
print "GOT AN UNKNOWN MESSAGE", msg
raise RuntimeError
-
- def finalize(self):
- self.dist.join()
- rv = self.comm.comm.bcast(self.results, root = 0)
- return rv
-
+ if self._notified >= self.comm.comm.size - 1:
+ print "NOTIFIED ENOUGH!"
+ raise StopIteration
https://bitbucket.org/yt_analysis/yt/changeset/18e28348e8a2/
changeset: 18e28348e8a2
branch: yt
user: brittonsmith
date: 2012-06-22 20:04:36
summary: Added task_queue helper function.
affected #: 1 file
diff -r 370924cab655d31af7145a775b9a5f879325c94b -r 18e28348e8a22aa2c3ccd5a9b4361b7def3eb615 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -29,6 +29,7 @@
import time, threading, random
from .parallel_analysis_interface import \
+ communication_system, \
_get_comm
messages = dict(
@@ -39,10 +40,10 @@
)
class TaskQueueNonRoot(object):
- def __init__(self, tasks):
+ def __init__(self, comm, tasks):
self.tasks = tasks
self.results = None
- self.comm = _get_comm(())
+ self.comm = comm
def send_result(self, result):
new_msg = messages['result'].copy()
@@ -71,15 +72,14 @@
return self.comm.comm.bcast(vals, root = 0)
class TaskQueueRoot(TaskQueueNonRoot):
- def __init__(self, tasks):
+ def __init__(self, comm, tasks):
self.tasks = tasks
self.results = {}
self.assignments = {}
self._notified = 0
self._current = 0
self._remaining = len(self.tasks)
- self.comm = _get_comm(())
- print "Done with probe loop"
+ self.comm = comm
# Set up threading here
# self.dist = threading.Thread(target=self.handle_assignments)
# self.dist.daemon = True
@@ -120,3 +120,11 @@
if self._notified >= self.comm.comm.size - 1:
print "NOTIFIED ENOUGH!"
raise StopIteration
+
+def task_queue(func, tasks):
+ comm = _get_comm(())
+ if comm.comm.rank == 0:
+ my_q = TaskQueueRoot(comm, tasks)
+ else:
+ my_q = TaskQueueNonRoot(comm, None)
+ my_q.run(func)
https://bitbucket.org/yt_analysis/yt/changeset/3f42ee171bc3/
changeset: 3f42ee171bc3
branch: yt
user: brittonsmith
date: 2012-06-22 21:39:36
summary: task_queue helper function now accepts njobs keyword and creates
subcomms.
affected #: 1 file
diff -r 18e28348e8a22aa2c3ccd5a9b4361b7def3eb615 -r 3f42ee171bc3f40de37aa29c7c163395fc1c0654 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -26,11 +26,14 @@
"""
from mpi4py import MPI
+import numpy as na
import time, threading, random
+from yt.funcs import *
from .parallel_analysis_interface import \
communication_system, \
- _get_comm
+ _get_comm, \
+ parallel_capable
messages = dict(
task = dict(msg = 'next'),
@@ -40,22 +43,27 @@
)
class TaskQueueNonRoot(object):
- def __init__(self, comm, tasks):
+ def __init__(self, tasks, comm, subcomm):
self.tasks = tasks
self.results = None
self.comm = comm
+ self.subcomm = subcomm
def send_result(self, result):
new_msg = messages['result'].copy()
new_msg['value'] = result
- self.comm.comm.send(new_msg, dest = 0, tag=1)
+ if self.subcomm.rank == 0:
+ self.comm.comm.send(new_msg, dest = 0, tag=1)
+ self.subcomm.barrier()
def get_next(self):
msg = messages['task_req'].copy()
- self.comm.comm.send(msg, dest = 0, tag=1)
- msg = self.comm.comm.recv(source = 0, tag=2)
+ if self.subcomm.rank == 0:
+ self.comm.comm.send(msg, dest = 0, tag=1)
+ msg = self.comm.comm.recv(source = 0, tag=2)
+ msg = self.subcomm.bcast(msg, root=0)
if msg['msg'] == messages['end']['msg']:
- print "Notified to end"
+ mylog.info("Notified to end")
raise StopIteration
return msg['value']
@@ -72,7 +80,8 @@
return self.comm.comm.bcast(vals, root = 0)
class TaskQueueRoot(TaskQueueNonRoot):
- def __init__(self, comm, tasks):
+ def __init__(self, tasks, comm, njobs):
+ self.njobs = njobs
self.tasks = tasks
self.results = {}
self.assignments = {}
@@ -117,14 +126,33 @@
else:
print "GOT AN UNKNOWN MESSAGE", msg
raise RuntimeError
- if self._notified >= self.comm.comm.size - 1:
+ if self._notified >= self.njobs:
print "NOTIFIED ENOUGH!"
raise StopIteration
-def task_queue(func, tasks):
+def task_queue(func, tasks, njobs=0):
comm = _get_comm(())
+ if not parallel_capable:
+ mylog.error("Cannot create task queue for serial process.")
+ raise RunTimeError
+ my_size = comm.comm.size
+ if njobs <= 0:
+ njobs = my_size - 1
+ if njobs >= my_size:
+ mylog.error("You have asked for %s jobs, but only %s processors are available.",
+ njobs, (my_size - 1))
+ raise RunTimeError
+ my_rank = comm.rank
+ all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+ all_new_comms.insert(0, na.array([0]))
+ for i,comm_set in enumerate(all_new_comms):
+ if my_rank in comm_set:
+ my_new_id = i
+ break
+ subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+
if comm.comm.rank == 0:
- my_q = TaskQueueRoot(comm, tasks)
+ my_q = TaskQueueRoot(tasks, comm, njobs)
else:
- my_q = TaskQueueNonRoot(comm, None)
- my_q.run(func)
+ my_q = TaskQueueNonRoot(None, comm, subcomm)
+ return my_q.run(func)
https://bitbucket.org/yt_analysis/yt/changeset/ee6b53e913e6/
changeset: ee6b53e913e6
branch: yt
user: brittonsmith
date: 2012-06-22 22:04:58
summary: Cleaned up debug prints in task queue.
affected #: 1 file
diff -r 3f42ee171bc3f40de37aa29c7c163395fc1c0654 -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -25,7 +25,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-from mpi4py import MPI
import numpy as na
import time, threading, random
@@ -104,7 +103,7 @@
def assign_task(self, source_id):
if self._remaining == 0:
- print "Notifying %s to end" % source_id
+ mylog.debug("Notifying %s to end", source_id)
msg = messages['end'].copy()
self._notified += 1
else:
@@ -124,10 +123,9 @@
elif msg['msg'] == messages['task_req']['msg']:
self.assign_task(status.source)
else:
- print "GOT AN UNKNOWN MESSAGE", msg
+ mylog.error("GOT AN UNKNOWN MESSAGE: %s", msg)
raise RuntimeError
if self._notified >= self.njobs:
- print "NOTIFIED ENOUGH!"
raise StopIteration
def task_queue(func, tasks, njobs=0):
https://bitbucket.org/yt_analysis/yt/changeset/df2899c15f59/
changeset: df2899c15f59
branch: yt
user: brittonsmith
date: 2012-06-22 23:38:01
summary: Added dynamic_parallel_objects function and dynamic keyword to parallel_objects
to call dynamic_parallel_objects.
affected #: 2 files
diff -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 -r df2899c15f599537ca370351e9f6e6cafa89a17a yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -340,7 +340,13 @@
result = None
result_id = None
-def parallel_objects(objects, njobs = 0, storage = None, barrier = True):
+def parallel_objects(objects, njobs = 0, storage = None, barrier = True,
+ dynamic = False):
+ if dynamic:
+ from .task_queue import dynamic_parallel_objects
+ dynamic_parallel_objects(objects, njobs=njobs,
+ storage=storage)
+
if not parallel_capable:
njobs = 1
my_communicator = communication_system.communicators[-1]
@@ -789,7 +795,7 @@
try:
callback(st)
except StopIteration:
- print "PROBE LOOP ENDING"
+ mylog.debug("Probe loop ending.")
break
communication_system = CommunicationSystem()
diff -r ee6b53e913e6fbc0a09fb498bb8ffbca81bb3894 -r df2899c15f599537ca370351e9f6e6cafa89a17a yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -32,7 +32,8 @@
from .parallel_analysis_interface import \
communication_system, \
_get_comm, \
- parallel_capable
+ parallel_capable, \
+ ResultsStorage
messages = dict(
task = dict(msg = 'next'),
@@ -44,7 +45,7 @@
class TaskQueueNonRoot(object):
def __init__(self, tasks, comm, subcomm):
self.tasks = tasks
- self.results = None
+ self.results = {}
self.comm = comm
self.subcomm = subcomm
@@ -153,4 +154,46 @@
my_q = TaskQueueRoot(tasks, comm, njobs)
else:
my_q = TaskQueueNonRoot(None, comm, subcomm)
+ communication_system.pop()
return my_q.run(func)
+
+def dynamic_parallel_objects(tasks, njobs=0, storage=None):
+ comm = _get_comm(())
+ if not parallel_capable:
+ mylog.error("Cannot create task queue for serial process.")
+ raise RunTimeError
+ my_size = comm.comm.size
+ if njobs <= 0:
+ njobs = my_size - 1
+ if njobs >= my_size:
+ mylog.error("You have asked for %s jobs, but only %s processors are available.",
+ njobs, (my_size - 1))
+ raise RunTimeError
+ my_rank = comm.rank
+ all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+ all_new_comms.insert(0, na.array([0]))
+ for i,comm_set in enumerate(all_new_comms):
+ if my_rank in comm_set:
+ my_new_id = i
+ break
+ subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+
+ if comm.comm.rank == 0:
+ my_q = TaskQueueRoot(tasks, comm, njobs)
+ my_q.comm.probe_loop(1, my_q.handle_assignment)
+ else:
+ my_q = TaskQueueNonRoot(None, comm, subcomm)
+ if storage is None:
+ for task in my_q:
+ yield task
+ else:
+ for task in my_q:
+ rstore = ResultsStorage()
+ yield rstore, task
+ my_q.send_result(rstore.result)
+
+ if storage is not None:
+ my_results = my_q.comm.comm.bcast(my_q.results, root=0)
+ storage.update(my_results)
+
+ communication_system.pop()
https://bitbucket.org/yt_analysis/yt/changeset/17225c236e6a/
changeset: 17225c236e6a
branch: yt
user: MatthewTurk
date: 2012-06-22 23:57:49
summary: Merged in brittonsmith/post-office (pull request #176)
affected #: 2 files
diff -r f6cc8e9297e8b0a4250d5d975f8f0abf9ce62fcc -r 17225c236e6afc77beb9f5620dbc560e6ee26834 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -340,7 +340,13 @@
result = None
result_id = None
-def parallel_objects(objects, njobs = 0, storage = None, barrier = True):
+def parallel_objects(objects, njobs = 0, storage = None, barrier = True,
+ dynamic = False):
+ if dynamic:
+ from .task_queue import dynamic_parallel_objects
+ dynamic_parallel_objects(objects, njobs=njobs,
+ storage=storage)
+
if not parallel_capable:
njobs = 1
my_communicator = communication_system.communicators[-1]
@@ -782,6 +788,16 @@
(tmp_recv, (rsize, roff), MPI.CHAR))
return recv
+ def probe_loop(self, tag, callback):
+ while 1:
+ st = MPI.Status()
+ self.comm.Probe(MPI.ANY_SOURCE, tag = tag, status = st)
+ try:
+ callback(st)
+ except StopIteration:
+ mylog.debug("Probe loop ending.")
+ break
+
communication_system = CommunicationSystem()
if parallel_capable:
ranks = na.arange(MPI.COMM_WORLD.size)
diff -r f6cc8e9297e8b0a4250d5d975f8f0abf9ce62fcc -r 17225c236e6afc77beb9f5620dbc560e6ee26834 yt/utilities/parallel_tools/task_queue.py
--- a/yt/utilities/parallel_tools/task_queue.py
+++ b/yt/utilities/parallel_tools/task_queue.py
@@ -1,11 +1,13 @@
"""
-A task queue for distributing work to worker agents
+Task queue in yt
+Author: Britton Smith <matthewturk at gmail.com>
+Affiliation: Michigan State University
Author: Matthew Turk <matthewturk at gmail.com>
Affiliation: Columbia University
Homepage: http://yt-project.org/
License:
- Copyright (C) 2011 Matthew Turk. All Rights Reserved.
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
This file is part of yt.
@@ -23,127 +25,175 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-import threading
+import numpy as na
+import time, threading, random
+
from yt.funcs import *
+from .parallel_analysis_interface import \
+ communication_system, \
+ _get_comm, \
+ parallel_capable, \
+ ResultsStorage
-# The idea here is that we have a set of tasks, which we want to distribute.
-# We'll try to make this forward-compatible. To do so, we want to support the
-# idea that there's a single, global set of tasks, as well as consumers that
-# receive tasks from the main controller. These consumers then pass them out
-# to executors.
-#
-# The middle level, the "Consumer," is only really distinct from the executor
-# in the case that there is an MPI subcommunicator. The reason for the
-# separation is so that the controller only communicates with a single member
-# of each subcommunicator, which then passes that info back out.
+messages = dict(
+ task = dict(msg = 'next'),
+ result = dict(msg = 'result'),
+ task_req = dict(msg = 'task_req'),
+ end = dict(msg = 'no_more_tasks'),
+)
-def locked(func):
- @wraps(func)
- def exclusive(self, *args, **kwargs):
- with self.lock:
- return func(self, *args, **kwargs)
- return exclusive
+class TaskQueueNonRoot(object):
+ def __init__(self, tasks, comm, subcomm):
+ self.tasks = tasks
+ self.results = {}
+ self.comm = comm
+ self.subcomm = subcomm
-class YTTaskCommunicator(object):
- # This should carefully be checked for a race condition, particularly in
- # the wait() function
- def __init__(self, interval = 2.0):
- self.interval = interval
- self.task_id = None
- self.waiting = False
- self.lock = threading.Lock()
+ def send_result(self, result):
+ new_msg = messages['result'].copy()
+ new_msg['value'] = result
+ if self.subcomm.rank == 0:
+ self.comm.comm.send(new_msg, dest = 0, tag=1)
+ self.subcomm.barrier()
- @locked
- def send_task(self, task_id):
- self.task_id = task_id
+ def get_next(self):
+ msg = messages['task_req'].copy()
+ if self.subcomm.rank == 0:
+ self.comm.comm.send(msg, dest = 0, tag=1)
+ msg = self.comm.comm.recv(source = 0, tag=2)
+ msg = self.subcomm.bcast(msg, root=0)
+ if msg['msg'] == messages['end']['msg']:
+ mylog.info("Notified to end")
+ raise StopIteration
+ return msg['value']
- @locked
- def query(self):
- return (self.waiting and self.task_id is None)
+ def __iter__(self):
+ while 1:
+ yield self.get_next()
- def wait(self):
- self.waiting = True
- while self.task_id is None:
- time.sleep(self.interval)
- with self.lock:
- self.waiting = False
- new_task_id = self.task_id
- self.task_id = None
- return new_task_id
+ def run(self, callable):
+ for task in self:
+ self.send_result(callable(task))
+ return self.finalize()
-class YTTaskQueueController(threading.Thread):
- # There's only one of these for every instance of yt -- whether than
- # instance be spread across processors or not.
- # We assume that this will exist in the process space of a consumer, so it
- # will be threading based.
- def __init__(self, tasks, interval = 2.0, communicators = None):
- self.assignments = []
- self.interval = interval
- # Communicators can be anything but they have to implement a mechanism
- # for saying, "I'm ready" and "I'm done"
+ def finalize(self, vals = None):
+ return self.comm.comm.bcast(vals, root = 0)
+
+class TaskQueueRoot(TaskQueueNonRoot):
+ def __init__(self, tasks, comm, njobs):
+ self.njobs = njobs
self.tasks = tasks
- self.communicators = communicators
- threading.Thread.__init__(self)
+ self.results = {}
+ self.assignments = {}
+ self._notified = 0
+ self._current = 0
+ self._remaining = len(self.tasks)
+ self.comm = comm
+ # Set up threading here
+ # self.dist = threading.Thread(target=self.handle_assignments)
+ # self.dist.daemon = True
+ # self.dist.start()
- def run(self):
- # Now we bootstrap
- for i,c in enumerate(self.communicators):
- self.assignments.append(i)
- if i == len(self.tasks): break
- c.send_task(i)
- while len(self.assignments) < len(self.tasks):
- time.sleep(self.interval)
- for i,c in enumerate(self.communicators):
- if not c.query(): continue
- print "Sending assignment %s to %s" % (
- len(self.assignments), i)
- c.send_task(len(self.assignments))
- self.assignments.append(i)
- if len(self.assignments) >= len(self.tasks): break
- terminated = 0
- while terminated != len(self.communicators):
- for i,c in enumerate(self.communicators):
- if not c.query(): continue
- c.send_task(-1)
- terminated += 1
- print "Terminated %s" % (i)
+ def run(self, func = None):
+ self.comm.probe_loop(1, self.handle_assignment)
+ return self.finalize(self.results)
-class YTTaskQueueConsumer(object):
- # One of these will exist per individual MPI task or one per MPI
- # subcommunicator, depending on the level of parallelism. They serve to
- # talk to the YTTaskQueueController on one side and possibly several
- # YTTaskExecutors on the other.
- #
- # One potential setup for this, when using MPI, would be to have the
- # Executors each have one of these, but only the head process of that
- # subcommunicator possess an external communicator. Then in next_task,
- # if the external communicator exists, one would probe that; otherwise,
- # accept a broadcast from the internal communicator's 0th task.
- def __init__(self, external_communicator, internal_communicator):
- self.external_communicator = external_communicator
- self.internal_communicator = internal_communicator
+ def insert_result(self, source_id, result):
+ task_id = self.assignments[source_id]
+ self.results[task_id] = result
- def next_task(self):
- next_task = self.external_communicator.wait()
- #self.internal_communicator.notify(next_task)
- return next_task
+ def assign_task(self, source_id):
+ if self._remaining == 0:
+ mylog.debug("Notifying %s to end", source_id)
+ msg = messages['end'].copy()
+ self._notified += 1
+ else:
+ msg = messages['task'].copy()
+ task_id = self._current
+ task = self.tasks[task_id]
+ self.assignments[source_id] = task_id
+ self._current += 1
+ self._remaining -= 1
+ msg['value'] = task
+ self.comm.comm.send(msg, dest = source_id, tag = 2)
-class YTTaskExecutor(object):
- _count = 0
- # One of these will exist per computational actor
- def __init__(self, tasks, communicator):
- self.communicator = communicator
- self.tasks = tasks
- self.name = "Runner%03s" % (self.__class__._count)
- self.__class__._count += 1
+ def handle_assignment(self, status):
+ msg = self.comm.comm.recv(source = status.source, tag = 1)
+ if msg['msg'] == messages['result']['msg']:
+ self.insert_result(status.source, msg['value'])
+ elif msg['msg'] == messages['task_req']['msg']:
+ self.assign_task(status.source)
+ else:
+ mylog.error("GOT AN UNKNOWN MESSAGE: %s", msg)
+ raise RuntimeError
+ if self._notified >= self.njobs:
+ raise StopIteration
- def run(self):
- # Note that right now this only works for a 1:1 mapping of
- # YTTaskQueueConsumer to YTTaskExecutor
- next_task = None
- while 1:
- next_task = self.communicator.next_task()
- if next_task == -1: break
- print "Executing on %s" % (self.name),
- self.tasks[next_task]()
- print "Concluded on %s" % (self.name)
+def task_queue(func, tasks, njobs=0):
+    """
+    Run *func* over *tasks* through a root-driven task queue.
+
+    Rank 0 of the current communicator builds a TaskQueueRoot and answers
+    task requests; every other rank builds a TaskQueueNonRoot bound to its
+    own sub-communicator and applies *func* to each task it is handed.
+
+    Parameters
+    ----------
+    func : callable
+        Called with one task at a time on the non-root ranks.
+    tasks : sequence
+        Tasks given to the root queue, which indexes them by position.
+    njobs : int
+        Number of worker groups.  Values <= 0 select one group per
+        non-root rank (communicator size minus one).
+
+    Returns
+    -------
+    The value returned by the queue's run() method, i.e. the result of the
+    finalize() broadcast rooted at rank 0.
+
+    Raises
+    ------
+    RuntimeError
+        If yt is not parallel capable, or if *njobs* is not smaller than
+        the communicator size (rank 0 is reserved for the queue itself).
+    """
+    comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        # The builtin is spelled RuntimeError; "RunTimeError" is undefined
+        # and would surface as a NameError instead.
+        raise RuntimeError("Cannot create task queue for serial process.")
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RuntimeError(
+            "You have asked for %s jobs, but only %s processors are available."
+            % (njobs, my_size - 1))
+    my_rank = comm.rank
+    # Ranks 1..size-1 are split into njobs worker groups; rank 0 gets a
+    # group of its own, inserted at index 0.
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i, comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+
+    if comm.comm.rank == 0:
+        my_q = TaskQueueRoot(tasks, comm, njobs)
+    else:
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+    # Pop the sub-communicator pushed above before running; the queue
+    # objects keep their own references to comm/subcomm.
+    communication_system.pop()
+    return my_q.run(func)
+
+def dynamic_parallel_objects(tasks, njobs=0, storage=None):
+    """
+    Generator that hands out *tasks* dynamically from rank 0 to worker groups.
+
+    Rank 0 builds a TaskQueueRoot and runs its probe loop, answering task
+    requests and collecting results; it yields nothing itself.  Every other
+    rank builds a TaskQueueNonRoot and yields each task it receives.
+
+    Parameters
+    ----------
+    tasks : sequence
+        Tasks given to the root queue, which indexes them by position.
+    njobs : int
+        Number of worker groups.  Values <= 0 select one group per
+        non-root rank (communicator size minus one).
+    storage : dict or None
+        If None, bare tasks are yielded and no results are sent back.
+        Otherwise ``(ResultsStorage, task)`` pairs are yielded; once the
+        caller resumes the generator, the ``result`` attribute of that
+        storage object is sent to rank 0.  After the last task, the results
+        held on rank 0 are broadcast and merged into *storage* via
+        ``storage.update``.
+
+    Raises
+    ------
+    RuntimeError
+        If yt is not parallel capable, or if *njobs* is not smaller than
+        the communicator size.  Because this is a generator, the error is
+        raised when iteration starts, not when the function is called.
+    """
+    comm = _get_comm(())
+    if not parallel_capable:
+        mylog.error("Cannot create task queue for serial process.")
+        # The builtin is spelled RuntimeError; "RunTimeError" is undefined
+        # and would surface as a NameError instead.
+        raise RuntimeError("Cannot create task queue for serial process.")
+    my_size = comm.comm.size
+    if njobs <= 0:
+        njobs = my_size - 1
+    if njobs >= my_size:
+        mylog.error("You have asked for %s jobs, but only %s processors are available.",
+                    njobs, (my_size - 1))
+        raise RuntimeError(
+            "You have asked for %s jobs, but only %s processors are available."
+            % (njobs, my_size - 1))
+    my_rank = comm.rank
+    # Ranks 1..size-1 are split into njobs worker groups; rank 0 gets a
+    # group of its own, inserted at index 0.
+    all_new_comms = na.array_split(na.arange(1, my_size), njobs)
+    all_new_comms.insert(0, na.array([0]))
+    for i, comm_set in enumerate(all_new_comms):
+        if my_rank in comm_set:
+            my_new_id = i
+            break
+    subcomm = communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
+
+    if comm.comm.rank == 0:
+        # The root only serves requests until every group has been told
+        # that no tasks remain.
+        my_q = TaskQueueRoot(tasks, comm, njobs)
+        my_q.comm.probe_loop(1, my_q.handle_assignment)
+    else:
+        my_q = TaskQueueNonRoot(None, comm, subcomm)
+        if storage is None:
+            for task in my_q:
+                yield task
+        else:
+            for task in my_q:
+                rstore = ResultsStorage()
+                yield rstore, task
+                # Runs once the caller has filled rstore and asked for the
+                # next item.
+                my_q.send_result(rstore.result)
+
+    if storage is not None:
+        # Collective call: rank 0 contributes its results dict, all other
+        # ranks receive it.
+        my_results = my_q.comm.comm.bcast(my_q.results, root=0)
+        storage.update(my_results)
+
+    communication_system.pop()
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
More information about the yt-svn
mailing list