[Yt-svn] yt-commit r1706 - in trunk/yt: extensions extensions/kdtree lagos lagos/parallelHOP

sskory at wrangler.dreamhost.com sskory at wrangler.dreamhost.com
Wed Apr 28 15:00:01 PDT 2010


Author: sskory
Date: Wed Apr 28 14:59:59 2010
New Revision: 1706
URL: http://yt.enzotools.org/changeset/1706

Log:
Adding new tools from hg into svn trunk. This includes the parallel Structure Function Generator, parallel Merger Tree, updates to Star Particle Analysis,  memory usage updates to parallel HOP, and the Fortran kDtree.

Added:
   trunk/yt/extensions/MergerTree.py
   trunk/yt/lagos/StructureFunctionGenerator.py
Modified:
   trunk/yt/extensions/StarAnalysis.py
   trunk/yt/extensions/kdtree/fKD.f90
   trunk/yt/extensions/kdtree/fKD.v
   trunk/yt/lagos/__init__.py
   trunk/yt/lagos/kd.py
   trunk/yt/lagos/parallelHOP/parallelHOP.py

Added: trunk/yt/extensions/MergerTree.py
==============================================================================
--- (empty file)
+++ trunk/yt/extensions/MergerTree.py	Wed Apr 28 14:59:59 2010
@@ -0,0 +1,840 @@
+"""
+MergerTree class and member functions.
+
+Author: Stephen Skory <sskory at physics.ucsd.edu>
+Affiliation: CASS/UC San Diego, CA
+Homepage: http://yt.enzotools.org/
+License:
+  Copyright (C) 2008-2010 Stephen Skory.  All Rights Reserved.
+
+  This file is part of yt.
+
+  yt is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 3 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import yt.lagos as lagos
+from yt.extensions.kdtree import *
+from yt.lagos.HaloFinding import HaloFinder
+from yt.logger import lagosLogger as mylog
+import yt.extensions.HaloProfiler as HP
+
+import numpy as na
+import os, glob, md5, time, gc, sys
+import h5py
+import types
+import sqlite3 as sql
+from collections import defaultdict
+
+column_types = {
+"GlobalHaloID":"INTEGER",
+"SnapCurrentTimeIdentifier":"INTEGER",
+"SnapZ":"FLOAT",
+"SnapHaloID":"INTEGER",
+"HaloMass":"FLOAT",
+"NumPart":"INTEGER",
+"CenMassX":"FLOAT",
+"CenMassY":"FLOAT",
+"CenMassZ":"FLOAT",
+"BulkVelX":"FLOAT",
+"BulkVelY":"FLOAT",
+"BulkVelZ":"FLOAT",
+"MaxRad":"FLOAT",
+"ChildHaloID0":"INTEGER",
+"ChildHaloFrac0":"FLOAT",
+"ChildHaloID1":"INTEGER",
+"ChildHaloFrac1":"FLOAT",
+"ChildHaloID2":"INTEGER",
+"ChildHaloFrac2":"FLOAT",
+"ChildHaloID3":"INTEGER",
+"ChildHaloFrac3":"FLOAT",
+"ChildHaloID4":"INTEGER", 
+"ChildHaloFrac4":"FLOAT"}
+
+# In order.
+columns = ["GlobalHaloID", "SnapCurrentTimeIdentifier", "SnapZ", 
+"SnapHaloID", "HaloMass", "NumPart", "CenMassX", "CenMassY",
+"CenMassZ", "BulkVelX", "BulkVelY", "BulkVelZ", "MaxRad",
+"ChildHaloID0", "ChildHaloFrac0",
+"ChildHaloID1", "ChildHaloFrac1",
+"ChildHaloID2", "ChildHaloFrac2",
+"ChildHaloID3", "ChildHaloFrac3",
+"ChildHaloID4", "ChildHaloFrac4"]
+
+class DatabaseFunctions(object):
+    # Common database functions so it doesn't have to be repeated.
+    def _open_database(self):
+        # open the database. Check to make sure the database file exists.
+        if not os.path.exists(self.database):
+            mylog.error("The database file %s cannot be found. Exiting." % \
+                self.database)
+            return False
+        self.conn = sql.connect(self.database)
+        self.cursor = self.conn.cursor()
+        return True
+
+    def _close_database(self):
+        # close the database cleanly.
+        self.cursor.close()
+        self.conn.close()
+
+class MergerTree(DatabaseFunctions, lagos.ParallelAnalysisInterface):
+    def __init__(self, restart_files=[], database='halos.db',
+            halo_finder_function=HaloFinder, halo_finder_threshold=80.0,
+            FOF_link_length=0.2, dm_only=False, refresh=False, sleep=1,
+            index=True):
+        self.restart_files = restart_files # list of enzo restart files
+        self.with_halos = na.ones(len(restart_files), dtype='bool')
+        self.database = database # the sqlite database of haloes.
+        self.halo_finder_function = halo_finder_function # which halo finder to use
+        self.halo_finder_threshold = halo_finder_threshold # overdensity threshold
+        self.FOF_link_length= FOF_link_length # For FOF
+        self.dm_only = dm_only
+        self.refresh = refresh
+        self.sleep = sleep # How long to wait between db sync checks.
+        if self.sleep <= 0.:
+            self.sleep = 5
+        # MPI stuff
+        self.mine = self._mpi_get_rank()
+        if self.mine is None:
+            self.mine = 0
+        self.size = self._mpi_get_size()
+        if self.size is None:
+            self.size = 1
+        # Get to work.
+        if self.refresh and self.mine == 0:
+            try:
+                os.unlink(self.database)
+            except:
+                pass
+        self._barrier()
+        self._open_create_database()
+        self._create_halo_table()
+        self._run_halo_finder_add_to_db()
+        # Find the h5 file names for all the halos.
+        for snap in self.restart_files:
+            self._build_h5_refs(snap)
+        # Loop over the pairs of snapshots to locate likely neighbors, and
+        # then use those likely neighbors to compute fractional contributions.
+        last = None
+        for snap, pair in enumerate(zip(self.restart_files[:-1], self.restart_files[1:])):
+            if not self.with_halos[snap] or not self.with_halos[snap+1]:
+                continue
+            self._find_likely_children(pair[0], pair[1])
+            # last is the data for the parent dataset, which can be supplied
+            # as the child from the previous round for all but the first loop.
+            last = self._compute_child_fraction(pair[0], pair[1], last)
+        del last
+        if self.mine == 0 and index:
+            mylog.info("Creating database index.")
+            line = "CREATE INDEX IF NOT EXISTS HalosIndex ON Halos ("
+            for name in columns:
+                line += name +","
+            line = line[:-1] + ");"
+            self.cursor.execute(line)
+        self._barrier()
+        self._close_database()
+        mylog.info("Done!")
+        
+    def _read_halo_lists(self):
+        self.halo_lists = []
+        for i,file in enumerate(self.halo_files):
+            hp = HP.HaloProfiler(self.restart_files[i], halo_list_file=file)
+            self.halo_lists.append(hp.all_halos)
+
+    def _run_halo_finder_add_to_db(self):
+        for cycle, file in enumerate(self.restart_files):
+            gc.collect()
+            pf = lagos.EnzoStaticOutput(file)
+            # If the halos are already found, skip this data step, unless
+            # refresh is True.
+            dir = os.path.dirname(file)
+            if os.path.exists(os.path.join(dir, 'MergerHalos.out')) and \
+                    os.path.exists(os.path.join(dir, 'MergerHalos.txt')) and \
+                    glob.glob(os.path.join(dir, 'MergerHalos*h5')) is not [] and \
+                    not self.refresh:
+                pass
+            else:
+                # Run the halo finder.
+                if self.halo_finder_function == yt.lagos.HaloFinding.FOFHaloFinder:
+                    halos = self.halo_finder_function(pf,
+                        link=self.FOF_link_length, dm_only=self.dm_only)
+                else:
+                    halos = self.halo_finder_function(pf,
+                        threshold=self.halo_finder_threshold, dm_only=self.dm_only)
+                halos.write_out(os.path.join(dir, 'MergerHalos.out'))
+                halos.write_particle_lists(os.path.join(dir, 'MergerHalos'))
+                halos.write_particle_lists_txt(os.path.join(dir, 'MergerHalos'))
+                if len(halos) == 0:
+                    mylog.info("Dataset %s has no halos." % file)
+                    self.with_halos[cycle] = False
+                    continue
+                del halos
+            # Now add halo data to the db if it isn't already there by
+            # checking the first halo.
+            currt = pf['CurrentTimeIdentifier']
+            line = "SELECT GlobalHaloID from Halos where SnapHaloID=0\
+            and SnapCurrentTimeIdentifier=%d;" % currt
+            self.cursor.execute(line)
+            result = self.cursor.fetchone()
+            if result != None:
+                continue
+            red = pf['CosmologyCurrentRedshift']
+            # Read the halos off the disk using the Halo Profiler tools.
+            hp = HP.HaloProfiler(file, halo_list_file='MergerHalos.out',
+            halo_list_format={'id':0, 'mass':1, 'numpart':2, 'center':[7, 8, 9], 'velocity':[10, 11, 12], 'r_max':13})
+            if len(hp.all_halos) == 0:
+                mylog.info("Dataset %s has no halos." % file)
+                self.with_halos[cycle] = False
+                continue
+            mylog.info("Entering halos into database for z=%f" % red)
+            if self.mine == 0:
+                for ID,halo in enumerate(hp.all_halos):
+                    numpart = int(halo['numpart'])
+                    values = (None, currt, red, ID, halo['mass'], numpart,
+                    halo['center'][0], halo['center'][1], halo['center'][2],
+                    halo['velocity'][0], halo['velocity'][1], halo['velocity'][2],
+                    halo['r_max'] / pf['mpc'],
+                    -1,0.,-1,0.,-1,0.,-1,0.,-1,0.)
+                    # 23 question marks for 23 data columns.
+                    line = ''
+                    for i in range(23):
+                        line += '?,'
+                    # Pull off the last comma.
+                    line = 'INSERT into Halos VALUES (' + line[:-1] + ')'
+                    self.cursor.execute(line, values)
+                self.conn.commit()
+            self._barrier()
+            del hp
+    
+    def _open_create_database(self):
+        # open the database. This creates the database file on disk if it
+        # doesn't already exist. Open it first on root, and then on the others.
+        if self.mine == 0:
+            self.conn = sql.connect(self.database)
+        self._barrier()
+        self._ensure_db_sync()
+        if self.mine != 0:
+            self.conn = sql.connect(self.database)
+        self.cursor = self.conn.cursor()
+
+    def _ensure_db_sync(self):
+        # If the database becomes out of sync for each task, ostensibly due to
+        # parallel file system funniness, things will go bad very quickly.
+        # Therefore, just to be very, very careful, we will ensure that the
+        # md5 hash of the file is identical across all tasks before proceeding.
+        self._barrier()
+        for i in range(5):
+            try:
+                file = open(self.database)
+            except IOError:
+                # This is to give a little bit of time for the database creation
+                # to replicate across the file system.
+                time.sleep(self.sleep)
+                file = open(self.database)
+            hash = md5.md5(file.read()).hexdigest()
+            file.close()
+            ignore, hashes = self._mpi_info_dict(hash)
+            hashes = set(hashes.values())
+            if len(hashes) == 1:
+                break
+            else:
+                # Wait a little bit for the file system to (hopefully) sync up.
+                time.sleep(self.sleep)
+        if len(hashes) == 1:
+            return
+        else:
+            mylog.error("The file system is not properly synchronizing the database.")
+            raise RunTimeError("Fatal error. Exiting.")
+
+    def _create_halo_table(self):
+        if self.mine == 0:
+            # Handle the error if it already exists.
+            try:
+                # Create the table that will store the halo data.
+                line = "CREATE TABLE Halos (GlobalHaloID INTEGER PRIMARY KEY,\
+                    SnapCurrentTimeIdentifier INTEGER, SnapZ FLOAT, SnapHaloID INTEGER, \
+                    HaloMass FLOAT,\
+                    NumPart INTEGER, CenMassX FLOAT, CenMassY FLOAT,\
+                    CenMassZ FLOAT, BulkVelX FLOAT, BulkVelY FLOAT, BulkVelZ FLOAT,\
+                    MaxRad FLOAT,\
+                    ChildHaloID0 INTEGER, ChildHaloFrac0 FLOAT, \
+                    ChildHaloID1 INTEGER, ChildHaloFrac1 FLOAT, \
+                    ChildHaloID2 INTEGER, ChildHaloFrac2 FLOAT, \
+                    ChildHaloID3 INTEGER, ChildHaloFrac3 FLOAT, \
+                    ChildHaloID4 INTEGER, ChildHaloFrac4 FLOAT);"
+                self.cursor.execute(line)
+                self.conn.commit()
+            except sql.OperationalError:
+                pass
+        self._barrier()
+    
+    def _find_likely_children(self, parentfile, childfile):
+        # For each halo in the parent list, identify likely children in the 
+        # list of children.
+        
+        # First, read in the locations of the child halos.
+        child_pf = lagos.EnzoStaticOutput(childfile)
+        child_t = child_pf['CurrentTimeIdentifier']
+        line = "SELECT SnapHaloID, CenMassX, CenMassY, CenMassZ FROM \
+        Halos WHERE SnapCurrentTimeIdentifier = %d" % child_t
+        self.cursor.execute(line)
+        
+        mylog.info("Finding likely parents for z=%1.5f child halos." % \
+            child_pf["CosmologyCurrentRedshift"])
+        
+        # Build the kdtree for the children by looping over the fetched rows.
+        child_points = []
+        for row in self.cursor:
+            child_points.append([row[1], row[2], row[3]])
+        # Turn it into fortran.
+        child_points = na.array(child_points)
+        fKD.pos = na.asfortranarray(child_points.T)
+        fKD.qv = na.empty(3, dtype='float64')
+        fKD.dist = na.empty(5, dtype='float64')
+        fKD.tags = na.empty(5, dtype='int64')
+        fKD.nn = 5
+        fKD.sort = True
+        fKD.rearrange = True
+        create_tree()
+
+        # Find the parent points from the database.
+        parent_pf = lagos.EnzoStaticOutput(parentfile)
+        parent_t = parent_pf['CurrentTimeIdentifier']
+        line = "SELECT SnapHaloID, CenMassX, CenMassY, CenMassZ FROM \
+        Halos WHERE SnapCurrentTimeIdentifier = %d" % parent_t
+        self.cursor.execute(line)
+
+        # Loop over the returned rows, and find the likely neighbors for the
+        # parents.
+        candidates = {}
+        for row in self.cursor:
+            fKD.qv = na.array([row[1], row[2], row[3]])
+            find_nn_nearest_neighbors()
+            NNtags = fKD.tags[:] - 1
+            nIDs = []
+            for n in NNtags:
+                nIDs.append(n)
+            if len(nIDs) < 5:
+                # We need to fill in fake halos if there aren't enough halos,
+                # which can happen at high redshifts.
+                while len(nIDs) < 5:
+                    nIDs.append(-1)
+            candidates[row[0]] = nIDs
+        
+        del fKD.pos, fKD.tags, fKD.dist
+        free_tree() # Frees the kdtree object.
+        
+        self.candidates = candidates
+        
+        # This stores the masses contributed to each child candidate.
+        self.child_mass_arr = na.zeros(len(candidates)*5, dtype='float64')
+        # Records where to put the entries in the above array.
+        self.child_mass_loc = defaultdict(dict)
+        for i,halo in enumerate(sorted(candidates)):
+            for j, child in enumerate(candidates[halo]):
+                self.child_mass_loc[halo][child] = i*5 + j
+
+    def _build_h5_refs(self, filename):
+        # For this snapshot, add lists of file names that contain the
+        # particle info for each halo.
+        if not hasattr(self, 'h5files'):
+            self.h5files = defaultdict(dict)
+        if not hasattr(self, 'names'):
+            self.names = defaultdict(set)
+        file_pf = lagos.EnzoStaticOutput(filename)
+        currt = file_pf['CurrentTimeIdentifier']
+        dir = os.path.dirname(filename)
+        h5txt = os.path.join(dir, 'MergerHalos.txt')
+        lines = file(h5txt)
+        names = set([])
+        for i,line in enumerate(lines):
+            # Get rid of the carriage returns and turn it into a list.
+            line = line.strip().split()
+            self.h5files[currt][i] = line[1:]
+            names.update(line[1:])
+            self.names[currt].update(line[1:])
+        lines.close()
+
+    def _compute_child_fraction(self, parentfile, childfile, last):
+        # Given a parent and child snapshot, and a list of child candidates,
+        # compute what fraction of the parent halo goes to each of the children.
+        
+        parent_pf = lagos.EnzoStaticOutput(parentfile)
+        child_pf = lagos.EnzoStaticOutput(childfile)
+        parent_currt = parent_pf['CurrentTimeIdentifier']
+        child_currt = child_pf['CurrentTimeIdentifier']
+        
+        mylog.info("Computing fractional contribututions of particles to z=%1.5f halos." % \
+            child_pf['CosmologyCurrentRedshift'])
+        
+        if last == None:
+            # First we're going to read in the particles, haloIDs and masses from
+            # the parent dataset.
+            parent_names = list(self.names[parent_currt])
+            parent_names.sort()
+            parent_IDs = na.array([], dtype='int64')
+            parent_masses = na.array([], dtype='float64')
+            parent_halos = na.array([], dtype='int32')
+            for i,pname in enumerate(parent_names):
+                if i>=self.mine and i%self.size==self.mine:
+                    h5fp = h5py.File(pname)
+                    for group in h5fp:
+                        gID = int(group[4:])
+                        thisIDs = h5fp[group]['particle_index'][:]
+                        thisMasses = h5fp[group]['ParticleMassMsun'][:]
+                        parent_IDs = na.concatenate((parent_IDs, thisIDs))
+                        parent_masses = na.concatenate((parent_masses, thisMasses))
+                        parent_halos = na.concatenate((parent_halos, 
+                            na.ones(thisIDs.size, dtype='int32') * gID))
+                    h5fp.close()
+            
+            # Sort the arrays by particle index in ascending order.
+            sort = parent_IDs.argsort()
+            parent_IDs = parent_IDs[sort]
+            parent_masses = parent_masses[sort]
+            parent_halos = parent_halos[sort]
+            del sort
+        else:
+            # We can use old data and save disk reading.
+            (parent_IDs, parent_masses, parent_halos) = last
+        # Used to communicate un-matched particles.
+        parent_send = na.ones(parent_IDs.size, dtype='bool')
+        
+        # Now get the child halo data.
+        child_names = list(self.names[child_currt])
+        child_names.sort()
+        child_IDs = na.array([], dtype='int64')
+        child_masses = na.array([], dtype='float64')
+        child_halos = na.array([], dtype='int32')
+        for i,cname in enumerate(child_names):
+            if i>=self.mine and i%self.size==self.mine:
+                h5fp = h5py.File(cname)
+                for group in h5fp:
+                    gID = int(group[4:])
+                    thisIDs = h5fp[group]['particle_index'][:]
+                    thisMasses = h5fp[group]['ParticleMassMsun'][:]
+                    child_IDs = na.concatenate((child_IDs, thisIDs))
+                    child_masses = na.concatenate((child_masses, thisMasses))
+                    child_halos = na.concatenate((child_halos, 
+                        na.ones(thisIDs.size, dtype='int32') * gID))
+                h5fp.close()
+        
+        # Sort the arrays by particle index.
+        sort = child_IDs.argsort()
+        child_IDs = child_IDs[sort]
+        child_masses = child_masses[sort]
+        child_halos = child_halos[sort]
+        child_send = na.ones(child_IDs.size, dtype='bool')
+        del sort
+        
+        # Parent IDs on the left, child IDs on the right. We skip down both
+        # columns matching IDs. If they are out of synch, the index(es) is/are
+        # advanced until they match up again.
+        left = 0
+        right = 0
+        while left < parent_IDs.size and right < child_IDs.size:
+            if parent_IDs[left] == child_IDs[right]:
+                # They match up, add this relationship.
+                try:
+                    loc = self.child_mass_loc[parent_halos[left]][child_halos[right]]
+                except KeyError:
+                    # This happens when a child halo contains a particle from
+                    # a parent halo, but the child is not identified as a 
+                    # candidate child halo. So we do nothing and move on with
+                    # our lives.
+                    left += 1
+                    right += 1
+                    continue
+                self.child_mass_arr[loc] += parent_masses[left]
+                # Mark this pair so we don't send them later.
+                parent_send[left] = False
+                child_send[right] = False
+                left += 1
+                right += 1
+                continue
+            if parent_IDs[left] < child_IDs[right]:
+                # The left is too small, so we need to increase it.
+                left += 1
+                continue
+            if parent_IDs[left] > child_IDs[right]:
+                # Right too small.
+                right += 1
+                continue
+
+        # Now we send all the un-matched particles to the root task for one more
+        # pass. This depends on the assumption that most of the particles do
+        # not move very much between data dumps, so that not too many particles
+        # will be dumped on the single task.
+        parent_IDs_tosend = parent_IDs[parent_send]
+        parent_masses_tosend = parent_masses[parent_send]
+        parent_halos_tosend = parent_halos[parent_send]
+        child_IDs_tosend = child_IDs[child_send]
+        child_halos_tosend = child_halos[child_send]
+        
+        parent_IDs_tosend = self._mpi_concatenate_array_on_root_long(parent_IDs_tosend)
+        parent_masses_tosend = self._mpi_concatenate_array_on_root_double(parent_masses_tosend)
+        parent_halos_tosend = self._mpi_concatenate_array_on_root_int(parent_halos_tosend)
+        child_IDs_tosend = self._mpi_concatenate_array_on_root_long(child_IDs_tosend)
+        child_halos_tosend = self._mpi_concatenate_array_on_root_int(child_halos_tosend)
+
+        # Resort the received particles.
+        Psort = parent_IDs_tosend.argsort()
+        parent_IDs_tosend = parent_IDs_tosend[Psort]
+        parent_masses_tosend = parent_masses_tosend[Psort]
+        parent_halos_tosend = parent_halos_tosend[Psort]
+        Csort = child_IDs_tosend.argsort()
+        child_IDs_tosend = child_IDs_tosend[Csort]
+        child_halos_tosend = child_halos_tosend[Csort]
+        del Psort, Csort
+
+        # Now Again.
+        if self.mine == 0:
+            matched = 0
+            left = 0
+            right = 0
+            while left < parent_IDs_tosend.size and right < child_IDs_tosend.size:
+                if parent_IDs_tosend[left] == child_IDs_tosend[right]:
+                    # They match up, add this relationship.
+                    try:
+                        loc = self.child_mass_loc[parent_halos_tosend[left]][child_halos_tosend[right]]
+                    except KeyError:
+                        # This happens when a child halo contains a particle from
+                        # a parent halo, but the child is not identified as a 
+                        # candidate child halo. So we do nothing and move on with
+                        # our lives.
+                        left += 1
+                        right += 1
+                        continue
+                    self.child_mass_arr[loc] += parent_masses_tosend[left]
+                    matched += 1
+                    left += 1
+                    right += 1
+                    continue
+                if parent_IDs_tosend[left] < child_IDs_tosend[right]:
+                    # The left is too small, so we need to increase it.
+                    left += 1
+                    continue
+                if parent_IDs_tosend[left] > child_IDs_tosend[right]:
+                    # Right too small.
+                    right += 1
+                    continue
+            mylog.info("Clean-up round matched %d of %d parents and %d children." % \
+            (matched, parent_IDs_tosend.size, child_IDs_tosend.size))
+
+        # Now we sum up the contributions globally.
+        self.child_mass_arr = self._mpi_Allsum_double(self.child_mass_arr)
+        
+        # Turn these Msol masses into percentages of the parent.
+        line = "SELECT HaloMass FROM Halos WHERE SnapCurrentTimeIdentifier=%d \
+        ORDER BY SnapHaloID ASC;" % parent_currt
+        self.cursor.execute(line)
+        mark = 0
+        result = self.cursor.fetchone()
+        while result:
+            mass = result[0]
+            self.child_mass_arr[mark:mark+5] /= mass
+            mark += 5
+            result = self.cursor.fetchone()
+        
+        # Get the global ID for the SnapHaloID=0 from the child, this will
+        # be used to prevent unnecessary SQL reads.
+        line = "SELECT GlobalHaloID FROM Halos WHERE SnapCurrentTimeIdentifier=%d \
+        AND SnapHaloID=0;" % child_currt
+        self.cursor.execute(line)
+        baseChildID = self.cursor.fetchone()[0]
+        
+        # Now we prepare a big list of writes to put in the database.
+        write_values = []
+        for i,parent_halo in enumerate(sorted(self.candidates)):
+            child_indexes = []
+            child_per = []
+            for j,child in enumerate(self.candidates[parent_halo]):
+                if child == -1:
+                    # Account for fake children.
+                    child_indexes.append(-1)
+                    child_per.append(0.)
+                    continue
+                # We need to get the GlobalHaloID for this child.
+                child_globalID = baseChildID + child
+                child_indexes.append(child_globalID)
+                child_per.append(self.child_mass_arr[i*5 + j])
+            # Sort by percentages, desending.
+            child_per, child_indexes = zip(*sorted(zip(child_per, child_indexes), reverse=True))
+            values = []
+            for pair in zip(child_indexes, child_per):
+                values.extend([int(pair[0]), float(pair[1])])
+            values.extend([parent_currt, parent_halo])
+            # This has the child ID, child percent listed five times, followed
+            # by the currt and this parent halo ID (SnapHaloID).
+            values = tuple(values)
+            write_values.append(values)
+        
+        # Now we do the actual writing, but only by task 0.
+        line = 'UPDATE Halos SET ChildHaloID0=?, ChildHaloFrac0=?,\
+        ChildHaloID1=?, ChildHaloFrac1=?,\
+        ChildHaloID2=?, ChildHaloFrac2=?,\
+        ChildHaloID3=?, ChildHaloFrac3=?,\
+        ChildHaloID4=?, ChildHaloFrac4=?\
+        WHERE SnapCurrentTimeIdentifier=? AND SnapHaloID=?;'
+        if self.mine == 0:
+            for values in write_values:
+                self.cursor.execute(line, values)
+            self.conn.commit()
+        
+        # This has a barrier in it, which ensures the disk isn't lagging.
+        self._ensure_db_sync()
+        
+        return (child_IDs, child_masses, child_halos)
+
+class MergerTreeConnect(DatabaseFunctions):
+    def __init__(self, database='halos.db'):
+        self.database = database
+        result = self._open_database()
+        if not result:
+            return None
+    
+    def close(self):
+        # To be more like typical Python open/close.
+        self._close_database()
+    
+    def query(self, string):
+        # Query the database and return a list of tuples.
+        if string is None:
+            mylog.error("You must enter a SQL query.")
+            return None
+        items = []
+        self.cursor.execute(string)
+        results = self.cursor.fetchone()
+        while results:
+            items.append(results)
+            results = self.cursor.fetchone()
+        return items
+
+class Node(object):
+    def __init__(self, CoM, mass, parentIDs, z, color):
+        self.CoM = CoM
+        self.mass = mass
+        self.parentIDs = parentIDs # In descending order of contribution
+        self.z = z
+        self.color = color
+
+class Link(object):
+    def __init__(self):
+        self.childIDs = []
+        self.fractions = []
+
+class MergerTreeDotOutput(DatabaseFunctions, lagos.ParallelAnalysisInterface):
+    def __init__(self, halos=None, database='halos.db',
+            dotfile='MergerTree.gv', current_time=None, link_min=0.2):
+        self.database = database
+        self.link_min = link_min
+        if halos is None:
+            mylog.error("Please provide at least one halo to start the tree. Exiting.")
+            return None
+        result = self._open_database()
+        if not result:
+            return None
+        if type(halos) == types.IntType:
+            halos = [halos]
+        if current_time is not None:
+            halos = self._translate_haloIDs(halos, current_time)
+        newhalos = set(halos)
+        # A key is the GlobalHaloID for this halo, and the content is a
+        # Node object.
+        self.nodes = {}
+        # A key is the GlobalHaloID for the parent in the relationship,
+        # and the content is a Link ojbect.
+        self.links = defaultdict(Link)
+        # Record which halos are at the same z level for convenience.
+        # They key is a z value, and the content a list of co-leveled halo IDs.
+        self.levels = defaultdict(list)
+        # For the first set of halos.
+        self._add_nodes(newhalos)
+        # Recurse over parents.
+        while len(newhalos) > 0:
+            mylog.info("Finding parents for %d children." % len(newhalos))
+            newhalos = self._find_parents(newhalos)
+            self._add_nodes(newhalos)
+        mylog.info("Writing out %s to disk." % dotfile)
+        self._open_dot(dotfile)
+        self._write_nodes()
+        self._write_links()
+        self._write_levels()
+        self._close_dot()
+        self._close_database()
+        return None
+
+    def _translate_haloIDs(self, halos, current_time):
+        # If the input is in the haloID equivalent to SnapHaloID, translate them
+        # to GlobalHaloIDs.
+        new_haloIDs=[]
+        for halo in halos:
+            line = "SELECT GlobalHaloID FROM Halos WHERE SnapHaloID=? AND \
+            SnapCurrentTimeIdentifier=? limit 1;"
+            values = (halo, current_time)
+            self.cursor.execute(line, values)
+            new_haloIDs.append(self.cursor.fetchone()[0])
+        return new_haloIDs
+        
+    def _find_parents(self, halos):
+        # Given a set of halos, find their parents and add that to each of their
+        # node records. At the same time, make a link record for that
+        # relationship.
+        # This stores the newly discovered parent halos.
+        newhalos = set([])
+        for halo in halos:
+            line = "SELECT GlobalHaloID, ChildHaloFrac0,\
+                ChildHaloFrac1, ChildHaloFrac2,ChildHaloFrac3, ChildHaloFrac4,\
+                ChildHaloID0, ChildHaloID1, ChildHaloID2, \
+                ChildHaloID3, ChildHaloID4 \
+                FROM Halos WHERE\
+                ChildHaloID0=? or ChildHaloID1=? or ChildHaloID2=? or\
+                ChildHaloID3=? or ChildHaloID4=?;"
+            values = (halo, halo, halo, halo, halo)
+            self.cursor.execute(line, values)
+            result = self.cursor.fetchone()
+            while result:
+                res = list(result)
+                pID = result[0]
+                pfracs = res[1:6]
+                cIDs = res[6:11]
+                for pair in zip(cIDs, pfracs):
+                    if pair[1] <= self.link_min or pair[0] != halo:
+                        continue
+                    else:
+                        self.nodes[halo].parentIDs.append(pID)
+                        self.links[pID].childIDs.append(halo)
+                        self.links[pID].fractions.append(pair[1])
+                        newhalos.add(pID)
+                result = self.cursor.fetchone()
+        return newhalos
+    
+    def _add_nodes(self, newhalos):
+        # Each call of this function always happens for a set of newhalos that
+        # are at the same z. To give the halos color we will figure out how
+        # many halos total were found this z.
+        # There's probably a way to do this with only one SQL operation.
+        if len(newhalos) == 0:
+            return
+        ahalo = list(newhalos)[0]
+        line = 'SELECT SnapCurrentTimeIdentifier FROM Halos WHERE GlobalHaloID=?;'
+        values = (ahalo,)
+        self.cursor.execute(line, values)
+        result = self.cursor.fetchone()
+        # Use currt to get the number.
+        line = 'SELECT max(SnapHaloID) FROM Halos where SnapCurrentTimeIdentifier=?;'
+        values = (result[0],)
+        self.cursor.execute(line, values)
+        maxID = self.cursor.fetchone()[0]
+        # For the new halos, create nodes for them.
+        for halo in newhalos:
+            line = 'SELECT SnapZ, HaloMass, CenMassX, CenMassY, CenMassZ,\
+            SnapHaloID FROM Halos WHERE GlobalHaloID=? limit 1;'
+            value = (halo,)
+            self.cursor.execute(line, value)
+            result = self.cursor.fetchone()
+            self.nodes[halo] = Node(na.array([result[2],result[3],result[4]]),
+                result[1], [], result[0], 1. - float(result[5])/maxID)
+            self.levels[result[0]].append(halo)
+
+    def _open_dot(self, dotfile):
+        # Write out the opening stuff in the dotfile.
+        self.dotfile=self._write_on_root(dotfile)
+        line = 'digraph galaxy {size="10, 10";\n'
+        line += 'node [style=bold, shape=record];\n'
+        self.dotfile.write(line)
+    
+    def _close_dot(self):
+        self.dotfile.write("\n};\n")
+        self.dotfile.close()
+    
+    def _write_nodes(self):
+        # Write out the nodes to the dot file.
+        self.dotfile.write("{\n")
+        for halo in self.nodes:
+            this = self.nodes[halo]
+            line = '"%d" [label="{%1.3e\\n(%1.3f,%1.3f,%1.3f)}", shape="record",' \
+                % (halo, this.mass, this.CoM[0], this.CoM[1], this.CoM[2])
+            line += ' color="%0.3f 1. %0.3f"];\n' % (this.color, this.color)
+            self.dotfile.write(line)
+        self.dotfile.write("};\n")
+    
+    def _write_links(self):
+        # Write out the links to the dot file.
+        self.dotfile.write("{\n")
+        for parent in self.links:
+            this = self.links[parent]
+            for child,frac in zip(this.childIDs, this.fractions):
+                if frac > self.link_min:
+                    line = '"%d"->"%d" [label="%3.2f%%", color="blue", fontsize=10];\n' \
+                        % (parent, child, frac*100.)
+                    self.dotfile.write(line)
+        self.dotfile.write("};\n")
+
+    def _write_levels(self):
+        # Write out the co-leveled halos to the dot file.
+        for z in self.levels:
+            this = self.levels[z]
+            self.dotfile.write("{ rank = same;\n")
+            line = '"%1.5f"; ' % z
+            for halo in this:
+                line += '"%d"; ' % halo
+            line += "\n};\n"
+            self.dotfile.write(line)
+        # Also write out the unlinked boxes for the redshifts.
+        line = '{"%1.5f" [label="{%1.5f}", shape="record" color="green"];}\n' \
+            % (z, z)
+
+class MergerTreeTextOutput(DatabaseFunctions, lagos.ParallelAnalysisInterface):
+    def __init__(self, database='halos.db', outfile='MergerTreeDB.txt'):
+        self.database = database
+        self.outfile = outfile
+        result = self._open_database()
+        if not result:
+            return None
+        self._write_out()
+        self._close_database()
+        return None
+    
+    def _write_out(self):
+        # Essentially dump the contents of the database into a text file.
+        fp = open(self.outfile, "w")
+        # Make the header line.
+        spacing = {}
+        for column in columns:
+            spacing[column] = (max(15,len(column)+1))
+        line = "# "
+        for column in columns:
+            line += "%s" % column.ljust(spacing[column])
+        line += "\n"
+        fp.write(line)
+        # Get the data.
+        line = "SELECT * FROM Halos ORDER BY SnapZ DESC, SnapHaloID ASC;"
+        self.cursor.execute(line)
+        results = self.cursor.fetchone()
+        # Write out the columns.
+        while results:
+            line = "  "
+            for i,column in enumerate(columns):
+                if column_types[column] == "FLOAT":
+                    this = "%1.6e" % results[i]
+                    line += this.ljust(spacing[column])
+                if column_types[column] == "INTEGER":
+                    this = "%d" % results[i]
+                    line += this.ljust(spacing[column])
+            line += "\n"
+            fp.write(line)
+            results = self.cursor.fetchone()
+        fp.close()
+        

Modified: trunk/yt/extensions/StarAnalysis.py
==============================================================================
--- trunk/yt/extensions/StarAnalysis.py	(original)
+++ trunk/yt/extensions/StarAnalysis.py	Wed Apr 28 14:59:59 2010
@@ -114,11 +114,18 @@
         """
         fp = open(name, "w")
         if self.mode == 'data_source':
-            vol = self._data_source.volume('mpc')
+            try:
+                vol = self._data_source.volume('mpc')
+            except AttributeError:
+                # If we're here, this is probably a HOPHalo object, and we
+                # can get the volume this way.
+                ds = self._data_source.get_sphere()
+                vol = ds.volume('mpc')
         elif self.mode == 'provided':
             vol = self.volume
         tc = self._pf["Time"]
         # Use the center of the time_bin, not the left edge.
+        fp.write("#time\tlookback\tredshift\tMsol/yr\tMsol/yr/Mpc3\tMsol\tcumMsol\t\n")
         for i, time in enumerate((self.time_bins[1:] + self.time_bins[:-1])/2.):
             line = "%1.5e\t%1.5e\t%1.5e\t%1.5e\t%1.5e\t%1.5e\t%1.5e\n" % \
             (time * tc / YEAR, # Time
@@ -215,7 +222,7 @@
     
     def calculate_spectrum(self, data_source=None, star_mass=None,
             star_creation_time=None, star_metallicity_fraction=None,
-            star_metallicity_constant=None):
+            star_metallicity_constant=None, min_age=0.):
         """
         For the set of stars, calculate the collective spectrum.
         Attached to the output are several useful objects:
@@ -233,6 +240,8 @@
         metallicity fractions, in code units (which is not Z/Zsun).
         :param star_metallicity_constant (float): If desired, override the star
         metallicity fraction of all the stars to the given value.
+        :param min_age (float): Removes young stars below this number (in years
+        from the spectrum. Default: 0 (all stars).
         """
         # Initialize values
         self.final_spec = na.zeros(self.wavelength.size, dtype='float64')
@@ -240,6 +249,7 @@
         self.star_mass = star_mass
         self.star_creation_time = star_creation_time
         self.star_metal = star_metallicity_fraction
+        self.min_age = min_age
         
         # Check to make sure we have the right set of data.
         if data_source is None:
@@ -273,8 +283,13 @@
         self.star_metal /= Zsun
         # Age of star in years.
         dt = (self.time_now - self.star_creation_time * self._pf['Time']) / YEAR
+        # Remove young stars
+        sub = dt > self.min_age
+        self.star_metal = self.star_metal[sub]
+        dt = dt[sub]
+        self.star_creation_time = self.star_creation_time[sub]
         # Figure out which METALS bin the star goes into.
-        Mindex = na.digitize(dt, METALS)
+        Mindex = na.digitize(self.star_metal, METALS)
         # Replace the indices with strings.
         Mname = MtoD[Mindex]
         # Figure out which age bin this star goes into.
@@ -284,7 +299,8 @@
         ratio2 = (self.age[Aindex] - dt) / (self.age[Aindex] - self.age[Aindex-1])
         # Sort the stars by metallicity and then by age, which should reduce
         # memory access time by a little bit in the loop.
-        sort = na.lexsort((Aindex, Mname))
+        indexes = na.arange(self.star_metal.size)
+        sort = na.asarray([indexes[i] for i in na.lexsort([indexes, Aindex, Mname])])
         Mname = Mname[sort]
         Aindex = Aindex[sort]
         ratio1 = ratio1[sort]

Modified: trunk/yt/extensions/kdtree/fKD.f90
==============================================================================
--- trunk/yt/extensions/kdtree/fKD.f90	(original)
+++ trunk/yt/extensions/kdtree/fKD.f90	Wed Apr 28 14:59:59 2010
@@ -43,6 +43,33 @@
 
 end subroutine find_nn_nearest_neighbors
 
+subroutine find_many_nn_nearest_neighbors()
+    ! Given an input array of query vectors (qv_many), find their
+    ! nearest neighbors.
+    use kdtree2_module
+    use fKD_module
+    use kdtree2module
+    use tree_nodemodule
+    use intervalmodule
+    
+    integer :: k, number
+    type(kdtree2_result),allocatable :: results(:)
+    
+    allocate(results(nn))
+
+    number = size(qv_many,2)
+
+    do k=1, number
+        qv(:) = qv_many(:,k)
+        call kdtree2_n_nearest(tp=tree2,qv=qv,nn=nn,results=results)
+        nn_tags(:, k) = results%idx
+    end do
+    
+    deallocate(results)
+    return
+
+end subroutine find_many_nn_nearest_neighbors
+
 subroutine find_all_nn_nearest_neighbors()
     ! for all particles in pos, find their nearest neighbors and return the
     ! indexes and distances as big arrays

Modified: trunk/yt/extensions/kdtree/fKD.v
==============================================================================
--- trunk/yt/extensions/kdtree/fKD.v	(original)
+++ trunk/yt/extensions/kdtree/fKD.v	Wed Apr 28 14:59:59 2010
@@ -12,6 +12,7 @@
 dens(:) _real
 mass(:) _real
 qv(3) real
+qv_many(3,:) _real
 nparts integer
 nn integer
 nMerge integer # number of nearest neighbors used in chain merging
@@ -97,5 +98,6 @@
 create_tree() subroutine
 free_tree() subroutine
 find_all_nn_nearest_neighbors subroutine
+find_many_nn_nearest_neighbors subroutine
 find_chunk_nearest_neighbors subroutine
 chainHOP_tags_dens subroutine

Added: trunk/yt/lagos/StructureFunctionGenerator.py
==============================================================================
--- (empty file)
+++ trunk/yt/lagos/StructureFunctionGenerator.py	Wed Apr 28 14:59:59 2010
@@ -0,0 +1,746 @@
+"""
+Structure Function Generator
+
+Author: Stephen Skory <stephenskory at yahoo.com>
+Affiliation: UCSD Physics/CASS
+Homepage: http://yt.enzotools.org/
+License:
+  Copyright (C) 2010 Stephen Skory.  All Rights Reserved.
+
+  This file is part of yt.
+
+  yt is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 3 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from yt.lagos import *
+from yt.math_utils import *
+from yt.performance_counters import yt_counters, time_function
+try:
+    from yt.extensions.kdtree import *
+except ImportError:
+    mylog.info("The Fortran kD-Tree did not import correctly. The structure function generator will not work correctly.")
+
+import math, sys, itertools, inspect, types, time
+from collections import defaultdict
+
+class StructFcnGen(ParallelAnalysisInterface):
+    def __init__(self, pf, fields, left_edge=None, right_edge=None,
+            total_values=1000000, comm_size=10000, length_type="lin",
+            length_number=10, length_range=None, vol_ratio = 1,
+            salt=0):
+        """
+        *total_values* (int) How many total (global) pair calculations to run
+        for each of the structure functions specified.
+        A single integer. Default: 1,000,000.
+        *comm_size* (int) How entries are sent during communication.
+        Default: 10,000.
+        Set the parameters used to search the simulational volume with randomly
+        placed 'rulers'.
+        *length_type* (str) controls the even spacing of the rulers lengths in
+        logarithmic or linear space, set by "log" or "lin", respectively.
+        A single string. Default: "lin"
+        *length_number* (int) sets how many lengths to create, evenly spaced by the above
+        parameter.
+        A single integer. Default: "10"
+        *length_range* (float) a min/max pair for the range of values to search the over
+        the simulational volume.
+        A single pair (a list or array). Default: [sqrt(3)dx, 1/2*shortest box edge],
+        where dx is the smallest grid cell size.
+        *salt* (int) a number that will be added to the random number generator
+        seed. Use this if a different random series of numbers is desired when
+        keeping everything else constant from this set: (MPI task count, 
+        number of ruler lengths, ruler min/max, number of structure functions,
+        number of point pairs per ruler length). Default: 0.
+        """
+        self._fsets = []
+        self.fields = fields
+        # MPI stuff.
+        self.size = self._mpi_get_size()
+        self.mine = self._mpi_get_rank()
+        self.vol_ratio = vol_ratio
+        self.total_values = int(total_values / self.size)
+        # For communication.
+        self.recv_hooks = []
+        self.send_hooks = []
+        self.done_hooks = []
+        self.comm_size = min(int(comm_size), self.total_values)
+        self.pf = pf
+        self.nlevels = pf.h.max_level
+        self.period = self.pf['DomainRightEdge'] - self.pf['DomainLeftEdge']
+        self.min_edge = min(self.period)
+        self.hierarchy = pf.h
+        self.center = (pf["DomainRightEdge"] + pf["DomainLeftEdge"])/2.0
+        # Figure out the range of ruler lengths.
+        if length_range == None:
+            length_range = [math.sqrt(3) * self.pf.h.get_smallest_dx(),
+                self.min_edge/2.]
+        else:
+            if len(length_range) != 2:
+                raise ValueError("length_range must have two values.")
+            if length_range[1] <= length_range[0]:
+                raise ValueError("length_range[1] must be larger than length_range[0]")
+            if length_range[1] > self.min_edge/2.:
+                length_range[1] = self.min_edge/2.
+                mylog.info("Automatically adjusting length_range[1] to half the shortest box edge.")
+        if length_range[0] == -1 or length_range[0] == -1.:
+            mylog.info("Automatically adjusting length_range[0] to %1.5e." % \
+                (math.sqrt(3) * self.pf.h.get_smallest_dx()))
+            length_range[0] = math.sqrt(3) * self.pf.h.get_smallest_dx()
+        # Make the list of ruler lengths.
+        if length_type == "lin":
+            self.lengths = na.linspace(length_range[0], length_range[1],
+                length_number)
+        elif length_type == "log":
+            self.lengths = na.logspace(math.log10(length_range[0]),
+                math.log10(length_range[1]), length_number)
+        else:
+            # Something went wrong.
+            raise SyntaxError("length_type is either \"lin\" or \"log\".")
+        # Subdivide the volume.
+        if not left_edge or not right_edge:
+            self.left_edge = self.pf['DomainLeftEdge']
+            self.right_edge = self.pf['DomainRightEdge']
+            padded, self.LE, self.RE, self.ds = self._partition_hierarchy_3d(padding=0.,
+                rank_ratio = self.vol_ratio)
+        else:
+            self.left_edge = left_edge
+            self.right_edge = right_edge
+            # We do this twice, first with no 'buffer' to get the unbuffered
+            # self.LE/RE, and then second to get a buffered self.ds.
+            padded, self.LE, self.RE, temp = \
+                self._partition_region_3d(left_edge, right_edge,
+                    rank_ratio=self.vol_ratio)
+            padded, temp, temp, self.ds = \
+                self._partition_region_3d(left_edge - self.lengths[-1], \
+                right_edge + self.lengths[-1], rank_ratio=self.vol_ratio)
+        mylog.info("LE %s RE %s %s" % (str(self.LE), str(self.RE), str(self.ds)))
+        self.width = self.ds.right_edge - self.ds.left_edge
+        self.mt = na.random.mtrand.RandomState(seed = 1234 * self.mine + salt)
+    
+    def add_function(self, function, out_labels, sqrt):
+        """
+        Add a structure function to the list that will be evaluated at the
+        generated pairs of points.
+        """
+        fargs = inspect.getargspec(function)
+        if len(fargs.args) != 5:
+            raise SyntaxError("The structure function %s needs five arguments." %\
+                function.__name__)
+        out_labels = list(out_labels)
+        if len(out_labels) < 1:
+            raise SyntaxError("Please specify at least one out_labels for function %s." %\
+                function.__name__)
+        sqrt = list(sqrt)
+        if len(sqrt) != len(out_labels):
+            raise SyntaxError("Please have the same number of elements in out_labels as in sqrt for function %s." %\
+                function.__name__)
+        self._fsets.append(StructSet(self, function, self.min_edge,
+            out_labels, sqrt))
+        return self._fsets[-1]
+
+    def __getitem__(self, key):
+        return self._fsets[key]
+    
+    def run_generator(self):
+        """
+        After all the structure functions have been added, run the generator.
+        """
+        yt_counters("run_generator")
+        # We need a function!
+        if len(self._fsets) == 0:
+            mylog.error("You need to add at least one structure function!")
+            return None
+        # Do all the startup tasks to get the grid points.
+        if self.nlevels == 1:
+            yt_counters("build_sort")
+            self._build_sort_array()
+            self.sort_done = False
+            yt_counters("build_sort")
+        else:
+            yt_counters("init_kd_tree")
+            self._init_kd_tree()
+            self.sort_done = True
+            yt_counters("init_kd_tree")
+        # Store the fields.
+        self.stored_fields = {}
+        yt_counters("getting data")
+        for field in self.fields:
+            self.stored_fields[field] = self.ds[field].copy()
+        self.ds.clear_data()
+        # If the arrays haven't been sorted yet and need to be, do that.
+        if not self.sort_done:
+            for field in self.fields:
+                self.stored_fields[field] = self.stored_fields[field][self.sort]
+            del self.sort
+            self.sort_done = True
+        yt_counters("getting data")
+        self._build_fields_vals()
+        yt_counters("big loop over lengths")
+        t_waiting = 0.
+        for bigloop, length in enumerate(self.lengths):
+            self._build_points_array()
+            if self.mine == 0:
+                mylog.info("Doing length %1.5e" % length)
+            # Things stop when this value below equals total_values.
+            self.generated_points = 0
+            self.gen_array = na.zeros(self.size, dtype='int64')
+            self.comm_cycle_count = 0
+            self.final_comm_cycle_count = 0
+            self.sent_done = False
+            self._setup_done_hooks_on_root()
+            # While everyone else isn't done or I'm not done, we loop.
+            while self._should_cycle():
+                self._setup_recv_arrays()
+                self._send_arrays()
+                t0 = time.time()
+                self._mpi_Request_Waitall(self.send_hooks)
+                self._mpi_Request_Waitall(self.recv_hooks)
+                t1 = time.time()
+                t_waiting += (t1-t0)
+                if (self.recv_points < -1.).any() or (self.recv_points > 1.).any(): # or \
+                        #(na.abs(na.log10(na.abs(self.recv_points))) > 20).any():
+                    raise ValueError("self.recv_points is no good!")
+                self.points = self.recv_points.copy()
+                self.fields_vals = self.recv_fields_vals.copy()
+                self.gen_array = self.recv_gen_array.copy()
+                self._eval_points(length)
+                self.gen_array[self.mine] = self.generated_points
+                self.comm_cycle_count += 1
+                if self.generated_points == self.total_values:
+                    self._send_done_to_root()
+            if self.mine == 0:
+                mylog.info("Length (%d of %d) %1.5e took %d communication cycles to complete." % \
+                (bigloop+1, len(self.lengths), length, self.comm_cycle_count))
+        yt_counters("big loop over lengths")
+        if self.nlevels > 1:
+            del fKD.pos, fKD.qv_many, fKD.nn_tags
+            free_tree() # Frees the kdtree object.
+        yt_counters("allsum")
+        self._allsum_bin_hits()
+        mylog.info("Spent %f seconds waiting for communication." % t_waiting)
+        yt_counters("allsum")
+        yt_counters("run_generator")
+    
+    def _init_kd_tree(self):
+        """
+        Builds the kd tree of grid center points.
+        """
+        # Grid cell centers.
+        mylog.info("Multigrid: Building kD-Tree.")
+        xp = self.ds["x"]
+        yp = self.ds["y"]
+        zp = self.ds["z"]
+        fKD.pos = na.asfortranarray(na.empty((3,xp.size), dtype='float64'))
+        fKD.pos[0, :] = xp[:]
+        fKD.pos[1, :] = yp[:]
+        fKD.pos[2, :] = zp[:]
+        fKD.nn = 1
+        fKD.sort = False
+        fKD.rearrange = True
+        create_tree()
+
+    def _build_sort_array(self):
+        """
+        When running on a unigrid simulation, the kD tree isn't necessary.
+        But we need to ensure that the points are sorted in the usual manner
+        allowing values to be found via array indices.
+        """
+        mylog.info("Unigrid: finding cell centers.")
+        xp = self.ds["x"]
+        yp = self.ds["y"]
+        zp = self.ds["z"]
+        self.sizes = [na.unique(xp).size, na.unique(yp).size, na.unique(zp).size]        
+        self.sort = na.lexsort([zp, yp, xp])
+        del xp, yp, zp
+        self.ds.clear_data()
+    
+    def _build_fields_vals(self):
+        """
+        Builds an array to store the field values array.
+        """
+        self.fields_vals = na.empty((self.comm_size, len(self.fields)*2), \
+            dtype='float64')
+        # At the same time build a dict to label the columns.
+        self.fields_columns = {}
+        for i,field in enumerate(self.fields):
+            self.fields_columns[field] = i
+
+    def _build_points_array(self):
+        """
+        Initializes the array that contains the random points as all negatives
+        to start with.
+        """
+        self.points = na.ones((self.comm_size, 6), dtype='float64') * -1.0
+    
+    def _setup_done_hooks_on_root(self):
+        """
+        Opens non-blocking receives on root pointing to all the other tasks
+        """
+        if self.mine != 0:
+            return
+        self.recv_done = {}
+        for task in xrange(self.size):
+            if task == self.mine: continue
+            self.recv_done[task] = na.zeros(1, dtype='int64')
+            self.done_hooks.append(self._mpi_Irecv_long(self.recv_done[task], \
+                task, tag=15))
+    
+    def _send_done_to_root(self):
+        """
+        Tell the root process that I'm done.
+        """
+        # If I've already done this, don't do it again.
+        if self.sent_done: return
+        if self.mine !=0:
+            # I send when I *think* things should finish.
+            self.send_done = na.ones(1, dtype='int64') * \
+                (self.size / self.vol_ratio -1) + self.comm_cycle_count
+            self.done_hooks.append(self._mpi_Isend_long(self.send_done, \
+                    0, tag=15))
+        else:
+            # As root, I need to mark myself!
+            self.recv_done[0] = na.ones(1, dtype='int64') * \
+                (self.size / self.vol_ratio -1) + self.comm_cycle_count
+        self.sent_done = True
+    
+    def _should_cycle(self):
+        """
+        Determine if I should continue cycling the communication.
+        """
+        if self.mine == 0:
+            # If other tasks aren't finished, this will return False.
+            status = self._mpi_Request_Testall(self.done_hooks)
+            # Convolve this with with root's status.
+            status = status * (self.generated_points == self.total_values)
+            if status == 1:
+                # If they are all finished, meaning Testall returns True,
+                # and root has made its points, we find
+                # the biggest value in self.recv_done and stop there.
+                status = max(self.recv_done.values())
+        else:
+            status = 0
+        # Broadcast the status from root - we stop only if root thinks we should
+        # stop.
+        status = self._mpi_bcast_pickled(status)
+        if status == 0: return True
+        if self.comm_cycle_count < status:
+            return True
+        # If we've come this far, we're done.
+        return False
+
+    def _setup_recv_arrays(self):
+        """
+        Creates the recv buffers and calls a non-blocking MPI receive pointing
+        to the left-hand neighbor.
+        """
+        self.recv_points = na.ones((self.comm_size, 6), dtype='float64') * -1.
+        self.recv_fields_vals = na.zeros((self.comm_size, len(self.fields)*2), \
+            dtype='float64')
+        self.recv_gen_array = na.zeros(self.size, dtype='int64')
+        self.recv_hooks.append(self._mpi_Irecv_double(self.recv_points, \
+            (self.mine-1)%self.size, tag=10))
+        self.recv_hooks.append(self._mpi_Irecv_double(self.recv_fields_vals, \
+            (self.mine-1)%self.size, tag=20))
+        self.recv_hooks.append(self._mpi_Irecv_long(self.recv_gen_array, \
+            (self.mine-1)%self.size, tag=40))
+
+    def _send_arrays(self):
+        """
+        Send the data arrays to the right-hand neighbor.
+        """
+        self.send_hooks.append(self._mpi_Isend_double(self.points,\
+            (self.mine+1)%self.size, tag=10))
+        self.send_hooks.append(self._mpi_Isend_double(self.fields_vals,\
+            (self.mine+1)%self.size, tag=20))
+        self.send_hooks.append(self._mpi_Isend_long(self.gen_array, \
+            (self.mine+1)%self.size, tag=40))
+
+    def _allsum_bin_hits(self):
+        """
+        Add up the hits to all the bins globally for all functions.
+        """
+        for fset in self._fsets:
+            fset.too_low = self._mpi_allsum(fset.too_low)
+            fset.too_high = self._mpi_allsum(fset.too_high)
+            fset.binned = {}
+            if self.mine == 0:
+                mylog.info("Function %s had values out of range for these fields:" % \
+                    fset.function.__name__)
+                for i,field in enumerate(fset.out_labels):
+                    mylog.info("Field %s had %d values too high and %d too low that were not binned." % \
+                    (field, fset.too_high[i], fset.too_low[i]))
+            for length in self.lengths:
+                fset.length_bin_hits[length] = \
+                    self._mpi_Allsum_long(fset.length_bin_hits[length])
+                # Find out how many were successfully binned.
+                fset.binned[length] = fset.length_bin_hits[length].sum()
+                # Normalize the counts.
+                fset.length_bin_hits[length] = \
+                    fset.length_bin_hits[length].astype('float64') / \
+                    fset.binned[length]
+                # Return it to its original shape.
+                fset.length_bin_hits[length] = \
+                    fset.length_bin_hits[length].reshape(fset.bin_number)
+    
+    def _pick_random_points(self, length, size):
+        """
+        Picks out size random pairs separated by length *length*.
+        """
+        # First make random points inside this subvolume.
+        r1 = na.empty((size,3), dtype='float64')
+        for dim in range(3):
+            r1[:,dim] = self.mt.uniform(low=self.ds.left_edge[dim],
+                high=self.ds.right_edge[dim], size=size)
+        # Next we find the second point, determined by a random
+        # theta, phi angle.
+        theta = self.mt.uniform(low=0, high=2.*math.pi, size=size)
+        phi = self.mt.uniform(low=-math.pi/2., high=math.pi/2., size=size)
+        r2 = na.empty((size,3), dtype='float64')
+        r2[:,0] = r1[:,0] + length * na.cos(theta) * na.cos(phi)
+        r2[:,1] = r1[:,1] + length * na.sin(theta) * na.cos(phi)
+        r2[:,2] = r1[:,2] + length * na.sin(phi)
+        # Reflect so it's inside the (full) volume.
+        r2 %= self.period
+        return (r1, r2)
+
+    def _find_nearest_cell(self, points):
+        """
+        Finds the closest grid cell for each point in a vectorized manner.
+        """
+        if self.nlevels == 1:
+            pos = (points - self.ds.left_edge) / self.width
+            n = (self.sizes[2] * pos[:,2]).astype('int32')
+            n += self.sizes[2] * (self.sizes[1] * pos[:,1]).astype('int32')
+            n += self.sizes[2] * self.sizes[1] * (self.sizes[0] * pos[:,0]).astype('int32')
+        else:
+            fKD.qv_many = points.T
+            fKD.nn_tags = na.asfortranarray(na.empty((1, points.shape[0]), dtype='int64'))
+            find_many_nn_nearest_neighbors()
+            # The -1 is for fortran counting.
+            n = fKD.nn_tags[0,:] - 1
+        return n
+
+    def _get_fields_vals(self, points):
+        """
+        Given points, return the values for the fields we need for those
+        points.
+        """
+        # First find the grid data index field.
+        indices = self._find_nearest_cell(points)
+        results = na.empty((len(indices), len(self.fields)), dtype='float64')
+        # Put the field values into the columns of results.
+        for field in self.fields:
+            col = self.fields_columns[field]
+            results[:, col] = self.stored_fields[field][indices]
+        return results
+
+
+    def _eval_points(self, length):
+        # We need to loop over the points array at least once. Further
+        # iterations only happen if we have added new points to the array,
+        # but not as many as we want to, so we need to check again to see if
+        # we can put more points into the buffer.
+        added_points = True
+        while added_points:
+            # If we need to, add more points to the points array.
+            if self.generated_points < self.total_values:
+                # Look for 'empty' slots to put in new pairs.
+                select = (self.points[:,0] < 0)
+                ssum = select.sum()
+                # We'll generate only as many points as we need to/can.
+                size = min(ssum, self.total_values - self.generated_points)
+                (new_r1,new_r2) = self._pick_random_points(length, size)
+                self.generated_points += size
+                # If size != select.sum(), we need to pad the end of new_r1/r2
+                # which is what is effectively happening below.
+                newpoints = na.ones((ssum, 6), dtype='float64') * -1.
+                newpoints[:size,:3] = new_r1
+                newpoints[:size,3:] = new_r2
+                # Now we insert them into self.points.
+                self.points[select] = newpoints
+            else:
+                added_points = False
+            
+            # If we have an empty buffer here, we can skip everything below.
+            if (self.points < 0).all():
+                added_points = False # Not strictly required, but clearer.
+                break
+            
+            # Now we have a points array that is either full of unevaluated points,
+            # or I don't need to make any new points and I'm just processing the
+            # array. Start by finding the indices of the points I own.
+            self.points.shape = (self.comm_size*2, 3) # Doesn't make a copy - fast!
+            select = na.bitwise_or((self.points < self.ds.left_edge).any(axis=1),
+                (self.points >= self.ds.right_edge).any(axis=1))
+            select = na.invert(select)
+            mypoints = self.points[select]
+            if mypoints.size > 0:
+                # Get the fields values.
+                results = self._get_fields_vals(mypoints)
+                # Put this into self.fields_vals.
+                self.fields_vals.shape = (self.comm_size*2, len(self.fields))
+                self.fields_vals[select] = results
+            
+            # Put our arrays back into their original shapes cheaply!
+            if mypoints.size > 0:
+                self.fields_vals.shape = (self.comm_size, len(self.fields)*2)
+            self.points.shape = (self.comm_size, 6)
+            
+            # To run the structure functions, what is key is that the
+            # second point in the pair is ours.
+            second_points = self.points[:,3:]
+            select = na.bitwise_or((second_points < self.ds.left_edge).any(axis=1),
+                (second_points >= self.ds.right_edge).any(axis=1))
+            select = na.invert(select)
+            if select.any():
+                points_to_eval = self.points[select]
+                fields_to_eval = self.fields_vals[select]
+                
+                # Find the normal vector between our points.
+                vec = na.abs(points_to_eval[:,:3] - points_to_eval[:,3:])
+                norm = na.sqrt(na.sum(na.multiply(vec,vec), axis=1))
+                # I wish there was a better way to do this, but I can't find it.
+                for i, n in enumerate(norm):
+                    vec[i] = na.divide(vec[i], n)
+                
+                # Now evaluate the functions.
+                for fcn_set in self._fsets:
+                    fcn_results = fcn_set._eval_st_fcn(fields_to_eval,points_to_eval,
+                        vec)
+                    fcn_set._bin_results(length, fcn_results)
+                
+                # Now clear the buffers at the processed points.
+                self.points[select] = na.array([-1.]*6, dtype='float64')
+                
+            else:
+                # We didn't clear any points, so we should move on with our
+                # lives and pass this buffer along.
+                added_points = False
+
+    @parallel_blocking_call
+    def write_out_means(self):
+        """
+        Writes out the weighted-average value for each structure function for
+        each dimension for each ruler length to a text file. The data is written
+        to files of the name 'function_name.txt' in the current working
+        directory.
+        """
+        sep = 12
+        for fset in self._fsets:
+            fp = self._write_on_root("%s.txt" % fset.function.__name__)
+            fset._avg_bin_hits()
+            line = "# length".ljust(sep)
+            line += "count".ljust(sep)
+            for dim in fset.dims:
+                line += ("%s" % fset.out_labels[dim]).ljust(sep)
+            fp.write(line + "\n")
+            for length in self.lengths:
+                line = ("%1.5e" % length).ljust(sep)
+                line += ("%d" % fset.binned[length]).ljust(sep)
+                for dim in fset.dims:
+                    if fset.sqrt[dim]:
+                        line += ("%1.5e" % \
+                            math.sqrt(fset.length_avgs[length][dim])).ljust(sep)
+                    else:
+                        line += ("%1.5e" % \
+                            fset.length_avgs[length][dim]).ljust(sep)
+                line += "\n"
+                fp.write(line)
+            fp.close()
+    
+    @parallel_root_only
+    def write_out_arrays(self):
+        """
+        Writes out the raw probability bins and the bin edges to an HDF5 file
+        for each of the structure functions. The files are named 
+        'function_name.txt' and saved in the current working directory.
+        """
+        if self.mine == 0:
+            for fset in self._fsets:
+                f = h5py.File("%s.h5" % fset.function.__name__, "w")
+                bin_names = []
+                prob_names = []
+                bin_counts = []
+                for dim in fset.dims:
+                    f.create_dataset("/bin_edges_%02d_%s" % \
+                        (dim, fset.out_labels[dim]), \
+                        data=fset.bin_edges[dim])
+                    bin_names.append("/bin_edges_%02d_%s" % \
+                    (dim, fset.out_labels[dim]))
+                for i,length in enumerate(self.lengths):
+                    f.create_dataset("/prob_bins_%05d" % i, \
+                        data=fset.length_bin_hits[length])
+                    prob_names.append("/prob_bins_%05d" % i)
+                    bin_counts.append([fset.too_low.sum(), fset.binned[length],
+                        fset.too_high.sum()])
+                f.create_dataset("/bin_edges_names", data=bin_names)
+                #f.create_dataset("/prob_names", data=prob_names)
+                f.create_dataset("/lengths", data=self.lengths)
+                f.create_dataset("/counts", data=bin_counts)
+                f.close()
+
+class StructSet(StructFcnGen):
+    def __init__(self,sfg, function, min_edge, out_labels, sqrt):
+        self.sfg = sfg # The overarching SFG class
+        self.function = function # Function to eval between the two points.
+        self.min_edge = min_edge # The length of the minimum edge of the box.
+        self.out_labels = out_labels # For output.
+        self.sqrt = sqrt # which columns to sqrt on output.
+        # These below are used to track how many times the function returns
+        # unbinned results.
+        self.too_low = na.zeros(len(self.out_labels), dtype='int32')
+        self.too_high = na.zeros(len(self.out_labels), dtype='int32')
+        
+    def set_pdf_params(self, bin_type="lin", bin_number=1000, bin_range=None):
+        """
+        Set the parameters used to build the Probability Distribution Function
+        for each ruler length for this function. The values output by the
+        function are slotted into the bins described here.
+        *bin_type* (str) controls the edges of the bins spaced evenly in
+        logarithmic or linear space, set by "log" or "lin", respectively.
+        A single string, or list of strings for N-dim binning. Default: "lin".
+        *bin_number* (int) sets how many bins to create, evenly spaced by the above
+        parameter.
+        A single integer, or a list of integers for N-dim binning. Default: 1000
+        *bin_range* (float) A pair of values giving the range for the bins.
+        A pair of floats (a list), or a list of pairs for N-dim binning.
+        Default: None.
+        """
+        # This should be called after setSearchParams.
+        if not hasattr(self.sfg, "lengths"):
+            mylog.error("Please call setSearchParams() before calling setPDFParams().")
+            return None
+        # Make sure they're either all lists or only one is.
+        input = [bin_type, bin_number, bin_range]
+        lists = 0
+        for thing in input:
+            if type(thing) == types.ListType:
+                lists += 1
+        if lists > 1 and lists < 3:
+            mylog.error("Either all the inputs need to be lists, or only one.")
+            return None
+        # Make sure they're all the same length if they're lists.
+        if lists == 3:
+            first_len = 0
+            for thing in input:
+                if first_len == 0:
+                    first_len = len(thing)
+                    if first_len == 0:
+                        mylog.error("Input cannot be an empty list.")
+                        return None
+                    continue
+                if first_len != len(thing):
+                    mylog.error("All the inputs need to have the same length.")
+                    return None
+        # If they are not all lists, put the input into lists for convenience.
+        if lists == 1:
+            bin_type, bin_number = [bin_type], [bin_number]
+            bin_range = [bin_range]
+        self.bin_type = bin_type
+        self.bin_number = na.array(bin_number) - 1
+        self.dims = range(len(bin_type))
+        # Create the dict that stores the arrays to store the bin hits, and
+        # the arrays themselves.
+        self.length_bin_hits = {}
+        for length in self.sfg.lengths:
+            # It's easier to index flattened, but will be unflattened later.
+            self.length_bin_hits[length] = na.zeros(self.bin_number,
+                dtype='int64').flatten()
+        # Create the bin edges for each dimension.
+        # self.bins is indexed by dimension
+        self.bin_edges = {}
+        for dim in self.dims:
+            # Error check.
+            if len(bin_range[dim]) != 2:
+                raise ValueError("bin_range must have two values.")
+            if bin_range[dim][1] <= bin_range[dim][0]:
+                raise ValueError("bin_range[1] must be larger than bin_range[0]")
+            # Make the edges for this dimension.
+            if bin_type[dim] == "lin":
+                self.bin_edges[dim] = na.linspace(bin_range[dim][0], bin_range[dim][1],
+                    bin_number[dim])
+            elif bin_type[dim] == "log":
+                self.bin_edges[dim] = na.logspace(math.log10(bin_range[dim][0]),
+                    math.log10(bin_range[dim][1]), bin_number[dim])
+            else:
+                raise SyntaxError("bin_edges is either \"lin\" or \"log\".")
+
+    def _eval_st_fcn(self, results, points, vec):
+        """
+        Return the value of the structure function using the provided results.
+        """
+        return self.function(results[:,:len(self.sfg.fields)],
+            results[:,len(self.sfg.fields):], points[:,:3], points[:,3:], vec)
+        """
+        NOTE - A function looks like:
+        def stuff(a,b,r1,r2, vec):
+            return [(a[0] - b[0])/(a[1] + b[1])]
+        where a and b refer to different points in space and the indices
+        are for the different fields, which are given when the function is
+        added. The results need to be a list or array even if it's only one
+        item.
+        """
+
+    def _bin_results(self, length, results):
+        """
+        Add hits to the bins corresponding to these results. length_hit_bins
+        is flattened, so we need to figure out the offset for this hit by
+        factoring the sizes of the other dimensions.
+        """
+        hit_bin = na.zeros(results.shape[0], dtype='int64')
+        multi = 1
+        good = na.ones(results.shape[0], dtype='bool')
+        for dim in range(len(self.out_labels)):
+            for d1 in range(dim):
+                multi *= self.bin_edges[d1].size
+            if dim == 0 and len(self.out_labels)==1:
+                digi = na.digitize(results, self.bin_edges[dim])
+            else:
+                digi = na.digitize(results[:,dim], self.bin_edges[dim])
+            too_low = (digi == 0)
+            too_high = (digi == self.bin_edges[dim].size)
+            self.too_low[dim] += (too_low).sum()
+            self.too_high[dim] += (too_high).sum()
+            newgood = na.bitwise_and(na.invert(too_low), na.invert(too_high))
+            good = na.bitwise_and(good, newgood)
+            hit_bin += na.multiply((digi - 1), multi)
+        digi_bins = na.arange(self.length_bin_hits[length].size+1)
+        hist, digi_bins = na.histogram(hit_bin[good], digi_bins)
+        self.length_bin_hits[length] += hist
+
+    def _dim_sum(self, a, dim):
+        """
+        Given a multidimensional array a, this finds the sum over all the
+        elements leaving the dimension dim untouched.
+        """
+        dims = na.arange(len(a.shape))
+        dims = na.flipud(dims)
+        gt_dims = dims[dims > dim]
+        lt_dims = dims[dims < dim]
+        iter_dims = na.concatenate((gt_dims, lt_dims))
+        for this_dim in iter_dims:
+            a = a.sum(axis=this_dim)
+        return a
+
+    def _avg_bin_hits(self):
+        """
+        For each dimension and length of bin_hits return the weighted average.
+        """
+        self.length_avgs = defaultdict(dict)
+        for length in self.sfg.lengths:
+            for dim in self.dims:
+                self.length_avgs[length][dim] = \
+                    (self._dim_sum(self.length_bin_hits[length], dim) * \
+                    ((self.bin_edges[dim][:-1] + self.bin_edges[dim][1:]) / 2.)).sum()
+

Modified: trunk/yt/lagos/__init__.py
==============================================================================
--- trunk/yt/lagos/__init__.py	(original)
+++ trunk/yt/lagos/__init__.py	Wed Apr 28 14:59:59 2010
@@ -100,6 +100,8 @@
 
 from HaloFinding import *
 
+from StructureFunctionGenerator import *
+
 # We load plugins.  Keep in mind, this can be fairly dangerous -
 # the primary purpose is to allow people to have a set of functions
 # that get used every time that they don't have to *define* every time.

Modified: trunk/yt/lagos/kd.py
==============================================================================
--- trunk/yt/lagos/kd.py	(original)
+++ trunk/yt/lagos/kd.py	Wed Apr 28 14:59:59 2010
@@ -223,7 +223,7 @@
     
     # if this node is close enough (the distances are calculated in the previous
     # iteration), or if the query point is inside the node, continue on
-    if (neighbors.minDistanceSquared > distanceSquared) or inside:
+    if True or inside:
         # leafs don't have children, so this tests to see if this node
         # is a leaf, and if it is a leaf, calculate distances to the query point
         if (node.leftChild is None):

Modified: trunk/yt/lagos/parallelHOP/parallelHOP.py
==============================================================================
--- trunk/yt/lagos/parallelHOP/parallelHOP.py	(original)
+++ trunk/yt/lagos/parallelHOP/parallelHOP.py	Wed Apr 28 14:59:59 2010
@@ -256,6 +256,7 @@
         yt_counters("Picking padding data to send.")
         # Communicate the sizes to send.
         self.mine, global_send_count = self._mpi_info_dict(send_size)
+        del send_size
         # Initialize the arrays to receive data.
         yt_counters("Initalizing recv arrays.")
         recv_real_indices = {}
@@ -392,6 +393,7 @@
         # inside the real region, but within one padding of the boundary,
         # and this will do it.
         self.is_inside_annulus = na.bitwise_and(self.is_inside, inner)
+        del inner
         # Below we make a mapping of real particle index->local ID
         # Unf. this has to be a dict, because any task can have
         # particles of any particle_index, which means that if it were an
@@ -594,6 +596,10 @@
                 # All our neighbors are in the same chain already, so 
                 # we don't need to search again.
                 self.search_again[i] = False
+        try:
+            del NNtags
+        except UnboundLocalError:
+            pass
         yt_counters("preconnect kd tree search.")
         # Recursively jump links until we get to a chain whose densest
         # link is to itself. At that point we've found the densest chain
@@ -632,7 +638,9 @@
                 self.chainID[i] = map[self.chainID[i]]
         del map
         self.densest_in_chain = dic_new.copy()
+        del dic_new
         self.densest_in_chain_real_index = dicri_new.copy()
+        del dicri_new
         self.__max_memory()
         yt_counters("preconnect pregrouping.")
         mylog.info("Preconnected %d chains." % removed)
@@ -731,6 +739,7 @@
                     continue
                 self.chainID[i] = new
             del tolink, fix_map
+        del uniq
         yt_counters("global chain hand-linking.")
         yt_counters("create_global_densest_in_chain")
 
@@ -821,6 +830,7 @@
         self.global_padded_count = self._mpi_joindict(self.global_padded_count)
         # Send/receive 'em.
         self._communicate_uphill_info()
+        del self.global_padded_count
         self.__max_memory()
         # Fix the IDs to localIDs.
         for i,real_index in enumerate(self.recv_real_indices):
@@ -951,7 +961,7 @@
                     continue
         # Clean up.
         del recv_real_indices, recv_chainIDs, real_indices, chainIDs, select
-        del hooks
+        del hooks, global_annulus_count
         # We're done with this here.
         del self.rev_index
         yt_counters("communicate_annulus_chainIDs")
@@ -1027,6 +1037,10 @@
                             boundary_density
                 else:
                     continue
+        try:
+            del point, NNtags, results
+        except UnboundLocalError:
+            pass
         self.__max_memory()
         yt_counters("connect_chains")
 
@@ -1131,10 +1145,7 @@
         Set_list = []
         # We only want the holes that are modulo mine.
         keys = na.arange(groupID, dtype='int64')
-        if self._mpi_get_size() == None:
-            size = 1
-        else:
-            size = self._mpi_get_size()
+        size = self._mpi_get_size()
         select = (keys % size == self.mine)
         groupIDs = keys[select]
         mine_groupIDs = set([]) # Records only ones modulo mine.
@@ -1272,7 +1283,8 @@
             if self.densest_in_group[groupID] < max_dens:
                 self.densest_in_group[groupID] = max_dens
                 self.densest_in_group_real_index[groupID] = self.densest_in_chain_real_index[chainID]
-        del self.densest_in_chain, self.densest_in_chain_real_index
+        del self.densest_in_chain, self.densest_in_chain_real_index, self.reverse_map
+        del self.densest_in_group
         yt_counters("translate_groupIDs")
 
     def _precompute_group_info(self):
@@ -1309,6 +1321,7 @@
         # some groups are on multiple tasks, there is only one densest_in_chain
         # and only that task contributed above.
         self.max_dens_point = self._mpi_Allsum_double(max_dens_point)
+        del max_dens_point
         yt_counters("max dens point")
         # Now CoM.
         yt_counters("CoM")
@@ -1334,6 +1347,7 @@
             if ms_u.size == 1:
                 single = True
                 Tot_M = size.astype('float64') * ms_u
+                del ms_u
             else:
                 single = False
                 del ms_u
@@ -1386,8 +1400,9 @@
         yt_counters("max radius")
         yt_counters("Precomp.")
         self.__max_memory()
+        del select, loc, subchain, CoM_M, Tot_M, size, max_radius
         if calc:
-            del loc, subchain, CoM_M, Tot_M, c_vec, max_radius, select
+            del c_vec
             del sort_subchain, uniq_subchain, diff_subchain, marks, dist, sort
             del rad, com
 
@@ -1408,7 +1423,7 @@
         yt_counters("chainHOP_tags_dens")
         chainHOP_tags_dens()
         yt_counters("chainHOP_tags_dens")
-        self.density = fKD.dens
+        self.density = fKD.dens.copy()
         # Now each particle has NNtags, and a local self density.
         # Let's find densest NN
         mylog.info('Finding densest nearest neighbors...')



More information about the yt-svn mailing list