[yt-svn] commit/yt-3.0: 9 new changesets
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Sat Jun 29 10:08:59 PDT 2013
9 new commits in yt-3.0:
https://bitbucket.org/yt_analysis/yt-3.0/commits/1547cd15d3f4/
Changeset: 1547cd15d3f4
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-11 19:56:27
Summary: Adding first pass at parallel_ring iterator.
Affected #: 1 file
diff -r 00a361746464f7adf59927eb04fbbc49bb145d45 -r 1547cd15d3f47456071fcd44d62b61adec86338a yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -458,6 +458,44 @@
if barrier:
my_communicator.barrier()
+def parallel_ring(objects, generator_func, mutable = False):
+ # Mutable governs whether or not we will be modifying the arrays, or
+ # whether they are identical to generator_func(obj)
+ if mutable: raise NotImplementedError
+ my_comm = communication_system.communicators[-1]
+ my_size = my_comm.size
+ my_rank = my_comm.rank # This will also be the first object we access
+ if not parallel_capable and not mutable:
+ for obj in objects:
+ yield generator_func(obj)
+ return
+ if len(objects) != my_size or mutable:
+ raise NotImplementedError
+ generate_endpoints = len(objects) > my_size
+ # Now we need to do pairwise sends
+ source = (my_rank - 1) % my_size
+ dest = (my_rank + 1) % my_size
+ oiter = itertools.islice(itertools.cycle(objects),
+ my_rank, my_rank+len(objects))
+ idata = None
+ isize = np.zeros((1,), dtype="int64")
+ osize = np.zeros((1,), dtype="int64")
+ for obj in oiter:
+ if generate_endpoints and my_rank in (0, my_size) or idata is None:
+ idata = generator_func(obj)
+ yield obj, idata
+ # We first send to the previous processor
+ osize[0] = idata.size
+ t1 = my_comm.mpi_nonblocking_recv(isize, source)
+ t2 = my_comm.mpi_nonblocking_send(osize, dest)
+ my_comm.mpi_Request_Waitall([t1, t2])
+ odata = idata
+ idata = np.empty(isize[0], dtype=odata.dtype)
+ t3 = my_comm.mpi_nonblocking_send(odata, dest, dtype=odata.dtype)
+ t4 = my_comm.mpi_nonblocking_recv(idata, source, dtype=odata.dtype)
+ my_comm.mpi_Request_Waitall([t3, t4])
+ del odata
+
class CommunicationSystem(object):
communicators = []
https://bitbucket.org/yt_analysis/yt-3.0/commits/29f50d773524/
Changeset: 29f50d773524
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-11 20:08:45
Summary: Enable parallel_ring to use non-standard dtypes.
Affected #: 1 file
diff -r 1547cd15d3f47456071fcd44d62b61adec86338a -r 29f50d773524887654280e1b0f6cfa572bd71c35 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -60,7 +60,8 @@
float32 = MPI.FLOAT,
float64 = MPI.DOUBLE,
int32 = MPI.INT,
- int64 = MPI.LONG
+ int64 = MPI.LONG,
+ c = MPI.CHAR,
)
op_names = dict(
sum = MPI.SUM,
@@ -73,7 +74,8 @@
float32 = "MPI.FLOAT",
float64 = "MPI.DOUBLE",
int32 = "MPI.INT",
- int64 = "MPI.LONG"
+ int64 = "MPI.LONG",
+ c = "MPI.CHAR",
)
op_names = dict(
sum = "MPI.SUM",
@@ -483,6 +485,10 @@
for obj in oiter:
if generate_endpoints and my_rank in (0, my_size) or idata is None:
idata = generator_func(obj)
+ idtype = odtype = get_mpi_type(idata.dtype)
+ if idtype is None:
+ idtype = 'c'
+ odtype = idata.dtype
yield obj, idata
# We first send to the previous processor
osize[0] = idata.size
@@ -490,9 +496,9 @@
t2 = my_comm.mpi_nonblocking_send(osize, dest)
my_comm.mpi_Request_Waitall([t1, t2])
odata = idata
- idata = np.empty(isize[0], dtype=odata.dtype)
- t3 = my_comm.mpi_nonblocking_send(odata, dest, dtype=odata.dtype)
- t4 = my_comm.mpi_nonblocking_recv(idata, source, dtype=odata.dtype)
+ idata = np.empty(isize[0], dtype=odtype)
+ t3 = my_comm.mpi_nonblocking_send(odata.view(idtype), dest, dtype=idtype)
+ t4 = my_comm.mpi_nonblocking_recv(idata.view(idtype), source, dtype=idtype)
my_comm.mpi_Request_Waitall([t3, t4])
del odata
https://bitbucket.org/yt_analysis/yt-3.0/commits/5c72cf16a741/
Changeset: 5c72cf16a741
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-11 20:14:22
Summary: Few more dtype check fixes.
Affected #: 1 file
diff -r 29f50d773524887654280e1b0f6cfa572bd71c35 -r 5c72cf16a741c271f1bc357390d227d573dc5e0e yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -485,10 +485,9 @@
for obj in oiter:
if generate_endpoints and my_rank in (0, my_size) or idata is None:
idata = generator_func(obj)
- idtype = odtype = get_mpi_type(idata.dtype)
- if idtype is None:
+ idtype = odtype = idata.dtype
+ if get_mpi_type(idtype) is None:
idtype = 'c'
- odtype = idata.dtype
yield obj, idata
# We first send to the previous processor
osize[0] = idata.size
https://bitbucket.org/yt_analysis/yt-3.0/commits/9e49ef42526e/
Changeset: 9e49ef42526e
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-11 20:36:58
Summary: Implementing non-mutable storage for nobjects != my_size.
Affected #: 1 file
diff -r 5c72cf16a741c271f1bc357390d227d573dc5e0e -r 9e49ef42526eac7ee233c6161996328219a9f4a9 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -471,9 +471,11 @@
for obj in objects:
yield generator_func(obj)
return
- if len(objects) != my_size or mutable:
+ generate_endpoints = len(objects) != my_size
+ if generate_endpoints and mutable:
raise NotImplementedError
- generate_endpoints = len(objects) > my_size
+ gforw = generate_endpoints and my_rank == 0
+ gback = generate_endpoints and my_rank == my_size - 1
# Now we need to do pairwise sends
source = (my_rank - 1) % my_size
dest = (my_rank + 1) % my_size
@@ -483,22 +485,30 @@
isize = np.zeros((1,), dtype="int64")
osize = np.zeros((1,), dtype="int64")
for obj in oiter:
- if generate_endpoints and my_rank in (0, my_size) or idata is None:
+ if idata is None or gforw:
idata = generator_func(obj)
idtype = odtype = idata.dtype
if get_mpi_type(idtype) is None:
idtype = 'c'
yield obj, idata
# We first send to the previous processor
- osize[0] = idata.size
- t1 = my_comm.mpi_nonblocking_recv(isize, source)
- t2 = my_comm.mpi_nonblocking_send(osize, dest)
- my_comm.mpi_Request_Waitall([t1, t2])
+ tags = []
+ if not gforw:
+ tags.append(my_comm.mpi_nonblocking_recv(isize, source))
+ if not gback:
+ osize[0] = idata.size
+ tags.append(my_comm.mpi_nonblocking_send(osize, dest))
+ my_comm.mpi_Request_Waitall(tags)
odata = idata
- idata = np.empty(isize[0], dtype=odtype)
- t3 = my_comm.mpi_nonblocking_send(odata.view(idtype), dest, dtype=idtype)
- t4 = my_comm.mpi_nonblocking_recv(idata.view(idtype), source, dtype=idtype)
- my_comm.mpi_Request_Waitall([t3, t4])
+ tags = []
+ if not gforw:
+ idata = np.empty(isize[0], dtype=odtype)
+ tags.append(my_comm.mpi_nonblocking_recv(
+ idata.view(idtype), source, dtype=idtype))
+ if not gback:
+ tags.append(my_comm.mpi_nonblocking_send(
+ odata.view(idtype), dest, dtype=idtype))
+ my_comm.mpi_Request_Waitall(tags)
del odata
class CommunicationSystem(object):
https://bitbucket.org/yt_analysis/yt-3.0/commits/b8b862125837/
Changeset: b8b862125837
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-11 20:43:18
Summary: Adding a docstring to parallel_ring and fixing a bug in non-parallel calls.
Affected #: 1 file
diff -r 9e49ef42526eac7ee233c6161996328219a9f4a9 -r b8b8621258376d8a8e694fb58358f5d381639382 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -461,15 +461,65 @@
my_communicator.barrier()
def parallel_ring(objects, generator_func, mutable = False):
- # Mutable governs whether or not we will be modifying the arrays, or
- # whether they are identical to generator_func(obj)
+ r"""This function loops in a ring around a set of objects, yielding the
+ results of generator_func and passing from one processor to another to
+ avoid IO or expensive computation.
+
+ This function is designed to operate in sequence on a set of objects, where
+ the creation of those objects might be expensive. For instance, this could
+ be a set of particles that are costly to read from disk. Processor N will
+ run generator_func on an object, and the results of that will both be
+ yielded and passed to processor N-1. If the length of the objects is not
+ equal to the number of processors, then the final processor in the top
+ communicator will re-generate the data as needed.
+
+ In all likelihood, this function will only be useful internally to yt.
+
+ Parameters
+ ----------
+ objects : iterable
+ The list of objects to operate on.
+ generator_func : callable
+ This function will be called on each object, and the results yielded.
+ It must return a single NumPy array; for multiple values, it needs to
+ have a custom dtype.
+ mutable : bool
+ Should the arrays be considered mutable? Currently, this will only
+ work if the number of processors equals the number of objects.
+ dynamic : bool
+ This governs whether or not dynamic load balancing will be enabled.
+ This requires one dedicated processor; if this is enabled with a set of
+ 128 processors available, only 127 will be available to iterate over
+ objects as one will be load balancing the rest.
+
+
+ Examples
+ --------
+ Here is a simple example of a ring loop around a set of integers, with a
+ custom dtype.
+
+ >>> dt = numpy.dtype([('x', 'float64'), ('y', 'float64'), ('z', 'float64')])
+ >>> def gfunc(o):
+ ... numpy.random.seed(o)
+ ... rv = np.empty(1000, dtype=dt)
+ ... rv['x'] = numpy.random.random(1000)
+ ... rv['y'] = numpy.random.random(1000)
+ ... rv['z'] = numpy.random.random(1000)
+ ... return rv
+ ...
+ >>> obj = range(8)
+ >>> for obj, arr in parallel_ring(obj, gfunc):
+ ... print arr['x'].sum(), arr['y'].sum(), arr['z'].sum()
+ ...
+
+ """
if mutable: raise NotImplementedError
my_comm = communication_system.communicators[-1]
my_size = my_comm.size
my_rank = my_comm.rank # This will also be the first object we access
if not parallel_capable and not mutable:
for obj in objects:
- yield generator_func(obj)
+ yield obj, generator_func(obj)
return
generate_endpoints = len(objects) != my_size
if generate_endpoints and mutable:
https://bitbucket.org/yt_analysis/yt-3.0/commits/cf4f6706a4d6/
Changeset: cf4f6706a4d6
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-29 17:11:30
Summary: Switching which is generated forward and which is generated backward.
I think this addresses Sam's concern -- thanks, Sam!
Affected #: 1 file
diff -r b8b8621258376d8a8e694fb58358f5d381639382 -r cf4f6706a4d6b85e43b3c746c54c078f36b78098 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -524,8 +524,8 @@
generate_endpoints = len(objects) != my_size
if generate_endpoints and mutable:
raise NotImplementedError
- gforw = generate_endpoints and my_rank == 0
- gback = generate_endpoints and my_rank == my_size - 1
+ gforw = generate_endpoints and my_rank == my_size - 1
+ gback = generate_endpoints and my_rank == 0
# Now we need to do pairwise sends
source = (my_rank - 1) % my_size
dest = (my_rank + 1) % my_size
https://bitbucket.org/yt_analysis/yt-3.0/commits/dc6171491b34/
Changeset: dc6171491b34
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-29 17:53:19
Summary: Intermediate commit. Things work except for len(objects) > my_size.
Affected #: 1 file
diff -r cf4f6706a4d6b85e43b3c746c54c078f36b78098 -r dc6171491b3433f939bc16cb22ec59b56cddc1c2 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -522,10 +522,23 @@
yield obj, generator_func(obj)
return
generate_endpoints = len(objects) != my_size
+ # gforw means: should we expect one from forwards?
+ # gback means: do we send this object backwards?
+ if len(objects) > my_size:
+ # In this case, the first processor (my_rank == 0) will generate.
+ generate_endpoints = True
+ gback = (my_rank > 0)
+ gforw = (my_rank + 1 < my_size)
+ elif len(objects) > my_size:
+ generate_endpoints = True
+ gback = (my_rank > 0)
+ gforw = (my_rank + 1 < len(objects))
+ else: # Length of objects is equal to my_size
+ generate_endpoints = False
+ gback = True
+ gforw = True
if generate_endpoints and mutable:
raise NotImplementedError
- gforw = generate_endpoints and my_rank == my_size - 1
- gback = generate_endpoints and my_rank == 0
# Now we need to do pairwise sends
source = (my_rank - 1) % my_size
dest = (my_rank + 1) % my_size
https://bitbucket.org/yt_analysis/yt-3.0/commits/d850e79e9160/
Changeset: d850e79e9160
Branch: yt-3.0
User: MatthewTurk
Date: 2013-06-29 19:05:17
Summary: Had the forward/backward directions wrong.
Affected #: 1 file
diff -r dc6171491b3433f939bc16cb22ec59b56cddc1c2 -r d850e79e916033d8e3504ff9cf7085a5b5ad10a7 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -522,26 +522,22 @@
yield obj, generator_func(obj)
return
generate_endpoints = len(objects) != my_size
- # gforw means: should we expect one from forwards?
- # gback means: do we send this object backwards?
- if len(objects) > my_size:
+ # gback False: send the object backwards
+ # gforw False: receive an object from forwards
+ if len(objects) == my_size:
+ generate_endpoints = False
+ gback = False
+ gforw = False
+ else:
# In this case, the first processor (my_rank == 0) will generate.
generate_endpoints = True
- gback = (my_rank > 0)
- gforw = (my_rank + 1 < my_size)
- elif len(objects) > my_size:
- generate_endpoints = True
- gback = (my_rank > 0)
- gforw = (my_rank + 1 < len(objects))
- else: # Length of objects is equal to my_size
- generate_endpoints = False
- gback = True
- gforw = True
+ gback = (my_rank == 0)
+ gforw = (my_rank == my_size - 1)
if generate_endpoints and mutable:
raise NotImplementedError
# Now we need to do pairwise sends
- source = (my_rank - 1) % my_size
- dest = (my_rank + 1) % my_size
+ source = (my_rank + 1) % my_size
+ dest = (my_rank - 1) % my_size
oiter = itertools.islice(itertools.cycle(objects),
my_rank, my_rank+len(objects))
idata = None
https://bitbucket.org/yt_analysis/yt-3.0/commits/24915104ff16/
Changeset: 24915104ff16
Branch: yt-3.0
User: samskillman
Date: 2013-06-29 19:08:55
Summary: Merged in MatthewTurk/yt-3.0 (pull request #49)
Adding a parallel ring iterator
Affected #: 1 file
diff -r 1132bce09de4ea1912b0aadcec4b936b18dbe4a2 -r 24915104ff1604a95f1342f13c10f8fc1c1c0b07 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -60,7 +60,8 @@
float32 = MPI.FLOAT,
float64 = MPI.DOUBLE,
int32 = MPI.INT,
- int64 = MPI.LONG
+ int64 = MPI.LONG,
+ c = MPI.CHAR,
)
op_names = dict(
sum = MPI.SUM,
@@ -73,7 +74,8 @@
float32 = "MPI.FLOAT",
float64 = "MPI.DOUBLE",
int32 = "MPI.INT",
- int64 = "MPI.LONG"
+ int64 = "MPI.LONG",
+ c = "MPI.CHAR",
)
op_names = dict(
sum = "MPI.SUM",
@@ -458,6 +460,116 @@
if barrier:
my_communicator.barrier()
+def parallel_ring(objects, generator_func, mutable = False):
+ r"""This function loops in a ring around a set of objects, yielding the
+ results of generator_func and passing from one processor to another to
+ avoid IO or expensive computation.
+
+ This function is designed to operate in sequence on a set of objects, where
+ the creation of those objects might be expensive. For instance, this could
+ be a set of particles that are costly to read from disk. Processor N will
+ run generator_func on an object, and the results of that will both be
+ yielded and passed to processor N-1. If the length of the objects is not
+ equal to the number of processors, then the final processor in the top
+ communicator will re-generate the data as needed.
+
+ In all likelihood, this function will only be useful internally to yt.
+
+ Parameters
+ ----------
+ objects : iterable
+ The list of objects to operate on.
+ generator_func : callable
+ This function will be called on each object, and the results yielded.
+ It must return a single NumPy array; for multiple values, it needs to
+ have a custom dtype.
+ mutable : bool
+ Should the arrays be considered mutable? Currently, this will only
+ work if the number of processors equals the number of objects.
+ dynamic : bool
+ This governs whether or not dynamic load balancing will be enabled.
+ This requires one dedicated processor; if this is enabled with a set of
+ 128 processors available, only 127 will be available to iterate over
+ objects as one will be load balancing the rest.
+
+
+ Examples
+ --------
+ Here is a simple example of a ring loop around a set of integers, with a
+ custom dtype.
+
+ >>> dt = numpy.dtype([('x', 'float64'), ('y', 'float64'), ('z', 'float64')])
+ >>> def gfunc(o):
+ ... numpy.random.seed(o)
+ ... rv = np.empty(1000, dtype=dt)
+ ... rv['x'] = numpy.random.random(1000)
+ ... rv['y'] = numpy.random.random(1000)
+ ... rv['z'] = numpy.random.random(1000)
+ ... return rv
+ ...
+ >>> obj = range(8)
+ >>> for obj, arr in parallel_ring(obj, gfunc):
+ ... print arr['x'].sum(), arr['y'].sum(), arr['z'].sum()
+ ...
+
+ """
+ if mutable: raise NotImplementedError
+ my_comm = communication_system.communicators[-1]
+ my_size = my_comm.size
+ my_rank = my_comm.rank # This will also be the first object we access
+ if not parallel_capable and not mutable:
+ for obj in objects:
+ yield obj, generator_func(obj)
+ return
+ generate_endpoints = len(objects) != my_size
+ # gback False: send the object backwards
+ # gforw False: receive an object from forwards
+ if len(objects) == my_size:
+ generate_endpoints = False
+ gback = False
+ gforw = False
+ else:
+ # In this case, the first processor (my_rank == 0) will generate.
+ generate_endpoints = True
+ gback = (my_rank == 0)
+ gforw = (my_rank == my_size - 1)
+ if generate_endpoints and mutable:
+ raise NotImplementedError
+ # Now we need to do pairwise sends
+ source = (my_rank + 1) % my_size
+ dest = (my_rank - 1) % my_size
+ oiter = itertools.islice(itertools.cycle(objects),
+ my_rank, my_rank+len(objects))
+ idata = None
+ isize = np.zeros((1,), dtype="int64")
+ osize = np.zeros((1,), dtype="int64")
+ for obj in oiter:
+ if idata is None or gforw:
+ idata = generator_func(obj)
+ idtype = odtype = idata.dtype
+ if get_mpi_type(idtype) is None:
+ idtype = 'c'
+ yield obj, idata
+ # We first send to the previous processor
+ tags = []
+ if not gforw:
+ tags.append(my_comm.mpi_nonblocking_recv(isize, source))
+ if not gback:
+ osize[0] = idata.size
+ tags.append(my_comm.mpi_nonblocking_send(osize, dest))
+ my_comm.mpi_Request_Waitall(tags)
+ odata = idata
+ tags = []
+ if not gforw:
+ idata = np.empty(isize[0], dtype=odtype)
+ tags.append(my_comm.mpi_nonblocking_recv(
+ idata.view(idtype), source, dtype=idtype))
+ if not gback:
+ tags.append(my_comm.mpi_nonblocking_send(
+ odata.view(idtype), dest, dtype=idtype))
+ my_comm.mpi_Request_Waitall(tags)
+ del odata
+
class CommunicationSystem(object):
communicators = []
Repository URL: https://bitbucket.org/yt_analysis/yt-3.0/
--
This is a commit notification from bitbucket.org. You are receiving
this email because you, the recipient, have the commit-notification
service enabled for this repository.
More information about the yt-svn
mailing list