[yt-svn] commit/yt-3.0: 9 new changesets

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Sat Jun 29 10:08:59 PDT 2013


9 new commits in yt-3.0:

https://bitbucket.org/yt_analysis/yt-3.0/commits/1547cd15d3f4/
Changeset:   1547cd15d3f4
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-11 19:56:27
Summary:     Adding first pass at parallel_ring iterator.
Affected #:  1 file

diff -r 00a361746464f7adf59927eb04fbbc49bb145d45 -r 1547cd15d3f47456071fcd44d62b61adec86338a yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -458,6 +458,44 @@
     if barrier:
         my_communicator.barrier()
 
+def parallel_ring(objects, generator_func, mutable = False):
+    # Mutable governs whether or not we will be modifying the arrays, or
+    # whether they are identical to generator_func(obj)
+    if mutable: raise NotImplementedError
+    my_comm = communication_system.communicators[-1]
+    my_size = my_comm.size
+    my_rank = my_comm.rank # This will also be the first object we access
+    if not parallel_capable and not mutable:
+        for obj in objects:
+            yield generator_func(obj)
+        return
+    if len(objects) != my_size or mutable:
+        raise NotImplementedError
+    generate_endpoints = len(objects) > my_size
+    # Now we need to do pairwise sends
+    source = (my_rank - 1) % my_size
+    dest = (my_rank + 1) % my_size
+    oiter = itertools.islice(itertools.cycle(objects),
+                             my_rank, my_rank+len(objects))
+    idata = None
+    isize = np.zeros((1,), dtype="int64")
+    osize = np.zeros((1,), dtype="int64")
+    for obj in oiter:
+        if generate_endpoints and my_rank in (0, my_size) or idata is None:
+            idata = generator_func(obj)
+        yield obj, idata
+        # We first send to the previous processor
+        osize[0] = idata.size
+        t1 = my_comm.mpi_nonblocking_recv(isize, source)
+        t2 = my_comm.mpi_nonblocking_send(osize, dest)
+        my_comm.mpi_Request_Waitall([t1, t2])
+        odata = idata
+        idata = np.empty(isize[0], dtype=odata.dtype)
+        t3 = my_comm.mpi_nonblocking_send(odata, dest, dtype=odata.dtype)
+        t4 = my_comm.mpi_nonblocking_recv(idata, source, dtype=odata.dtype)
+        my_comm.mpi_Request_Waitall([t3, t4])
+        del odata
+
 class CommunicationSystem(object):
     communicators = []
 


https://bitbucket.org/yt_analysis/yt-3.0/commits/29f50d773524/
Changeset:   29f50d773524
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-11 20:08:45
Summary:     Enable parallel_ring to use non-standard dtypes.
Affected #:  1 file

diff -r 1547cd15d3f47456071fcd44d62b61adec86338a -r 29f50d773524887654280e1b0f6cfa572bd71c35 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -60,7 +60,8 @@
             float32 = MPI.FLOAT,
             float64 = MPI.DOUBLE,
             int32   = MPI.INT,
-            int64   = MPI.LONG
+            int64   = MPI.LONG,
+            c       = MPI.CHAR,
     )
     op_names = dict(
         sum = MPI.SUM,
@@ -73,7 +74,8 @@
             float32 = "MPI.FLOAT",
             float64 = "MPI.DOUBLE",
             int32   = "MPI.INT",
-            int64   = "MPI.LONG"
+            int64   = "MPI.LONG",
+            c       = "MPI.CHAR",
     )
     op_names = dict(
             sum = "MPI.SUM",
@@ -483,6 +485,10 @@
     for obj in oiter:
         if generate_endpoints and my_rank in (0, my_size) or idata is None:
             idata = generator_func(obj)
+            idtype = odtype = get_mpi_type(idata.dtype)
+            if idtype is None:
+                idtype = 'c'
+                odtype = idata.dtype
         yield obj, idata
         # We first send to the previous processor
         osize[0] = idata.size
@@ -490,9 +496,9 @@
         t2 = my_comm.mpi_nonblocking_send(osize, dest)
         my_comm.mpi_Request_Waitall([t1, t2])
         odata = idata
-        idata = np.empty(isize[0], dtype=odata.dtype)
-        t3 = my_comm.mpi_nonblocking_send(odata, dest, dtype=odata.dtype)
-        t4 = my_comm.mpi_nonblocking_recv(idata, source, dtype=odata.dtype)
+        idata = np.empty(isize[0], dtype=odtype)
+        t3 = my_comm.mpi_nonblocking_send(odata.view(idtype), dest, dtype=idtype)
+        t4 = my_comm.mpi_nonblocking_recv(idata.view(idtype), source, dtype=idtype)
         my_comm.mpi_Request_Waitall([t3, t4])
         del odata
 


https://bitbucket.org/yt_analysis/yt-3.0/commits/5c72cf16a741/
Changeset:   5c72cf16a741
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-11 20:14:22
Summary:     Few more dtype check fixes.
Affected #:  1 file

diff -r 29f50d773524887654280e1b0f6cfa572bd71c35 -r 5c72cf16a741c271f1bc357390d227d573dc5e0e yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -485,10 +485,9 @@
     for obj in oiter:
         if generate_endpoints and my_rank in (0, my_size) or idata is None:
             idata = generator_func(obj)
-            idtype = odtype = get_mpi_type(idata.dtype)
-            if idtype is None:
+            idtype = odtype = idata.dtype
+            if get_mpi_type(idtype) is None:
                 idtype = 'c'
-                odtype = idata.dtype
         yield obj, idata
         # We first send to the previous processor
         osize[0] = idata.size


https://bitbucket.org/yt_analysis/yt-3.0/commits/9e49ef42526e/
Changeset:   9e49ef42526e
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-11 20:36:58
Summary:     Implementing non-mutable storage for nobjects != my_rank.
Affected #:  1 file

diff -r 5c72cf16a741c271f1bc357390d227d573dc5e0e -r 9e49ef42526eac7ee233c6161996328219a9f4a9 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -471,9 +471,11 @@
         for obj in objects:
             yield generator_func(obj)
         return
-    if len(objects) != my_size or mutable:
+    generate_endpoints = len(objects) != my_size
+    if generate_endpoints and mutable:
         raise NotImplementedError
-    generate_endpoints = len(objects) > my_size
+    gforw = generate_endpoints and my_rank == 0
+    gback = generate_endpoints and my_rank == my_size - 1
     # Now we need to do pairwise sends
     source = (my_rank - 1) % my_size
     dest = (my_rank + 1) % my_size
@@ -483,22 +485,30 @@
     isize = np.zeros((1,), dtype="int64")
     osize = np.zeros((1,), dtype="int64")
     for obj in oiter:
-        if generate_endpoints and my_rank in (0, my_size) or idata is None:
+        if idata is None or gforw:
             idata = generator_func(obj)
             idtype = odtype = idata.dtype
             if get_mpi_type(idtype) is None:
                 idtype = 'c'
         yield obj, idata
         # We first send to the previous processor
-        osize[0] = idata.size
-        t1 = my_comm.mpi_nonblocking_recv(isize, source)
-        t2 = my_comm.mpi_nonblocking_send(osize, dest)
-        my_comm.mpi_Request_Waitall([t1, t2])
+        tags = []
+        if not gforw:
+            tags.append(my_comm.mpi_nonblocking_recv(isize, source))
+        if not gback:
+            osize[0] = idata.size
+            tags.append(my_comm.mpi_nonblocking_send(osize, dest))
+        my_comm.mpi_Request_Waitall(tags)
         odata = idata
-        idata = np.empty(isize[0], dtype=odtype)
-        t3 = my_comm.mpi_nonblocking_send(odata.view(idtype), dest, dtype=idtype)
-        t4 = my_comm.mpi_nonblocking_recv(idata.view(idtype), source, dtype=idtype)
-        my_comm.mpi_Request_Waitall([t3, t4])
+        tags = []
+        if not gforw:
+            idata = np.empty(isize[0], dtype=odtype)
+            tags.append(my_comm.mpi_nonblocking_recv(
+                          idata.view(idtype), source, dtype=idtype))
+        if not gback:
+            tags.append(my_comm.mpi_nonblocking_send(
+                          odata.view(idtype), dest, dtype=idtype))
+        my_comm.mpi_Request_Waitall(tags)
         del odata
 
 class CommunicationSystem(object):


https://bitbucket.org/yt_analysis/yt-3.0/commits/b8b862125837/
Changeset:   b8b862125837
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-11 20:43:18
Summary:     Adding a docstring to parallel_ring and fixing a bug in non parallel calls.
Affected #:  1 file

diff -r 9e49ef42526eac7ee233c6161996328219a9f4a9 -r b8b8621258376d8a8e694fb58358f5d381639382 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -461,15 +461,65 @@
         my_communicator.barrier()
 
 def parallel_ring(objects, generator_func, mutable = False):
-    # Mutable governs whether or not we will be modifying the arrays, or
-    # whether they are identical to generator_func(obj)
+    r"""This function loops in a ring around a set of objects, yielding the
+    results of generator_func and passing from one processor to another to
+    avoid IO or expensive computation.
+
+    This function is designed to operate in sequence on a set of objects, where
+    the creation of those objects might be expensive.  For instance, this could
+    be a set of particles that are costly to read from disk.  Processor N will
+    run generator_func on an object, and the results of that will both be
+    yielded and passed to processor N-1.  If the length of the objects is not
+    equal to the number of processors, then the final processor in the top
+    communicator will re-generate the data as needed.
+
+    In all likelihood, this function will only be useful internally to yt.
+
+    Parameters
+    ----------
+    objects : iterable
+        The list of objects to operate on.
+    generator_func : callable
+        This function will be called on each object, and the results yielded.
+        It must return a single NumPy array; for multiple values, it needs to
+        have a custom dtype.
+    mutable : bool
+        Should the arrays be considered mutable?  Currently, this will only
+        work if the number of processors equals the number of objects.
+    dynamic : bool
+        This governs whether or not dynamic load balancing will be enabled.
+        This requires one dedicated processor; if this is enabled with a set of
+        128 processors available, only 127 will be available to iterate over
+        objects as one will be load balancing the rest.
+
+
+    Examples
+    --------
+    Here is a simple example of a ring loop around a set of integers, with a
+    custom dtype.
+
+    >>> dt = numpy.dtype([('x', 'float64'), ('y', 'float64'), ('z', 'float64')])
+    >>> def gfunc(o):
+    ...     numpy.random.seed(o)
+    ...     rv = np.empty(1000, dtype=dt)
+    ...     rv['x'] = numpy.random.random(1000)
+    ...     rv['y'] = numpy.random.random(1000)
+    ...     rv['z'] = numpy.random.random(1000)
+    ...     return rv
+    ...
+    >>> obj = range(8)
+    >>> for obj, arr in parallel_ring(obj, gfunc):
+    ...     print arr['x'].sum(), arr['y'].sum(), arr['z'].sum()
+    ...
+
+    """
     if mutable: raise NotImplementedError
     my_comm = communication_system.communicators[-1]
     my_size = my_comm.size
     my_rank = my_comm.rank # This will also be the first object we access
     if not parallel_capable and not mutable:
         for obj in objects:
-            yield generator_func(obj)
+            yield obj, generator_func(obj)
         return
     generate_endpoints = len(objects) != my_size
     if generate_endpoints and mutable:


https://bitbucket.org/yt_analysis/yt-3.0/commits/cf4f6706a4d6/
Changeset:   cf4f6706a4d6
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-29 17:11:30
Summary:     Switching which is generated forward and which is generated backward.

I think this addresses Sam's concern -- thanks, Sam!
Affected #:  1 file

diff -r b8b8621258376d8a8e694fb58358f5d381639382 -r cf4f6706a4d6b85e43b3c746c54c078f36b78098 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -524,8 +524,8 @@
     generate_endpoints = len(objects) != my_size
     if generate_endpoints and mutable:
         raise NotImplementedError
-    gforw = generate_endpoints and my_rank == 0
-    gback = generate_endpoints and my_rank == my_size - 1
+    gforw = generate_endpoints and my_rank == my_size - 1
+    gback = generate_endpoints and my_rank == 0
     # Now we need to do pairwise sends
     source = (my_rank - 1) % my_size
     dest = (my_rank + 1) % my_size


https://bitbucket.org/yt_analysis/yt-3.0/commits/dc6171491b34/
Changeset:   dc6171491b34
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-29 17:53:19
Summary:     Intermediate commit.  Things work except for len(objects) > my_size.
Affected #:  1 file

diff -r cf4f6706a4d6b85e43b3c746c54c078f36b78098 -r dc6171491b3433f939bc16cb22ec59b56cddc1c2 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -522,10 +522,23 @@
             yield obj, generator_func(obj)
         return
     generate_endpoints = len(objects) != my_size
+    # gforw means: should we expect one from forwards?
+    # gback means: do we send this object backwards?
+    if len(objects) > my_size:
+        # In this case, the first processor (my_rank == 0) will generate.
+        generate_endpoints = True
+        gback = (my_rank > 0)
+        gforw = (my_rank + 1 < my_size)
+    elif len(objects) > my_size:
+        generate_endpoints = True
+        gback = (my_rank > 0)
+        gforw = (my_rank + 1 < len(objects))
+    else: # Length of objects is equal to my_size
+        generate_endpoints = False
+        gback = True
+        gforw = True
     if generate_endpoints and mutable:
         raise NotImplementedError
-    gforw = generate_endpoints and my_rank == my_size - 1
-    gback = generate_endpoints and my_rank == 0
     # Now we need to do pairwise sends
     source = (my_rank - 1) % my_size
     dest = (my_rank + 1) % my_size


https://bitbucket.org/yt_analysis/yt-3.0/commits/d850e79e9160/
Changeset:   d850e79e9160
Branch:      yt-3.0
User:        MatthewTurk
Date:        2013-06-29 19:05:17
Summary:     Had the forward/backward directions wrong.
Affected #:  1 file

diff -r dc6171491b3433f939bc16cb22ec59b56cddc1c2 -r d850e79e916033d8e3504ff9cf7085a5b5ad10a7 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -522,26 +522,22 @@
             yield obj, generator_func(obj)
         return
     generate_endpoints = len(objects) != my_size
-    # gforw means: should we expect one from forwards?
-    # gback means: do we send this object backwards?
-    if len(objects) > my_size:
+    # gback False: send the object backwards
+    # gforw False: receive an object from forwards
+    if len(objects) == my_size:
+        generate_endpoints = False
+        gback = False
+        gforw = False
+    else:
         # In this case, the first processor (my_rank == 0) will generate.
         generate_endpoints = True
-        gback = (my_rank > 0)
-        gforw = (my_rank + 1 < my_size)
-    elif len(objects) > my_size:
-        generate_endpoints = True
-        gback = (my_rank > 0)
-        gforw = (my_rank + 1 < len(objects))
-    else: # Length of objects is equal to my_size
-        generate_endpoints = False
-        gback = True
-        gforw = True
+        gback = (my_rank == 0)
+        gforw = (my_rank == my_size - 1)
     if generate_endpoints and mutable:
         raise NotImplementedError
     # Now we need to do pairwise sends
-    source = (my_rank - 1) % my_size
-    dest = (my_rank + 1) % my_size
+    source = (my_rank + 1) % my_size
+    dest = (my_rank - 1) % my_size
     oiter = itertools.islice(itertools.cycle(objects),
                              my_rank, my_rank+len(objects))
     idata = None


https://bitbucket.org/yt_analysis/yt-3.0/commits/24915104ff16/
Changeset:   24915104ff16
Branch:      yt-3.0
User:        samskillman
Date:        2013-06-29 19:08:55
Summary:     Merged in MatthewTurk/yt-3.0 (pull request #49)

Adding a parallel ring iterator
Affected #:  1 file

diff -r 1132bce09de4ea1912b0aadcec4b936b18dbe4a2 -r 24915104ff1604a95f1342f13c10f8fc1c1c0b07 yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -60,7 +60,8 @@
             float32 = MPI.FLOAT,
             float64 = MPI.DOUBLE,
             int32   = MPI.INT,
-            int64   = MPI.LONG
+            int64   = MPI.LONG,
+            c       = MPI.CHAR,
     )
     op_names = dict(
         sum = MPI.SUM,
@@ -73,7 +74,8 @@
             float32 = "MPI.FLOAT",
             float64 = "MPI.DOUBLE",
             int32   = "MPI.INT",
-            int64   = "MPI.LONG"
+            int64   = "MPI.LONG",
+            c       = "MPI.CHAR",
     )
     op_names = dict(
             sum = "MPI.SUM",
@@ -458,6 +460,116 @@
     if barrier:
         my_communicator.barrier()
 
+def parallel_ring(objects, generator_func, mutable = False):
+    r"""This function loops in a ring around a set of objects, yielding the
+    results of generator_func and passing from one processor to another to
+    avoid IO or expensive computation.
+
+    This function is designed to operate in sequence on a set of objects, where
+    the creation of those objects might be expensive.  For instance, this could
+    be a set of particles that are costly to read from disk.  Processor N will
+    run generator_func on an object, and the results of that will both be
+    yielded and passed to processor N-1.  If the length of the objects is not
+    equal to the number of processors, then the final processor in the top
+    communicator will re-generate the data as needed.
+
+    In all likelihood, this function will only be useful internally to yt.
+
+    Parameters
+    ----------
+    objects : iterable
+        The list of objects to operate on.
+    generator_func : callable
+        This function will be called on each object, and the results yielded.
+        It must return a single NumPy array; for multiple values, it needs to
+        have a custom dtype.
+    mutable : bool
+        Should the arrays be considered mutable?  Currently, this will only
+        work if the number of processors equals the number of objects.
+    dynamic : bool
+        This governs whether or not dynamic load balancing will be enabled.
+        This requires one dedicated processor; if this is enabled with a set of
+        128 processors available, only 127 will be available to iterate over
+        objects as one will be load balancing the rest.
+
+
+    Examples
+    --------
+    Here is a simple example of a ring loop around a set of integers, with a
+    custom dtype.
+
+    >>> dt = numpy.dtype([('x', 'float64'), ('y', 'float64'), ('z', 'float64')])
+    >>> def gfunc(o):
+    ...     numpy.random.seed(o)
+    ...     rv = np.empty(1000, dtype=dt)
+    ...     rv['x'] = numpy.random.random(1000)
+    ...     rv['y'] = numpy.random.random(1000)
+    ...     rv['z'] = numpy.random.random(1000)
+    ...     return rv
+    ...
+    >>> obj = range(8)
+    >>> for obj, arr in parallel_ring(obj, gfunc):
+    ...     print arr['x'].sum(), arr['y'].sum(), arr['z'].sum()
+    ...
+
+    """
+    if mutable: raise NotImplementedError
+    my_comm = communication_system.communicators[-1]
+    my_size = my_comm.size
+    my_rank = my_comm.rank # This will also be the first object we access
+    if not parallel_capable and not mutable:
+        for obj in objects:
+            yield obj, generator_func(obj)
+        return
+    generate_endpoints = len(objects) != my_size
+    # gback False: send the object backwards
+    # gforw False: receive an object from forwards
+    if len(objects) == my_size:
+        generate_endpoints = False
+        gback = False
+        gforw = False
+    else:
+        # In this case, the first processor (my_rank == 0) will generate.
+        generate_endpoints = True
+        gback = (my_rank == 0)
+        gforw = (my_rank == my_size - 1)
+    if generate_endpoints and mutable:
+        raise NotImplementedError
+    # Now we need to do pairwise sends
+    source = (my_rank + 1) % my_size
+    dest = (my_rank - 1) % my_size
+    oiter = itertools.islice(itertools.cycle(objects),
+                             my_rank, my_rank+len(objects))
+    idata = None
+    isize = np.zeros((1,), dtype="int64")
+    osize = np.zeros((1,), dtype="int64")
+    for obj in oiter:
+        if idata is None or gforw:
+            idata = generator_func(obj)
+            idtype = odtype = idata.dtype
+            if get_mpi_type(idtype) is None:
+                idtype = 'c'
+        yield obj, idata
+        # We first send to the previous processor
+        tags = []
+        if not gforw:
+            tags.append(my_comm.mpi_nonblocking_recv(isize, source))
+        if not gback:
+            osize[0] = idata.size
+            tags.append(my_comm.mpi_nonblocking_send(osize, dest))
+        my_comm.mpi_Request_Waitall(tags)
+        odata = idata
+        tags = []
+        if not gforw:
+            idata = np.empty(isize[0], dtype=odtype)
+            tags.append(my_comm.mpi_nonblocking_recv(
+                          idata.view(idtype), source, dtype=idtype))
+        if not gback:
+            tags.append(my_comm.mpi_nonblocking_send(
+                          odata.view(idtype), dest, dtype=idtype))
+        my_comm.mpi_Request_Waitall(tags)
+        del odata
+
 class CommunicationSystem(object):
     communicators = []

Repository URL: https://bitbucket.org/yt_analysis/yt-3.0/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.



More information about the yt-svn mailing list