[yt-svn] commit/yt: xarthisius: Merged in MatthewTurk/yt (pull request #1350)

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Sat Jan 3 21:17:45 PST 2015


1 new commit in yt:

https://bitbucket.org/yt_analysis/yt/commits/20d4fcc40283/
Changeset:   20d4fcc40283
Branch:      yt
User:        xarthisius
Date:        2015-01-04 05:17:36+00:00
Summary:     Merged in MatthewTurk/yt (pull request #1350)

Cache data for grid frontends in IO chunking
Affected #:  7 files

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/data_objects/construction_data_containers.py
--- a/yt/data_objects/construction_data_containers.py
+++ b/yt/data_objects/construction_data_containers.py
@@ -470,7 +470,7 @@
         self.domain_width = np.rint((self.ds.domain_right_edge -
                     self.ds.domain_left_edge)/self.dds).astype('int64')
         self._setup_data_source()
-                
+
     @property
     def icoords(self):
         ic = np.indices(self.ActiveDimensions).astype("int64")
@@ -950,14 +950,18 @@
         mylog.info("Extracting (sampling: %s)" % (fields,))
         verts = []
         samples = []
-        for block, mask in parallel_objects(self.data_source.blocks):
-            my_verts = self._extract_isocontours_from_grid(
-                            block, self.surface_field, self.field_value,
-                            mask, fields, sample_type)
-            if fields is not None:
-                my_verts, svals = my_verts
-                samples.append(svals)
-            verts.append(my_verts)
+        deps = self._determine_fields(self.surface_field)
+        deps = self._identify_dependencies(deps, spatial=True)
+        for io_chunk in parallel_objects(self.data_source.chunks(deps, "io",
+                                         preload_fields = deps)):
+            for block, mask in self.data_source.blocks:
+                my_verts = self._extract_isocontours_from_grid(
+                                block, self.surface_field, self.field_value,
+                                mask, fields, sample_type)
+                if fields is not None:
+                    my_verts, svals = my_verts
+                    samples.append(svals)
+                verts.append(my_verts)
         verts = np.concatenate(verts).transpose()
         verts = self.comm.par_combine_object(verts, op='cat', datatype='array')
         self.vertices = verts
@@ -1040,21 +1044,27 @@
         """
         flux = 0.0
         mylog.info("Fluxing %s", fluxing_field)
-        for block, mask in parallel_objects(self.data_source.blocks):
-            flux += self._calculate_flux_in_grid(block, mask,
-                    field_x, field_y, field_z, fluxing_field)
+        deps = [field_x, field_y, field_z]
+        if fluxing_field is not None: deps.append(fluxing_field)
+        deps = self._determine_fields(deps)
+        deps = self._identify_dependencies(deps)
+        for io_chunk in parallel_objects(self.data_source.chunks(deps, "io",
+                                preload_fields = deps)):
+            for block, mask in self.data_source.blocks:
+                flux += self._calculate_flux_in_grid(block, mask,
+                        field_x, field_y, field_z, fluxing_field)
         flux = self.comm.mpi_allreduce(flux, op="sum")
         return flux
 
     def _calculate_flux_in_grid(self, grid, mask,
-                    field_x, field_y, field_z, fluxing_field = None):
+            field_x, field_y, field_z, fluxing_field = None):
         vals = grid.get_vertex_centered_data(self.surface_field)
         if fluxing_field is None:
             ff = np.ones(vals.shape, dtype="float64")
         else:
             ff = grid.get_vertex_centered_data(fluxing_field)
-        xv, yv, zv = [grid.get_vertex_centered_data(f) for f in 
-                     [field_x, field_y, field_z]]
+        xv, yv, zv = [grid.get_vertex_centered_data(f)
+                      for f in [field_x, field_y, field_z]]
         return march_cubes_grid_flux(self.field_value, vals, xv, yv, zv,
                     ff, mask, grid.LeftEdge, grid.dds)
 

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/data_objects/data_containers.py
--- a/yt/data_objects/data_containers.py
+++ b/yt/data_objects/data_containers.py
@@ -1019,16 +1019,13 @@
         """
         verts = []
         samples = []
-        pb = get_pbar("Extracting ", len(list(self._get_grid_objs())))
-        for i, g in enumerate(self._get_grid_objs()):
-            pb.update(i)
+        for block, mask in self.blocks:
             my_verts = self._extract_isocontours_from_grid(
-                            g, field, value, sample_values)
+                block, mask, field, value, sample_values)
             if sample_values is not None:
                 my_verts, svals = my_verts
                 samples.append(svals)
             verts.append(my_verts)
-        pb.finish()
         verts = np.concatenate(verts).transpose()
         verts = self.comm.par_combine_object(verts, op='cat', datatype='array')
         verts = verts.transpose()
@@ -1052,11 +1049,9 @@
             return verts, samples
         return verts
 
-
-    def _extract_isocontours_from_grid(self, grid, field, value,
-                                       sample_values = None):
-        mask = self._get_cut_mask(grid) * grid.child_mask
-        vals = grid.get_vertex_centered_data(field, no_ghost = False)
+    def _extract_isocontours_from_grid(self, grid, mask, field, value,
+                                       sample_values=None):
+        vals = grid.get_vertex_centered_data(field, no_ghost=False)
         if sample_values is not None:
             svals = grid.get_vertex_centered_data(sample_values)
         else:
@@ -1130,15 +1125,14 @@
         ...     "velocity_x", "velocity_y", "velocity_z", "Metal_Density")
         """
         flux = 0.0
-        for g in self._get_grid_objs():
-            flux += self._calculate_flux_in_grid(g, field, value,
-                    field_x, field_y, field_z, fluxing_field)
+        for block, mask in self.blocks:
+            flux += self._calculate_flux_in_grid(block, mask, field, value, field_x,
+                                                 field_y, field_z, fluxing_field)
         flux = self.comm.mpi_allreduce(flux, op="sum")
         return flux
 
-    def _calculate_flux_in_grid(self, grid, field, value,
+    def _calculate_flux_in_grid(self, grid, mask, field, value,
                     field_x, field_y, field_z, fluxing_field = None):
-        mask = self._get_cut_mask(grid) * grid.child_mask
         vals = grid.get_vertex_centered_data(field)
         if fluxing_field is None:
             ff = np.ones(vals.shape, dtype="float64")

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/frontends/enzo/io.py
--- a/yt/frontends/enzo/io.py
+++ b/yt/frontends/enzo/io.py
@@ -14,6 +14,8 @@
 #-----------------------------------------------------------------------------
 
 import os
+import random
+from contextlib import contextmanager
 
 from yt.utilities.io_handler import \
     BaseIOHandler, _axis_ids
@@ -130,12 +132,25 @@
                 raise RuntimeError
             g = chunks[0].objs[0]
             f = h5py.File(g.filename.encode('ascii'), 'r')
+            if g.id in self._cached_fields:
+                gf = self._cached_fields[g.id]
+                rv.update(gf)
+            if len(rv) == len(fields): return rv
             gds = f.get("/Grid%08i" % g.id)
-            for ftype, fname in fields:
+            for field in fields:
+                if field in rv:
+                    self._hits += 1
+                    continue
+                self._misses += 1
+                ftype, fname = field
                 if fname in gds:
                     rv[(ftype, fname)] = gds.get(fname).value.swapaxes(0,2)
                 else:
                     rv[(ftype, fname)] = np.zeros(g.ActiveDimensions)
+            if self._cache_on:
+                for gid in rv:
+                    self._cached_fields.setdefault(gid, {})
+                    self._cached_fields[gid].update(rv[gid])
             f.close()
             return rv
         if size is None:
@@ -155,10 +170,16 @@
                 if g.filename is None: continue
                 if fid is None:
                     fid = h5py.h5f.open(g.filename.encode('ascii'), h5py.h5f.ACC_RDONLY)
+                gf = self._cached_fields.get(g.id, {})
                 data = np.empty(g.ActiveDimensions[::-1], dtype="float64")
                 data_view = data.swapaxes(0,2)
                 nd = 0
                 for field in fields:
+                    if field in gf:
+                        nd = g.select(selector, gf[field], rv[field], ind)
+                        self._hits += 1
+                        continue
+                    self._misses += 1
                     ftype, fname = field
                     try:
                         node = "/Grid%08i/%s" % (g.id, fname)
@@ -167,11 +188,52 @@
                         if fname == "Dark_Matter_Density": continue
                         raise
                     dg.read(h5py.h5s.ALL, h5py.h5s.ALL, data)
+                    if self._cache_on:
+                        self._cached_fields.setdefault(g.id, {})
+                        # Copy because it's a view into an empty temp array
+                        self._cached_fields[g.id][field] = data_view.copy()
                     nd = g.select(selector, data_view, rv[field], ind) # caches
                 ind += nd
             if fid: fid.close()
         return rv
 
+    @contextmanager
+    def preload(self, chunk, fields, max_size):
+        if len(fields) == 0:
+            yield self
+            return
+        old_cache_on = self._cache_on
+        old_cached_fields = self._cached_fields
+        self._cached_fields = cf = {}
+        self._cache_on = True
+        for gid in old_cached_fields:
+            # Will not copy numpy arrays, which is good!
+            cf[gid] = old_cached_fields[gid].copy() 
+        self._hits = self._misses = 0
+        self._cached_fields = self._read_chunk_data(chunk, fields)
+        mylog.debug("(1st) Hits = % 10i Misses = % 10i",
+            self._hits, self._misses)
+        self._hits = self._misses = 0
+        yield self
+        mylog.debug("(2nd) Hits = % 10i Misses = % 10i",
+            self._hits, self._misses)
+        self._cached_fields = old_cached_fields
+        self._cache_on = old_cache_on
+        # Randomly remove some grids from the cache.  Note that we're doing
+        # this on a grid basis, not a field basis.  Performance will be
+        # slightly non-deterministic as a result of this, but it should roughly
+        # be statistically alright, assuming (as we do) that this will get
+        # called during largely unbalanced stuff.
+        if len(self._cached_fields) > max_size:
+            to_remove = random.sample(self._cached_fields.keys(),
+                len(self._cached_fields) - max_size)
+            mylog.debug("Purging from cache %s", len(to_remove))
+            for k in to_remove:
+                self._cached_fields.pop(k)
+        else:
+            mylog.warning("Cache size % 10i (max % 10i)",
+                len(self._cached_fields), max_size)
+
     def _read_chunk_data(self, chunk, fields):
         fid = fn = None
         rv = {}
@@ -190,6 +252,8 @@
         if len(fluid_fields) == 0: return rv
         for g in chunk.objs:
             rv[g.id] = gf = {}
+            if g.id in self._cached_fields:
+                rv[g.id].update(self._cached_fields[g.id])
             if g.filename is None: continue
             elif g.filename != fn:
                 if fid is not None: fid.close()
@@ -200,6 +264,10 @@
             data = np.empty(g.ActiveDimensions[::-1], dtype="float64")
             data_view = data.swapaxes(0,2)
             for field in fluid_fields:
+                if field in gf:
+                    self._hits += 1
+                    continue
+                self._misses += 1
                 ftype, fname = field
                 try:
                     node = "/Grid%08i/%s" % (g.id, fname)
@@ -210,6 +278,10 @@
                 dg.read(h5py.h5s.ALL, h5py.h5s.ALL, data)
                 gf[field] = data_view.copy()
         if fid: fid.close()
+        if self._cache_on:
+            for gid in rv:
+                self._cached_fields.setdefault(gid, {})
+                self._cached_fields[gid].update(rv[gid])
         return rv
 
 class IOHandlerPackedHDF5GhostZones(IOHandlerPackedHDF5):

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/geometry/geometry_handler.py
--- a/yt/geometry/geometry_handler.py
+++ b/yt/geometry/geometry_handler.py
@@ -252,7 +252,6 @@
             chunk_size)
         return fields_to_return, fields_to_generate
 
-
     def _chunk(self, dobj, chunking_style, ngz = 0, **kwargs):
         # A chunk is either None or (grids, size)
         if dobj._current_chunk is None:

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/geometry/grid_geometry_handler.py
--- a/yt/geometry/grid_geometry_handler.py
+++ b/yt/geometry/grid_geometry_handler.py
@@ -323,9 +323,13 @@
             yield YTDataChunk(dobj, "spatial", [g], size, cache = False)
 
     _grid_chunksize = 1000
-    def _chunk_io(self, dobj, cache = True, local_only = False):
+    def _chunk_io(self, dobj, cache = True, local_only = False,
+                  preload_fields = None):
         # local_only is only useful for inline datasets and requires
         # implementation by subclasses.
+        if preload_fields is None:
+            preload_fields = []
+        preload_fields, _ = self._split_fields(preload_fields)
         gfiles = defaultdict(list)
         gobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
         for g in gobjs:
@@ -338,7 +342,10 @@
             
             for grids in (gs[pos:pos + size] for pos
                           in xrange(0, len(gs), size)):
-                yield YTDataChunk(dobj, "io", grids,
+                dc = YTDataChunk(dobj, "io", grids,
                         self._count_selection(dobj, grids),
                         cache = cache)
-
+                # We allow four full chunks to be included.
+                with self.io.preload(dc, preload_fields, 
+                            4.0 * self._grid_chunksize):
+                    yield dc

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/utilities/io_handler.py
--- a/yt/utilities/io_handler.py
+++ b/yt/utilities/io_handler.py
@@ -14,6 +14,7 @@
 #-----------------------------------------------------------------------------
 
 from collections import defaultdict
+from contextlib import contextmanager
 
 from yt.funcs import mylog
 import cPickle
@@ -37,6 +38,9 @@
     _vector_fields = ()
     _dataset_type = None
     _particle_reader = False
+    _cache_on = False
+    _misses = 0
+    _hits = 0
 
     def __init__(self, ds):
         self.queue = defaultdict(dict)
@@ -44,12 +48,14 @@
         self._last_selector_id = None
         self._last_selector_counts = None
         self._array_fields = {}
+        self._cached_fields = {}
 
     # We need a function for reading a list of sets
     # and a function for *popping* from a queue all the appropriate sets
 
-    def preload(self, grids, sets):
-        pass
+    @contextmanager
+    def preload(self, chunk, fields, max_size):
+        yield self
 
     def pop(self, grid, field):
         if grid.id in self.queue and field in self.queue[grid.id]:

diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -814,12 +814,8 @@
             return data
 
     def preload(self, grids, fields, io_handler):
-        # This will preload if it detects we are parallel capable and
-        # if so, we load *everything* that we need.  Use with some care.
-        if len(fields) == 0: return
-        mylog.debug("Preloading %s from %s grids", fields, len(grids))
-        if not self._distributed: return
-        io_handler.preload(grids, fields)
+        # This is non-functional.
+        return
 
     @parallel_passthrough
     def mpi_allreduce(self, data, dtype=None, op='sum'):

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.



More information about the yt-svn mailing list