[Yt-svn] yt: A new way to load-balance the parallel HOP halo finder.

hg at spacepope.org hg at spacepope.org
Mon May 10 15:01:06 PDT 2010


hg Repository: yt
details:   yt/rev/7f17d891b9a5
changeset: 1648:7f17d891b9a5
user:      Stephen Skory <stephenskory at yahoo.com>
date:
Mon May 10 15:00:42 2010 -0700
description:
A new way to load-balance the parallel HOP halo finder.
A random subset of points are read in on all tasks,
and balanced on the root task, defining the full subvolumes.

A minor change to the merger tree, too.

diffstat:

 yt/extensions/MergerTree.py |    4 +
 yt/lagos/HaloFinding.py     |  127 ++++++++++++++++++++++++++++++-----------
 2 files changed, 96 insertions(+), 35 deletions(-)

diffs (176 lines):

diff -r 3a2127d8583c -r 7f17d891b9a5 yt/extensions/MergerTree.py
--- a/yt/extensions/MergerTree.py	Sat May 08 14:26:17 2010 -0600
+++ b/yt/extensions/MergerTree.py	Mon May 10 15:00:42 2010 -0700
@@ -589,6 +589,10 @@
         if self.mine == 0:
             to_write = []
             # Open the temporary database.
+            try:
+                os.remove(temp_name)
+            except OSError:
+                pass
             temp_conn = sql.connect(temp_name)
             temp_cursor = temp_conn.cursor()
             line = "CREATE TABLE Halos (GlobalHaloID INTEGER PRIMARY KEY,\
diff -r 3a2127d8583c -r 7f17d891b9a5 yt/lagos/HaloFinding.py
--- a/yt/lagos/HaloFinding.py	Sat May 08 14:26:17 2010 -0600
+++ b/yt/lagos/HaloFinding.py	Mon May 10 15:00:42 2010 -0700
@@ -41,7 +41,7 @@
 
 from kd import *
 from yt.funcs import *
-import math, sys, itertools, gc
+import math, sys, itertools, gc, random
 from collections import defaultdict
 
 TINY = 1.e-40
@@ -1141,12 +1141,14 @@
 
 class parallelHF(GenericHaloFinder, parallelHOPHaloList):
     def __init__(self, pf, threshold=160, dm_only=True, resize=True, rearrange=True,\
-        fancy_padding=True, safety=1.5, premerge=True):
+        fancy_padding=True, safety=1.5, premerge=True, sample=0.03):
         GenericHaloFinder.__init__(self, pf, dm_only, padding=0.0)
-        self.padding = 0.0 
+        self.padding = 0.0
         self.num_neighbors = 65
         self.safety = safety
+        self.sample = sample
         period = pf["DomainRightEdge"] - pf["DomainLeftEdge"]
+        topbounds = na.array([[0., 0., 0.], period])
         # Cut up the volume evenly initially, with no padding.
         padded, LE, RE, self._data_source = self._partition_hierarchy_3d(padding=self.padding)
         # also get the total mass of particles
@@ -1156,38 +1158,14 @@
         if ytcfg.getboolean("yt","inline") == False and \
            resize and self._mpi_get_size() != 1:
             cut_list = self._partition_hierarchy_3d_bisection_list()
-            for i,cut in enumerate(cut_list):
-                dim = cut[0]
-                if i == 0:
-                    width = pf["DomainRightEdge"][dim] - pf["DomainLeftEdge"][dim]
-                    new_LE = pf["DomainLeftEdge"]
-                else:
-                    new_LE, new_RE = new_top_bounds
-                    width = new_RE[dim] - new_LE[dim]
-                if ytcfg.getboolean("yt","inline") == False:
-                    data = self._data_source[ds_names[dim]]
-                else:
-                    data = self._data_source[ds_names[dim]]
-                if i == 0:
-                    local_parts = data.size
-                    n_parts = self._mpi_allsum(local_parts)
-                    min = self._mpi_allmin(local_parts)
-                    max = self._mpi_allmax(local_parts)
-                    mylog.info("Initial distribution: (min,max,tot) (%d,%d,%d)" \
-                        % (min, max, n_parts))
-                num_bins = 1000
-                bin_width = float(width)/float(num_bins)
-                bins = na.arange(num_bins+1, dtype='float64') * bin_width + new_LE[dim]
-                counts, bins = na.histogram(data, bins, new=True)
-                if i == 0:
-                    new_group, new_comm, LE, RE, new_top_bounds, new_cc, self._data_source= \
-                        self._partition_hierarchy_3d_bisection(dim, bins, counts, top_bounds = None,\
-                        old_group = None, old_comm = None, cut=cut)
-                else:
-                    new_group, new_comm, LE, RE, new_top_bounds, new_cc, self._data_source = \
-                        self._partition_hierarchy_3d_bisection(dim, bins, counts, top_bounds = new_top_bounds,\
-                        old_group = new_group, old_comm = new_comm, cut=cut, old_cc=new_cc)
-            del bins, counts
+            root_points = self._subsample_points()
+            self.bucket_bounds = []
+            if self._mpi_get_rank() == 0:
+                self._recursive_divide(root_points, topbounds, 0, cut_list)
+            self.bucket_bounds = self._mpi_bcast_pickled(self.bucket_bounds)
+            my_bounds = self.bucket_bounds[self._mpi_get_rank()]
+            LE, RE = my_bounds[0], my_bounds[1]
+            self._data_source = self.hierarchy.region_strict([0.]*3, LE, RE)
         # If this isn't parallel, define the region as an AMRRegionStrict so
         # particle IO works.
         if self._mpi_get_size() == 1:
@@ -1266,6 +1244,85 @@
         self._join_halolists()
         yt_counters("Final Grouping")
 
+    def _subsample_points(self):
+        # Read in a random subset of the points in each domain, and then
+        # collect them on the root task.
+        xp = self._data_source["particle_position_x"]
+        yp = self._data_source["particle_position_y"]
+        zp = self._data_source["particle_position_z"]
+        n_parts = self._mpi_allsum(xp.size)
+        local_parts = xp.size
+        random_points = int(self.sample * n_parts)
+        # We want to get a representative selection of random particles in
+        # each subvolume.
+        adjust = float(local_parts) / ( float(n_parts) / self._mpi_get_size())
+        n_random = int(adjust * float(random_points) / self._mpi_get_size())
+        mylog.info("Reading in %d random particles." % n_random)
+        # Get unique random particles.
+        my_points = na.empty((n_random, 3), dtype='float64')
+        uni = na.array(random.sample(xrange(xp.size), n_random))
+        uni = uni[uni.argsort()]
+        my_points[:,0] = xp[uni]
+        my_points[:,1] = yp[uni]
+        my_points[:,2] = zp[uni]
+        # Collect them on the root task.
+        mine, sizes = self._mpi_info_dict(n_random)
+        if mine == 0:
+            tot_random = sum(sizes.values())
+            root_points = na.empty((tot_random, 3), dtype='float64')
+            root_points.shape = (1, 3*tot_random)
+        else:
+            root_points = na.empty([])
+        my_points.shape = (1, n_random*3)
+        root_points = self._mpi_concatenate_array_on_root_double(my_points[0])
+        if mine == 0:
+            root_points.shape = (tot_random, 3)
+        return root_points
+
+    def _recursive_divide(self, points, bounds, level, cut_list):
+        dim = cut_list[level][0]
+        parts = points.shape[0]
+        num_bins = 1000
+        width = bounds[1][dim] - bounds[0][dim]
+        bin_width = width / num_bins
+        bins = na.arange(num_bins+1, dtype='float64') * bin_width + bounds[0][dim]
+        counts, bins = na.histogram(points[:,dim], bins)
+        # Find the bin that passes the cut points.
+        midpoints = [bounds[0][dim]]
+        sum = 0
+        bin = 0
+        for step in xrange(1,cut_list[level][1]):
+            while sum < ((parts*step)/cut_list[level][1]):
+                lastsum = sum
+                sum += counts[bin]
+                bin += 1
+            # Bin edges
+            left_edge = bins[bin-1]
+            right_edge = bins[bin]
+            # Find a better approx of the midpoint cut line using a linear approx.
+            a = float(sum - lastsum) / (right_edge - left_edge)
+            midpoints.append(left_edge + (0.5 - (float(lastsum) / parts / 2)) / a)
+        midpoints.append(bounds[1][dim])
+
+        # Split the points & update the bounds.
+        subpoints = []
+        subbounds = []
+        for pair in zip(midpoints[:-1],midpoints[1:]):
+            select = na.bitwise_and(points[:,dim] >= pair[0],
+                points[:,dim] < pair[1])
+            subpoints.append(points[select])
+            nb = bounds.copy()
+            nb[0][dim] = pair[0]
+            nb[1][dim] = pair[1]
+            subbounds.append(nb)
+        # If we're at the maxlevel, make a bucket. Otherwise, recurse down.
+        maxlevel = len(cut_list) - 1
+        for pair in zip(subpoints, subbounds):
+            if level == maxlevel:
+                self.bucket_bounds.append(pair[1])
+            else:
+                self._recursive_divide(pair[0], pair[1], level+1, cut_list)
+
     def _join_halolists(self):
         if self.group_count == 0:
             mylog.info("There are no halos found.")



More information about the yt-svn mailing list