[Yt-svn] commit/yt: MatthewTurk: Adding routines necessary to merge quadtrees in a pairwise, numpy-buffer manner.

Bitbucket commits-noreply at bitbucket.org
Thu Jun 2 08:16:54 PDT 2011


1 new changeset in yt:

http://bitbucket.org/yt_analysis/yt/changeset/1236616b460a/
changeset:   1236616b460a
branches:    
user:        MatthewTurk
date:        2011-06-02 17:13:59
summary:     Adding routines necessary to merge quadtrees in a pairwise, numpy-buffer
manner.  QuadProj will give incorrect results in parallel as of right now; it
will be rewritten for better load balancing shortly.
affected #:  2 files (2.4 KB)

--- a/yt/utilities/_amr_utils/QuadTree.pyx	Wed Jun 01 19:50:16 2011 -0400
+++ b/yt/utilities/_amr_utils/QuadTree.pyx	Thu Jun 02 11:13:59 2011 -0400
@@ -219,6 +219,9 @@
                                  refined, values, wval)
         return (refined, values, wval)
 
+    def get_args(self):
+        return (self.top_grid_dims[0], self.top_grid_dims[1], self.nvals)
+
     cdef void add_to_position(self,
                  int level, np.int64_t pos[2],
                  np.float64_t *val,


--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py	Wed Jun 01 19:50:16 2011 -0400
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py	Thu Jun 02 11:13:59 2011 -0400
@@ -36,6 +36,8 @@
 from yt.utilities.definitions import \
     x_dict, y_dict
 import yt.utilities.logger
+from yt.utilities.amr_utils import \
+    QuadTree, merge_quadtrees
 
 exe_name = os.path.basename(sys.executable)
 # At import time, we determined whether or not we're being run in parallel.
@@ -1253,6 +1255,61 @@
         if not obj._distributed: return True
         return (obj._owner == MPI.COMM_WORLD.rank)
 
+    def merge_quadtree_buffers(self, qt):
+        # This is a modified version of pairwise reduction from Lisandro Dalcin,
+        # in the reductions demo of mpi4py
+        size = MPI.COMM_WORLD.size
+        rank = MPI.COMM_WORLD.rank
+
+        mask = 1
+
+        args = qt.get_args() # Will always be the same
+        tgd = na.array([args[0], args[1]], dtype='int64')
+        sizebuf = na.zeros(1, 'int64')
+
+        while mask < size:
+            if (mask & rank) != 0:
+                buf = qt.tobuffer()
+                sizebuf[0] = buf[0].size
+                target = (rank & ~mask) % size
+                MPI.COMM_WORLD.Send([sizebuf, MPI.LONG], dest=target)
+                MPI.COMM_WORLD.Send([buf[0], MPI.INT], dest=target)
+                MPI.COMM_WORLD.Send([buf[1], MPI.DOUBLE], dest=target)
+                MPI.COMM_WORLD.Send([buf[2], MPI.DOUBLE], dest=target)
+            else:
+                target = (rank | mask)
+                if target < size:
+                    MPI.COMM_WORLD.Recv(sizebuf, source=target)
+                    buf = [na.empty((sizebuf[0],), 'int32'),
+                           na.empty((sizebuf[0], args[2]),'float64'),
+                           na.empty((sizebuf[0],),'float64')]
+                    MPI.COMM_WORLD.Recv([buf[0], MPI.INT], source=target)
+                    MPI.COMM_WORLD.Recv([buf[1], MPI.DOUBLE], source=target)
+                    MPI.COMM_WORLD.Recv([buf[2], MPI.DOUBLE], source=target)
+                    qto = QuadTree(tgd, args[2])
+                    qto.frombuffer(*buf)
+                    del buf
+                    merge_quadtrees(qt, qto)
+                    del qto
+            mask <<= 1
+
+        if rank == 0:
+            buf = qt.tobuffer()
+            sizebuf[0] = buf[0].size
+        MPI.COMM_WORLD.Bcast([sizebuf, MPI.LONG], root=0)
+        if rank != 0:
+            buf = [na.empty((sizebuf[0],), 'int32'),
+                   na.empty((sizebuf[0], args[2]),'float64'),
+                   na.empty((sizebuf[0],),'float64')]
+        MPI.COMM_WORLD.Bcast([buf[0], MPI.INT], root=0)
+        MPI.COMM_WORLD.Bcast([buf[1], MPI.DOUBLE], root=0)
+        MPI.COMM_WORLD.Bcast([buf[2], MPI.DOUBLE], root=0)
+        self.refined = buf[0]
+        if rank != 0:
+            qt = QuadTree(tgd, args[2])
+            qt.frombuffer(*buf)
+        return qt
+
 __tocast = 'c'
 
 def _send_array(arr, dest, tag = 0):

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because the commit-notification service is enabled for the
address this email was sent to.



More information about the yt-svn mailing list