[yt-svn] commit/yt: 8 new changesets
Bitbucket
commits-noreply at bitbucket.org
Thu Aug 23 11:09:34 PDT 2012
8 new commits in yt:
https://bitbucket.org/yt_analysis/yt/changeset/6a46bd81eb7b/
changeset: 6a46bd81eb7b
branch: yt
user: MatthewTurk
date: 2012-06-09 21:59:12
summary: Adding an IO stager
affected #: 1 file
diff -r 08f29a9cec42435bb0581a2059b4f1c7abd6c8c5 -r 6a46bd81eb7bb4c77eeadb045071dc7a2800bcc8 yt/utilities/parallel_tools/io_runner.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/io_runner.py
@@ -0,0 +1,195 @@
+"""
+A simple IO staging mechanism
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: KIPAC/SLAC/Stanford
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2008-2011 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import os
+from .parallel_analysis_interface import ProcessorPool
+from yt.utilities.io_handler import BaseIOHandler
+from contextlib import contextmanager
+import time
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+
+YT_TAG_MESSAGE = 317 # Cell 317 knows where to go
+
+class IOCommunicator(BaseIOHandler):
+ def __init__(self, pf, wg, pool):
+ mylog.info("Initializing IOCommunicator")
+ self.pf = pf
+ self.wg = wg # We don't need to use this!
+ self.pool = pool
+ self.comm = pool.comm
+ # We read our grids here
+ self.grids = []
+ storage = {}
+ grids = pf.h.grids.tolist()
+ grids.sort(key=lambda a:a.filename)
+ for sto, g in parallel_objects(grids, storage = storage):
+ sto.result = self.comm.rank
+ sto.result_id = g.id
+ self.grids.append(g)
+ self._id_offset = pf.h.grids[0]._id_offset
+ mylog.info("Reading from disk ...")
+ self.initialize_data()
+ mylog.info("Broadcasting ...")
+ self.comm.comm.bcast(storage, root = wg.ranks[0])
+ mylog.info("Done.")
+ self.hooks = []
+
+ def initialize_data(self):
+ pf = self.pf
+ fields = [f for f in pf.h.field_list
+ if not pf.field_info[f].particle_type]
+ pfields = [f for f in pf.h.field_list
+ if pf.field_info[f].particle_type]
+ # Preload is only defined for Enzo ...
+ if pf.h.io._data_style == "enzo_packed_3d":
+ self.queue = pf.h.io.queue
+ pf.h.io.preload(self.grids, fields)
+ for g in self.grids:
+ for f in fields:
+ if f not in self.queue[g.id]:
+ d = na.zeros(g.ActiveDimensions, dtype='float64')
+ self.queue[g.id][f] = d
+ for f in pfields:
+ self.queue[g.id][f] = self._read(g, f)
+ else:
+ self.queue = {}
+ for g in self.grids:
+ for f in fields + pfields:
+ self.queue[g.id][f] = pf.h.io._read(g, f)
+
+ def _read(self, g, f):
+ fi = self.pf.field_info[f]
+ if fi.particle_type and g.NumberOfParticles == 0:
+ # because this gets upcast to float
+ return na.array([],dtype='float64')
+ try:
+ temp = self.pf.h.io._read_data_set(g, f)
+ except:# self.pf.hierarchy.io._read_exception as exc:
+ if fi.not_in_all:
+ temp = na.zeros(g.ActiveDimensions, dtype='float64')
+ else:
+ raise
+ return temp
+
+ def wait(self):
+ status = MPI.Status()
+ while 1:
+ if self.comm.comm.Iprobe(MPI.ANY_SOURCE,
+ YT_TAG_MESSAGE,
+ status = status):
+ msg = self.comm.comm.recv(
+ source = status.source, tag = YT_TAG_MESSAGE)
+ if msg['op'] == "end":
+ mylog.debug("Shutting down IO.")
+ break
+ self._send_data(msg, status.source)
+ status = MPI.Status()
+ else:
+ time.sleep(1e-2)
+
+ def _send_data(self, msg, dest):
+ grid_id = msg['grid_id']
+ field = msg['field']
+ ts = self.queue[grid_id][field].astype("float64")
+ mylog.debug("Opening send to %s (%s)", dest, ts.shape)
+ self.hooks.append(self.comm.comm.Isend([ts, MPI.DOUBLE], dest = dest))
+
+class IOHandlerRemote(BaseIOHandler):
+ _data_style = "remote"
+
+ def __init__(self, pf, wg, pool):
+ self.pf = pf
+ self.wg = wg # probably won't need
+ self.pool = pool
+ self.comm = pool.comm
+ self.proc_map = self.comm.comm.bcast(None,
+ root = pool['io'].ranks[0])
+ super(IOHandlerRemote, self).__init__()
+
+ def _read_data_set(self, grid, field):
+ dest = self.proc_map[grid.id]
+ msg = dict(grid_id = grid.id, field = field, op="read")
+ mylog.debug("Requesting %s for %s from %s", field, grid, dest)
+ if self.pf.field_info[field].particle_type:
+ data = na.empty(grid.NumberOfParticles, 'float64')
+ else:
+ data = na.empty(grid.ActiveDimensions, 'float64')
+ hook = self.comm.comm.Irecv([data, MPI.DOUBLE], source = dest)
+ self.comm.comm.send(msg, dest = dest, tag = YT_TAG_MESSAGE)
+ mylog.debug("Waiting for data.")
+ MPI.Request.Wait(hook)
+ return data
+
+ def _read_data_slice(self, grid, field, axis, coord):
+ sl = [slice(None), slice(None), slice(None)]
+ sl[axis] = slice(coord, coord + 1)
+ #sl = tuple(reversed(sl))
+ return self._read_data_set(grid,field)[sl]
+
+ def terminate(self):
+ msg = dict(op='end')
+ if self.wg.comm.rank == 0:
+ for rank in self.pool['io'].ranks:
+ mylog.debug("Sending termination message to %s", rank)
+ self.comm.comm.send(msg, dest=rank, tag=YT_TAG_MESSAGE)
+
+@contextmanager
+def remote_io(pf, wg, pool):
+ original_io = pf.h.io
+ pf.h.io = IOHandlerRemote(pf, wg, pool)
+ yield
+ pf.h.io.terminate()
+ pf.h.io = original_io
+
+def io_nodes(fn, n_io, n_work, func, *args, **kwargs):
+ pool, wg = ProcessorPool.from_sizes([(n_io, "io"), (n_work, "work")])
+ rv = None
+ if wg.name == "work":
+ pf = load(fn)
+ with remote_io(pf, wg, pool):
+ rv = func(pf, *args, **kwargs)
+ elif wg.name == "io":
+ pf = load(fn)
+ io = IOCommunicator(pf, wg, pool)
+ io.wait()
+ # We should broadcast the result
+ rv = pool.comm.mpi_bcast(rv, root=pool['work'].ranks[0])
+ pool.free_all()
+ mylog.debug("Return value: %s", rv)
+ return rv
+
+# Here is an example of how to use this functionality.
+if __name__ == "__main__":
+ def gq(pf):
+ dd = pf.h.all_data()
+ return dd.quantities["TotalQuantity"]("CellMassMsun")
+ q = io_nodes("DD0087/DD0087", 8, 24, gq)
+ mylog.info(q)
+
+
https://bitbucket.org/yt_analysis/yt/changeset/7216eb530568/
changeset: 7216eb530568
branch: yt
user: MatthewTurk
date: 2012-06-09 23:20:54
summary: Fixing indentation issue.
affected #: 1 file
diff -r 6a46bd81eb7bb4c77eeadb045071dc7a2800bcc8 -r 7216eb530568a5f3077423b91e69dd845fb5518c yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -269,7 +269,7 @@
self.size = size
self.ranks = ranks
self.comm = comm
- self.name = name
+ self.name = name
class ProcessorPool(object):
comm = None
@@ -292,11 +292,9 @@
raise RuntimeError
if ranks is None:
ranks = [self.available_ranks.pop(0) for i in range(size)]
-
- # Default name to the workgroup number.
+ # Default name to the workgroup number.
if name is None:
- name = string(len(workgroups))
-
+ name = string(len(workgroups))
group = self.comm.comm.Get_group().Incl(ranks)
new_comm = self.comm.comm.Create(group)
if self.comm.rank in ranks:
https://bitbucket.org/yt_analysis/yt/changeset/27ed700ec6d7/
changeset: 27ed700ec6d7
branch: yt
user: MatthewTurk
date: 2012-06-10 00:39:41
summary: Controller system skeleton
affected #: 2 files
diff -r 7216eb530568a5f3077423b91e69dd845fb5518c -r 27ed700ec6d7e6c4b35541224d82af412c840239 yt/utilities/parallel_tools/controller_system.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/controller_system.py
@@ -0,0 +1,66 @@
+"""
+A queueing system based on MPI
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+from contextmanager import contextlib
+
+
+class WorkSplitter(object):
+ def __init__(self, controller, group1, group2):
+ self.group1 = group1
+ self.group2 = group2
+ self.controller = controller
+
+ @classmethod
+ def setup(cls, ng1, ng2):
+ pp, wg = ProcessorPool.from_sizes(
+ [(1, "controller"), (ng1, "group1"), (ng2, "group2")])
+ groupc = pp['controller']
+ group1 = pp['group1']
+ group2 = pp['group2']
+ obj = cls(groupc, group1, group2)
+ obj.run(wg.name)
+
+ def run(self, name):
+ if name == "controller":
+ self.run_controller()
+ elif name == "group1":
+ self.run_group1()
+ elif name == "group2":
+ self.run_group2()
+ else:
+ raise NotImplementedError
+
+ def run_controller(self):
+ raise NotImplementedError
+
+ def run_group1(self):
+ raise NotImplementedError
+
+ def run_group2(self):
+ raise NotImplementedError
diff -r 7216eb530568a5f3077423b91e69dd845fb5518c -r 27ed700ec6d7e6c4b35541224d82af412c840239 yt/utilities/parallel_tools/io_runner.py
--- a/yt/utilities/parallel_tools/io_runner.py
+++ b/yt/utilities/parallel_tools/io_runner.py
@@ -2,10 +2,10 @@
A simple IO staging mechanism
Author: Matthew Turk <matthewturk at gmail.com>
-Affiliation: KIPAC/SLAC/Stanford
+Affiliation: Columbia
Homepage: http://yt-project.org/
License:
- Copyright (C) 2008-2011 Matthew Turk. All Rights Reserved.
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
This file is part of yt.
https://bitbucket.org/yt_analysis/yt/changeset/3e4e9d309c96/
changeset: 3e4e9d309c96
branch: yt
user: MatthewTurk
date: 2012-06-12 19:15:36
summary: Controller system
affected #: 1 file
diff -r 27ed700ec6d7e6c4b35541224d82af412c840239 -r 3e4e9d309c96b5757ccb414c9f1da44976ea4096 yt/utilities/parallel_tools/controller_system.py
--- a/yt/utilities/parallel_tools/controller_system.py
+++ b/yt/utilities/parallel_tools/controller_system.py
@@ -28,7 +28,7 @@
except ImportError:
pass
from contextmanager import contextlib
-
+from abc import ABCMeta, abstractmethod, abstractproperty
class WorkSplitter(object):
def __init__(self, controller, group1, group2):
@@ -56,11 +56,14 @@
else:
raise NotImplementedError
+ @abstractmethod
def run_controller(self):
- raise NotImplementedError
+ pass
+ @abstractmethod
def run_group1(self):
- raise NotImplementedError
+ pass
+ @abstractmethod
def run_group2(self):
- raise NotImplementedError
+ pass
https://bitbucket.org/yt_analysis/yt/changeset/4516f7cf678c/
changeset: 4516f7cf678c
branch: yt
user: MatthewTurk
date: 2012-06-28 18:42:52
summary: Fixing another missing pending request update location
affected #: 1 file
diff -r 437a66fd8588e4ab5735718bfa43b41f19b8fdf3 -r 4516f7cf678c91a89a0d5d74b4263bccedda8d0e yt/gui/reason/html/app/controller/Notebook.js
--- a/yt/gui/reason/html/app/controller/Notebook.js
+++ b/yt/gui/reason/html/app/controller/Notebook.js
@@ -73,9 +73,11 @@
},
addRequest: function(request_id, command) {
+ /*console.log("Adding request " + request_id);*/
this.getRequestsStore().add({
request_id: request_id, command: command,
});
+ reason.pending.update([this.getRequestsStore().count()]);
},
addCell: function(cell) {
@@ -85,6 +87,7 @@
var ind = this.getRequestsStore().find(
'request_id', cell['result_id']);
if (ind != -1) {
+ /*console.log("Removing request " + cell['result_id']);*/
var rec = this.getRequestsStore().removeAt(ind);
}
reason.pending.update([this.getRequestsStore().count()]);
https://bitbucket.org/yt_analysis/yt/changeset/a374af7dc9d3/
changeset: a374af7dc9d3
branch: yt
user: MatthewTurk
date: 2012-08-19 20:26:24
summary: Merging in an old changeset for Reason
affected #: 1 file
diff -r 7c5ad85490e8ade384a165f1af51e1ef7cd9f692 -r a374af7dc9d3f5555530d1b73046adc30e3b8732 yt/gui/reason/html/app/controller/Notebook.js
--- a/yt/gui/reason/html/app/controller/Notebook.js
+++ b/yt/gui/reason/html/app/controller/Notebook.js
@@ -73,9 +73,11 @@
},
addRequest: function(request_id, command) {
+ /*console.log("Adding request " + request_id);*/
this.getRequestsStore().add({
request_id: request_id, command: command,
});
+ reason.pending.update([this.getRequestsStore().count()]);
},
addCell: function(cell) {
@@ -85,6 +87,7 @@
var ind = this.getRequestsStore().find(
'request_id', cell['result_id']);
if (ind != -1) {
+ /*console.log("Removing request " + cell['result_id']);*/
var rec = this.getRequestsStore().removeAt(ind);
}
reason.pending.update([this.getRequestsStore().count()]);
https://bitbucket.org/yt_analysis/yt/changeset/2c7cce8d50ed/
changeset: 2c7cce8d50ed
branch: yt
user: MatthewTurk
date: 2012-08-19 20:27:37
summary: Merging the first pass at IO controller system, for staging IO on MPI nodes.
affected #: 3 files
diff -r a374af7dc9d3f5555530d1b73046adc30e3b8732 -r 2c7cce8d50ed4fb4459a03a465aabff97c56c885 yt/utilities/parallel_tools/controller_system.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/controller_system.py
@@ -0,0 +1,69 @@
+"""
+A queueing system based on MPI
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+from contextmanager import contextlib
+from abc import ABCMeta, abstractmethod, abstractproperty
+
+class WorkSplitter(object):
+ def __init__(self, controller, group1, group2):
+ self.group1 = group1
+ self.group2 = group2
+ self.controller = controller
+
+ @classmethod
+ def setup(cls, ng1, ng2):
+ pp, wg = ProcessorPool.from_sizes(
+ [(1, "controller"), (ng1, "group1"), (ng2, "group2")])
+ groupc = pp['controller']
+ group1 = pp['group1']
+ group2 = pp['group2']
+ obj = cls(groupc, group1, group2)
+ obj.run(wg.name)
+
+ def run(self, name):
+ if name == "controller":
+ self.run_controller()
+ elif name == "group1":
+ self.run_group1()
+ elif name == "group2":
+ self.run_group2()
+ else:
+ raise NotImplementedError
+
+ @abstractmethod
+ def run_controller(self):
+ pass
+
+ @abstractmethod
+ def run_group1(self):
+ pass
+
+ @abstractmethod
+ def run_group2(self):
+ pass
diff -r a374af7dc9d3f5555530d1b73046adc30e3b8732 -r 2c7cce8d50ed4fb4459a03a465aabff97c56c885 yt/utilities/parallel_tools/io_runner.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/io_runner.py
@@ -0,0 +1,195 @@
+"""
+A simple IO staging mechanism
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import os
+from .parallel_analysis_interface import ProcessorPool
+from yt.utilities.io_handler import BaseIOHandler
+from contextlib import contextmanager
+import time
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+
+YT_TAG_MESSAGE = 317 # Cell 317 knows where to go
+
+class IOCommunicator(BaseIOHandler):
+ def __init__(self, pf, wg, pool):
+ mylog.info("Initializing IOCommunicator")
+ self.pf = pf
+ self.wg = wg # We don't need to use this!
+ self.pool = pool
+ self.comm = pool.comm
+ # We read our grids here
+ self.grids = []
+ storage = {}
+ grids = pf.h.grids.tolist()
+ grids.sort(key=lambda a:a.filename)
+ for sto, g in parallel_objects(grids, storage = storage):
+ sto.result = self.comm.rank
+ sto.result_id = g.id
+ self.grids.append(g)
+ self._id_offset = pf.h.grids[0]._id_offset
+ mylog.info("Reading from disk ...")
+ self.initialize_data()
+ mylog.info("Broadcasting ...")
+ self.comm.comm.bcast(storage, root = wg.ranks[0])
+ mylog.info("Done.")
+ self.hooks = []
+
+ def initialize_data(self):
+ pf = self.pf
+ fields = [f for f in pf.h.field_list
+ if not pf.field_info[f].particle_type]
+ pfields = [f for f in pf.h.field_list
+ if pf.field_info[f].particle_type]
+ # Preload is only defined for Enzo ...
+ if pf.h.io._data_style == "enzo_packed_3d":
+ self.queue = pf.h.io.queue
+ pf.h.io.preload(self.grids, fields)
+ for g in self.grids:
+ for f in fields:
+ if f not in self.queue[g.id]:
+ d = na.zeros(g.ActiveDimensions, dtype='float64')
+ self.queue[g.id][f] = d
+ for f in pfields:
+ self.queue[g.id][f] = self._read(g, f)
+ else:
+ self.queue = {}
+ for g in self.grids:
+ for f in fields + pfields:
+ self.queue[g.id][f] = pf.h.io._read(g, f)
+
+ def _read(self, g, f):
+ fi = self.pf.field_info[f]
+ if fi.particle_type and g.NumberOfParticles == 0:
+ # because this gets upcast to float
+ return na.array([],dtype='float64')
+ try:
+ temp = self.pf.h.io._read_data_set(g, f)
+ except:# self.pf.hierarchy.io._read_exception as exc:
+ if fi.not_in_all:
+ temp = na.zeros(g.ActiveDimensions, dtype='float64')
+ else:
+ raise
+ return temp
+
+ def wait(self):
+ status = MPI.Status()
+ while 1:
+ if self.comm.comm.Iprobe(MPI.ANY_SOURCE,
+ YT_TAG_MESSAGE,
+ status = status):
+ msg = self.comm.comm.recv(
+ source = status.source, tag = YT_TAG_MESSAGE)
+ if msg['op'] == "end":
+ mylog.debug("Shutting down IO.")
+ break
+ self._send_data(msg, status.source)
+ status = MPI.Status()
+ else:
+ time.sleep(1e-2)
+
+ def _send_data(self, msg, dest):
+ grid_id = msg['grid_id']
+ field = msg['field']
+ ts = self.queue[grid_id][field].astype("float64")
+ mylog.debug("Opening send to %s (%s)", dest, ts.shape)
+ self.hooks.append(self.comm.comm.Isend([ts, MPI.DOUBLE], dest = dest))
+
+class IOHandlerRemote(BaseIOHandler):
+ _data_style = "remote"
+
+ def __init__(self, pf, wg, pool):
+ self.pf = pf
+ self.wg = wg # probably won't need
+ self.pool = pool
+ self.comm = pool.comm
+ self.proc_map = self.comm.comm.bcast(None,
+ root = pool['io'].ranks[0])
+ super(IOHandlerRemote, self).__init__()
+
+ def _read_data_set(self, grid, field):
+ dest = self.proc_map[grid.id]
+ msg = dict(grid_id = grid.id, field = field, op="read")
+ mylog.debug("Requesting %s for %s from %s", field, grid, dest)
+ if self.pf.field_info[field].particle_type:
+ data = na.empty(grid.NumberOfParticles, 'float64')
+ else:
+ data = na.empty(grid.ActiveDimensions, 'float64')
+ hook = self.comm.comm.Irecv([data, MPI.DOUBLE], source = dest)
+ self.comm.comm.send(msg, dest = dest, tag = YT_TAG_MESSAGE)
+ mylog.debug("Waiting for data.")
+ MPI.Request.Wait(hook)
+ return data
+
+ def _read_data_slice(self, grid, field, axis, coord):
+ sl = [slice(None), slice(None), slice(None)]
+ sl[axis] = slice(coord, coord + 1)
+ #sl = tuple(reversed(sl))
+ return self._read_data_set(grid,field)[sl]
+
+ def terminate(self):
+ msg = dict(op='end')
+ if self.wg.comm.rank == 0:
+ for rank in self.pool['io'].ranks:
+ mylog.debug("Sending termination message to %s", rank)
+ self.comm.comm.send(msg, dest=rank, tag=YT_TAG_MESSAGE)
+
+@contextmanager
+def remote_io(pf, wg, pool):
+ original_io = pf.h.io
+ pf.h.io = IOHandlerRemote(pf, wg, pool)
+ yield
+ pf.h.io.terminate()
+ pf.h.io = original_io
+
+def io_nodes(fn, n_io, n_work, func, *args, **kwargs):
+ pool, wg = ProcessorPool.from_sizes([(n_io, "io"), (n_work, "work")])
+ rv = None
+ if wg.name == "work":
+ pf = load(fn)
+ with remote_io(pf, wg, pool):
+ rv = func(pf, *args, **kwargs)
+ elif wg.name == "io":
+ pf = load(fn)
+ io = IOCommunicator(pf, wg, pool)
+ io.wait()
+ # We should broadcast the result
+ rv = pool.comm.mpi_bcast(rv, root=pool['work'].ranks[0])
+ pool.free_all()
+ mylog.debug("Return value: %s", rv)
+ return rv
+
+# Here is an example of how to use this functionality.
+if __name__ == "__main__":
+ def gq(pf):
+ dd = pf.h.all_data()
+ return dd.quantities["TotalQuantity"]("CellMassMsun")
+ q = io_nodes("DD0087/DD0087", 8, 24, gq)
+ mylog.info(q)
+
+
diff -r a374af7dc9d3f5555530d1b73046adc30e3b8732 -r 2c7cce8d50ed4fb4459a03a465aabff97c56c885 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -271,7 +271,7 @@
self.size = size
self.ranks = ranks
self.comm = comm
- self.name = name
+ self.name = name
class ProcessorPool(object):
comm = None
@@ -294,11 +294,9 @@
raise RuntimeError
if ranks is None:
ranks = [self.available_ranks.pop(0) for i in range(size)]
-
- # Default name to the workgroup number.
+ # Default name to the workgroup number.
if name is None:
- name = string(len(workgroups))
-
+ name = string(len(workgroups))
group = self.comm.comm.Get_group().Incl(ranks)
new_comm = self.comm.comm.Create(group)
if self.comm.rank in ranks:
https://bitbucket.org/yt_analysis/yt/changeset/a24cadac71b4/
changeset: a24cadac71b4
branch: yt
user: jsoishi
date: 2012-08-23 20:09:32
summary: Merged in MatthewTurk/yt (pull request #248)
affected #: 4 files
diff -r 7a7c32a24138a2430741e8c39438856893d0fe3b -r a24cadac71b410cb4b9f356f763249fa3df9a4bb yt/gui/reason/html/app/controller/Notebook.js
--- a/yt/gui/reason/html/app/controller/Notebook.js
+++ b/yt/gui/reason/html/app/controller/Notebook.js
@@ -73,9 +73,11 @@
},
addRequest: function(request_id, command) {
+ /*console.log("Adding request " + request_id);*/
this.getRequestsStore().add({
request_id: request_id, command: command,
});
+ reason.pending.update([this.getRequestsStore().count()]);
},
addCell: function(cell) {
@@ -85,6 +87,7 @@
var ind = this.getRequestsStore().find(
'request_id', cell['result_id']);
if (ind != -1) {
+ /*console.log("Removing request " + cell['result_id']);*/
var rec = this.getRequestsStore().removeAt(ind);
}
reason.pending.update([this.getRequestsStore().count()]);
diff -r 7a7c32a24138a2430741e8c39438856893d0fe3b -r a24cadac71b410cb4b9f356f763249fa3df9a4bb yt/utilities/parallel_tools/controller_system.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/controller_system.py
@@ -0,0 +1,69 @@
+"""
+A queueing system based on MPI
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+from contextmanager import contextlib
+from abc import ABCMeta, abstractmethod, abstractproperty
+
+class WorkSplitter(object):
+ def __init__(self, controller, group1, group2):
+ self.group1 = group1
+ self.group2 = group2
+ self.controller = controller
+
+ @classmethod
+ def setup(cls, ng1, ng2):
+ pp, wg = ProcessorPool.from_sizes(
+ [(1, "controller"), (ng1, "group1"), (ng2, "group2")])
+ groupc = pp['controller']
+ group1 = pp['group1']
+ group2 = pp['group2']
+ obj = cls(groupc, group1, group2)
+ obj.run(wg.name)
+
+ def run(self, name):
+ if name == "controller":
+ self.run_controller()
+ elif name == "group1":
+ self.run_group1()
+ elif name == "group2":
+ self.run_group2()
+ else:
+ raise NotImplementedError
+
+ @abstractmethod
+ def run_controller(self):
+ pass
+
+ @abstractmethod
+ def run_group1(self):
+ pass
+
+ @abstractmethod
+ def run_group2(self):
+ pass
diff -r 7a7c32a24138a2430741e8c39438856893d0fe3b -r a24cadac71b410cb4b9f356f763249fa3df9a4bb yt/utilities/parallel_tools/io_runner.py
--- /dev/null
+++ b/yt/utilities/parallel_tools/io_runner.py
@@ -0,0 +1,195 @@
+"""
+A simple IO staging mechanism
+
+Author: Matthew Turk <matthewturk at gmail.com>
+Affiliation: Columbia
+Homepage: http://yt-project.org/
+License:
+ Copyright (C) 2012 Matthew Turk. All Rights Reserved.
+
+ This file is part of yt.
+
+ yt is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import os
+from .parallel_analysis_interface import ProcessorPool
+from yt.utilities.io_handler import BaseIOHandler
+from contextlib import contextmanager
+import time
+
+try:
+ from .parallel_analysis_interface import MPI
+except ImportError:
+ pass
+
+YT_TAG_MESSAGE = 317 # Cell 317 knows where to go
+
+class IOCommunicator(BaseIOHandler):
+ def __init__(self, pf, wg, pool):
+ mylog.info("Initializing IOCommunicator")
+ self.pf = pf
+ self.wg = wg # We don't need to use this!
+ self.pool = pool
+ self.comm = pool.comm
+ # We read our grids here
+ self.grids = []
+ storage = {}
+ grids = pf.h.grids.tolist()
+ grids.sort(key=lambda a:a.filename)
+ for sto, g in parallel_objects(grids, storage = storage):
+ sto.result = self.comm.rank
+ sto.result_id = g.id
+ self.grids.append(g)
+ self._id_offset = pf.h.grids[0]._id_offset
+ mylog.info("Reading from disk ...")
+ self.initialize_data()
+ mylog.info("Broadcasting ...")
+ self.comm.comm.bcast(storage, root = wg.ranks[0])
+ mylog.info("Done.")
+ self.hooks = []
+
+ def initialize_data(self):
+ pf = self.pf
+ fields = [f for f in pf.h.field_list
+ if not pf.field_info[f].particle_type]
+ pfields = [f for f in pf.h.field_list
+ if pf.field_info[f].particle_type]
+ # Preload is only defined for Enzo ...
+ if pf.h.io._data_style == "enzo_packed_3d":
+ self.queue = pf.h.io.queue
+ pf.h.io.preload(self.grids, fields)
+ for g in self.grids:
+ for f in fields:
+ if f not in self.queue[g.id]:
+ d = na.zeros(g.ActiveDimensions, dtype='float64')
+ self.queue[g.id][f] = d
+ for f in pfields:
+ self.queue[g.id][f] = self._read(g, f)
+ else:
+ self.queue = {}
+ for g in self.grids:
+ for f in fields + pfields:
+ self.queue[g.id][f] = pf.h.io._read(g, f)
+
+ def _read(self, g, f):
+ fi = self.pf.field_info[f]
+ if fi.particle_type and g.NumberOfParticles == 0:
+ # because this gets upcast to float
+ return na.array([],dtype='float64')
+ try:
+ temp = self.pf.h.io._read_data_set(g, f)
+ except:# self.pf.hierarchy.io._read_exception as exc:
+ if fi.not_in_all:
+ temp = na.zeros(g.ActiveDimensions, dtype='float64')
+ else:
+ raise
+ return temp
+
+ def wait(self):
+ status = MPI.Status()
+ while 1:
+ if self.comm.comm.Iprobe(MPI.ANY_SOURCE,
+ YT_TAG_MESSAGE,
+ status = status):
+ msg = self.comm.comm.recv(
+ source = status.source, tag = YT_TAG_MESSAGE)
+ if msg['op'] == "end":
+ mylog.debug("Shutting down IO.")
+ break
+ self._send_data(msg, status.source)
+ status = MPI.Status()
+ else:
+ time.sleep(1e-2)
+
+ def _send_data(self, msg, dest):
+ grid_id = msg['grid_id']
+ field = msg['field']
+ ts = self.queue[grid_id][field].astype("float64")
+ mylog.debug("Opening send to %s (%s)", dest, ts.shape)
+ self.hooks.append(self.comm.comm.Isend([ts, MPI.DOUBLE], dest = dest))
+
+class IOHandlerRemote(BaseIOHandler):
+ _data_style = "remote"
+
+ def __init__(self, pf, wg, pool):
+ self.pf = pf
+ self.wg = wg # probably won't need
+ self.pool = pool
+ self.comm = pool.comm
+ self.proc_map = self.comm.comm.bcast(None,
+ root = pool['io'].ranks[0])
+ super(IOHandlerRemote, self).__init__()
+
+ def _read_data_set(self, grid, field):
+ dest = self.proc_map[grid.id]
+ msg = dict(grid_id = grid.id, field = field, op="read")
+ mylog.debug("Requesting %s for %s from %s", field, grid, dest)
+ if self.pf.field_info[field].particle_type:
+ data = na.empty(grid.NumberOfParticles, 'float64')
+ else:
+ data = na.empty(grid.ActiveDimensions, 'float64')
+ hook = self.comm.comm.Irecv([data, MPI.DOUBLE], source = dest)
+ self.comm.comm.send(msg, dest = dest, tag = YT_TAG_MESSAGE)
+ mylog.debug("Waiting for data.")
+ MPI.Request.Wait(hook)
+ return data
+
+ def _read_data_slice(self, grid, field, axis, coord):
+ sl = [slice(None), slice(None), slice(None)]
+ sl[axis] = slice(coord, coord + 1)
+ #sl = tuple(reversed(sl))
+ return self._read_data_set(grid,field)[sl]
+
+ def terminate(self):
+ msg = dict(op='end')
+ if self.wg.comm.rank == 0:
+ for rank in self.pool['io'].ranks:
+ mylog.debug("Sending termination message to %s", rank)
+ self.comm.comm.send(msg, dest=rank, tag=YT_TAG_MESSAGE)
+
+@contextmanager
+def remote_io(pf, wg, pool):
+ original_io = pf.h.io
+ pf.h.io = IOHandlerRemote(pf, wg, pool)
+ yield
+ pf.h.io.terminate()
+ pf.h.io = original_io
+
+def io_nodes(fn, n_io, n_work, func, *args, **kwargs):
+ pool, wg = ProcessorPool.from_sizes([(n_io, "io"), (n_work, "work")])
+ rv = None
+ if wg.name == "work":
+ pf = load(fn)
+ with remote_io(pf, wg, pool):
+ rv = func(pf, *args, **kwargs)
+ elif wg.name == "io":
+ pf = load(fn)
+ io = IOCommunicator(pf, wg, pool)
+ io.wait()
+ # We should broadcast the result
+ rv = pool.comm.mpi_bcast(rv, root=pool['work'].ranks[0])
+ pool.free_all()
+ mylog.debug("Return value: %s", rv)
+ return rv
+
+# Here is an example of how to use this functionality.
+if __name__ == "__main__":
+ def gq(pf):
+ dd = pf.h.all_data()
+ return dd.quantities["TotalQuantity"]("CellMassMsun")
+ q = io_nodes("DD0087/DD0087", 8, 24, gq)
+ mylog.info(q)
+
+
diff -r 7a7c32a24138a2430741e8c39438856893d0fe3b -r a24cadac71b410cb4b9f356f763249fa3df9a4bb yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -271,7 +271,7 @@
self.size = size
self.ranks = ranks
self.comm = comm
- self.name = name
+ self.name = name
class ProcessorPool(object):
comm = None
@@ -294,11 +294,9 @@
raise RuntimeError
if ranks is None:
ranks = [self.available_ranks.pop(0) for i in range(size)]
-
- # Default name to the workgroup number.
+ # Default name to the workgroup number.
if name is None:
- name = string(len(workgroups))
-
+ name = string(len(workgroups))
group = self.comm.comm.Get_group().Incl(ranks)
new_comm = self.comm.comm.Create(group)
if self.comm.rank in ranks:
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
More information about the yt-svn
mailing list