[Yt-svn] yt: A new way to load-balance the parallel HOP halo finder.
hg at spacepope.org
Mon May 10 15:01:06 PDT 2010
hg Repository: yt
details: yt/rev/7f17d891b9a5
changeset: 1648:7f17d891b9a5
user: Stephen Skory <stephenskory at yahoo.com>
date:        Mon May 10 15:00:42 2010 -0700
description:
A new way to load-balance the parallel HOP halo finder.
A random subset of the points is read in on all tasks
and balanced on the root task, defining the full subvolumes.
A minor change to the merger tree, too.
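In outline: every task reads a small random fraction of its particle
positions, the samples are gathered on the root task, and the root
recursively bisects the domain along a preset list of (dimension,
number-of-pieces) cuts so that each final piece holds roughly the same
number of sampled points. A minimal standalone sketch of the divide step
(the patch itself uses a 1000-bin histogram with a linear correction
rather than quantiles; all names here are illustrative):

import numpy as np

def recursive_divide(points, bounds, cut_list, level=0, buckets=None):
    # Split `points` along cut_list[level] = (dim, n_pieces) into pieces
    # holding roughly equal numbers of sampled points, then recurse.
    if buckets is None:
        buckets = []
    dim, n_pieces = cut_list[level]
    edges = np.quantile(points[:, dim], np.linspace(0.0, 1.0, n_pieces + 1))
    edges[0], edges[-1] = bounds[0][dim], bounds[1][dim]  # pin to the domain
    for lo, hi in zip(edges[:-1], edges[1:]):
        sel = (points[:, dim] >= lo) & (points[:, dim] < hi)  # half-open bins
        nb = bounds.copy()
        nb[0][dim], nb[1][dim] = lo, hi
        if level == len(cut_list) - 1:
            buckets.append(nb)  # a final subvolume, one per task
        else:
            recursive_divide(points[sel], nb, cut_list, level + 1, buckets)
    return buckets

# Example: four tasks, cutting x into 2 pieces and then y into 2.
pts = np.random.random((10000, 3))  # stand-in for the gathered samples
top = np.array([[0., 0., 0.], [1., 1., 1.]])
for b in recursive_divide(pts, top, [(0, 2), (1, 2)]):
    print(b[0], b[1])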
diffstat:
yt/extensions/MergerTree.py | 4 +
yt/lagos/HaloFinding.py | 127 ++++++++++++++++++++++++++++++-----------
2 files changed, 96 insertions(+), 35 deletions(-)
diffs (176 lines):
diff -r 3a2127d8583c -r 7f17d891b9a5 yt/extensions/MergerTree.py
--- a/yt/extensions/MergerTree.py	Sat May 08 14:26:17 2010 -0600
+++ b/yt/extensions/MergerTree.py	Mon May 10 15:00:42 2010 -0700
@@ -589,6 +589,10 @@
         if self.mine == 0:
             to_write = []
             # Open the temporary database.
+            try:
+                os.remove(temp_name)
+            except OSError:
+                pass
             temp_conn = sql.connect(temp_name)
             temp_cursor = temp_conn.cursor()
             line = "CREATE TABLE Halos (GlobalHaloID INTEGER PRIMARY KEY,\
diff -r 3a2127d8583c -r 7f17d891b9a5 yt/lagos/HaloFinding.py
--- a/yt/lagos/HaloFinding.py	Sat May 08 14:26:17 2010 -0600
+++ b/yt/lagos/HaloFinding.py	Mon May 10 15:00:42 2010 -0700
@@ -41,7 +41,7 @@
 from kd import *
 from yt.funcs import *
-import math, sys, itertools, gc
+import math, sys, itertools, gc, random
 from collections import defaultdict

 TINY = 1.e-40
@@ -1141,12 +1141,14 @@

 class parallelHF(GenericHaloFinder, parallelHOPHaloList):
     def __init__(self, pf, threshold=160, dm_only=True, resize=True, rearrange=True,\
-        fancy_padding=True, safety=1.5, premerge=True):
+        fancy_padding=True, safety=1.5, premerge=True, sample=0.03):
         GenericHaloFinder.__init__(self, pf, dm_only, padding=0.0)
-        self.padding = 0.0
+        self.padding = 0.0
         self.num_neighbors = 65
         self.safety = safety
+        self.sample = sample
         period = pf["DomainRightEdge"] - pf["DomainLeftEdge"]
+        topbounds = na.array([[0., 0., 0.], period])
         # Cut up the volume evenly initially, with no padding.
         padded, LE, RE, self._data_source = self._partition_hierarchy_3d(padding=self.padding)
         # also get the total mass of particles
@@ -1156,38 +1158,14 @@
         if ytcfg.getboolean("yt","inline") == False and \
             resize and self._mpi_get_size() != 1:
             cut_list = self._partition_hierarchy_3d_bisection_list()
-            for i,cut in enumerate(cut_list):
-                dim = cut[0]
-                if i == 0:
-                    width = pf["DomainRightEdge"][dim] - pf["DomainLeftEdge"][dim]
-                    new_LE = pf["DomainLeftEdge"]
-                else:
-                    new_LE, new_RE = new_top_bounds
-                    width = new_RE[dim] - new_LE[dim]
-                if ytcfg.getboolean("yt","inline") == False:
-                    data = self._data_source[ds_names[dim]]
-                else:
-                    data = self._data_source[ds_names[dim]]
-                if i == 0:
-                    local_parts = data.size
-                    n_parts = self._mpi_allsum(local_parts)
-                    min = self._mpi_allmin(local_parts)
-                    max = self._mpi_allmax(local_parts)
-                    mylog.info("Initial distribution: (min,max,tot) (%d,%d,%d)" \
-                        % (min, max, n_parts))
-                num_bins = 1000
-                bin_width = float(width)/float(num_bins)
-                bins = na.arange(num_bins+1, dtype='float64') * bin_width + new_LE[dim]
-                counts, bins = na.histogram(data, bins, new=True)
-                if i == 0:
-                    new_group, new_comm, LE, RE, new_top_bounds, new_cc, self._data_source= \
-                        self._partition_hierarchy_3d_bisection(dim, bins, counts, top_bounds = None,\
-                        old_group = None, old_comm = None, cut=cut)
-                else:
-                    new_group, new_comm, LE, RE, new_top_bounds, new_cc, self._data_source = \
-                        self._partition_hierarchy_3d_bisection(dim, bins, counts, top_bounds = new_top_bounds,\
-                        old_group = new_group, old_comm = new_comm, cut=cut, old_cc=new_cc)
-            del bins, counts
+            root_points = self._subsample_points()
+            self.bucket_bounds = []
+            if self._mpi_get_rank() == 0:
+                self._recursive_divide(root_points, topbounds, 0, cut_list)
+            self.bucket_bounds = self._mpi_bcast_pickled(self.bucket_bounds)
+            my_bounds = self.bucket_bounds[self._mpi_get_rank()]
+            LE, RE = my_bounds[0], my_bounds[1]
+            self._data_source = self.hierarchy.region_strict([0.]*3, LE, RE)
         # If this isn't parallel, define the region as an AMRRegionStrict so
         # particle IO works.
         if self._mpi_get_size() == 1:
@@ -1266,6 +1244,85 @@
         self._join_halolists()
         yt_counters("Final Grouping")

+    def _subsample_points(self):
+        # Read in a random subset of the points in each domain, and then
+        # collect them on the root task.
+        xp = self._data_source["particle_position_x"]
+        yp = self._data_source["particle_position_y"]
+        zp = self._data_source["particle_position_z"]
+        n_parts = self._mpi_allsum(xp.size)
+        local_parts = xp.size
+        random_points = int(self.sample * n_parts)
+        # We want to get a representative selection of random particles in
+        # each subvolume.
+        adjust = float(local_parts) / ( float(n_parts) / self._mpi_get_size())
+        n_random = int(adjust * float(random_points) / self._mpi_get_size())
+        mylog.info("Reading in %d random particles." % n_random)
+        # Get unique random particles.
+        my_points = na.empty((n_random, 3), dtype='float64')
+        uni = na.array(random.sample(xrange(xp.size), n_random))
+        uni = uni[uni.argsort()]
+        my_points[:,0] = xp[uni]
+        my_points[:,1] = yp[uni]
+        my_points[:,2] = zp[uni]
+        # Collect them on the root task.
+        mine, sizes = self._mpi_info_dict(n_random)
+        if mine == 0:
+            tot_random = sum(sizes.values())
+            root_points = na.empty((tot_random, 3), dtype='float64')
+            root_points.shape = (1, 3*tot_random)
+        else:
+            root_points = na.empty([])
+        my_points.shape = (1, n_random*3)
+        root_points = self._mpi_concatenate_array_on_root_double(my_points[0])
+        if mine == 0:
+            root_points.shape = (tot_random, 3)
+        return root_points
+
+    def _recursive_divide(self, points, bounds, level, cut_list):
+        dim = cut_list[level][0]
+        parts = points.shape[0]
+        num_bins = 1000
+        width = bounds[1][dim] - bounds[0][dim]
+        bin_width = width / num_bins
+        bins = na.arange(num_bins+1, dtype='float64') * bin_width + bounds[0][dim]
+        counts, bins = na.histogram(points[:,dim], bins)
+        # Find the bin that passes the cut points.
+        midpoints = [bounds[0][dim]]
+        sum = 0
+        bin = 0
+        for step in xrange(1,cut_list[level][1]):
+            while sum < ((parts*step)/cut_list[level][1]):
+                lastsum = sum
+                sum += counts[bin]
+                bin += 1
+            # Bin edges
+            left_edge = bins[bin-1]
+            right_edge = bins[bin]
+            # Find a better approx of the midpoint cut line using a linear approx.
+            a = float(sum - lastsum) / (right_edge - left_edge)
+            midpoints.append(left_edge + (0.5 - (float(lastsum) / parts / 2)) / a)
+        midpoints.append(bounds[1][dim])
+
+        # Split the points & update the bounds.
+        subpoints = []
+        subbounds = []
+        for pair in zip(midpoints[:-1],midpoints[1:]):
+            select = na.bitwise_and(points[:,dim] >= pair[0],
+                points[:,dim] < pair[1])
+            subpoints.append(points[select])
+            nb = bounds.copy()
+            nb[0][dim] = pair[0]
+            nb[1][dim] = pair[1]
+            subbounds.append(nb)
+        # If we're at the maxlevel, make a bucket. Otherwise, recurse down.
+        maxlevel = len(cut_list) - 1
+        for pair in zip(subpoints, subbounds):
+            if level == maxlevel:
+                self.bucket_bounds.append(pair[1])
+            else:
+                self._recursive_divide(pair[0], pair[1], level+1, cut_list)
+
     def _join_halolists(self):
         if self.group_count == 0:
             mylog.info("There are no halos found.")