[yt-svn] commit/yt: xarthisius: Merged in MatthewTurk/yt (pull request #1350)
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Sat Jan 3 21:17:45 PST 2015
1 new commit in yt:
https://bitbucket.org/yt_analysis/yt/commits/20d4fcc40283/
Changeset: 20d4fcc40283
Branch: yt
User: xarthisius
Date: 2015-01-04 05:17:36+00:00
Summary: Merged in MatthewTurk/yt (pull request #1350)
Cache data for grid frontends in IO chunking
Affected #: 7 files
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/data_objects/construction_data_containers.py
--- a/yt/data_objects/construction_data_containers.py
+++ b/yt/data_objects/construction_data_containers.py
@@ -470,7 +470,7 @@
self.domain_width = np.rint((self.ds.domain_right_edge -
self.ds.domain_left_edge)/self.dds).astype('int64')
self._setup_data_source()
-
+
@property
def icoords(self):
ic = np.indices(self.ActiveDimensions).astype("int64")
@@ -950,14 +950,18 @@
mylog.info("Extracting (sampling: %s)" % (fields,))
verts = []
samples = []
- for block, mask in parallel_objects(self.data_source.blocks):
- my_verts = self._extract_isocontours_from_grid(
- block, self.surface_field, self.field_value,
- mask, fields, sample_type)
- if fields is not None:
- my_verts, svals = my_verts
- samples.append(svals)
- verts.append(my_verts)
+ deps = self._determine_fields(self.surface_field)
+ deps = self._identify_dependencies(deps, spatial=True)
+ for io_chunk in parallel_objects(self.data_source.chunks(deps, "io",
+ preload_fields = deps)):
+ for block, mask in self.data_source.blocks:
+ my_verts = self._extract_isocontours_from_grid(
+ block, self.surface_field, self.field_value,
+ mask, fields, sample_type)
+ if fields is not None:
+ my_verts, svals = my_verts
+ samples.append(svals)
+ verts.append(my_verts)
verts = np.concatenate(verts).transpose()
verts = self.comm.par_combine_object(verts, op='cat', datatype='array')
self.vertices = verts
@@ -1040,21 +1044,27 @@
"""
flux = 0.0
mylog.info("Fluxing %s", fluxing_field)
- for block, mask in parallel_objects(self.data_source.blocks):
- flux += self._calculate_flux_in_grid(block, mask,
- field_x, field_y, field_z, fluxing_field)
+ deps = [field_x, field_y, field_z]
+ if fluxing_field is not None: deps.append(fluxing_field)
+ deps = self._determine_fields(deps)
+ deps = self._identify_dependencies(deps)
+ for io_chunk in parallel_objects(self.data_source.chunks(deps, "io",
+ preload_fields = deps)):
+ for block, mask in self.data_source.blocks:
+ flux += self._calculate_flux_in_grid(block, mask,
+ field_x, field_y, field_z, fluxing_field)
flux = self.comm.mpi_allreduce(flux, op="sum")
return flux
def _calculate_flux_in_grid(self, grid, mask,
- field_x, field_y, field_z, fluxing_field = None):
+ field_x, field_y, field_z, fluxing_field = None):
vals = grid.get_vertex_centered_data(self.surface_field)
if fluxing_field is None:
ff = np.ones(vals.shape, dtype="float64")
else:
ff = grid.get_vertex_centered_data(fluxing_field)
- xv, yv, zv = [grid.get_vertex_centered_data(f) for f in
- [field_x, field_y, field_z]]
+ xv, yv, zv = [grid.get_vertex_centered_data(f)
+ for f in [field_x, field_y, field_z]]
return march_cubes_grid_flux(self.field_value, vals, xv, yv, zv,
ff, mask, grid.LeftEdge, grid.dds)
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/data_objects/data_containers.py
--- a/yt/data_objects/data_containers.py
+++ b/yt/data_objects/data_containers.py
@@ -1019,16 +1019,13 @@
"""
verts = []
samples = []
- pb = get_pbar("Extracting ", len(list(self._get_grid_objs())))
- for i, g in enumerate(self._get_grid_objs()):
- pb.update(i)
+ for block, mask in self.blocks:
my_verts = self._extract_isocontours_from_grid(
- g, field, value, sample_values)
+ block, mask, field, value, sample_values)
if sample_values is not None:
my_verts, svals = my_verts
samples.append(svals)
verts.append(my_verts)
- pb.finish()
verts = np.concatenate(verts).transpose()
verts = self.comm.par_combine_object(verts, op='cat', datatype='array')
verts = verts.transpose()
@@ -1052,11 +1049,9 @@
return verts, samples
return verts
-
- def _extract_isocontours_from_grid(self, grid, field, value,
- sample_values = None):
- mask = self._get_cut_mask(grid) * grid.child_mask
- vals = grid.get_vertex_centered_data(field, no_ghost = False)
+ def _extract_isocontours_from_grid(self, grid, mask, field, value,
+ sample_values=None):
+ vals = grid.get_vertex_centered_data(field, no_ghost=False)
if sample_values is not None:
svals = grid.get_vertex_centered_data(sample_values)
else:
@@ -1130,15 +1125,14 @@
... "velocity_x", "velocity_y", "velocity_z", "Metal_Density")
"""
flux = 0.0
- for g in self._get_grid_objs():
- flux += self._calculate_flux_in_grid(g, field, value,
- field_x, field_y, field_z, fluxing_field)
+ for block, mask in self.blocks:
+ flux += self._calculate_flux_in_grid(block, mask, field, value, field_x,
+ field_y, field_z, fluxing_field)
flux = self.comm.mpi_allreduce(flux, op="sum")
return flux
- def _calculate_flux_in_grid(self, grid, field, value,
+ def _calculate_flux_in_grid(self, grid, mask, field, value,
field_x, field_y, field_z, fluxing_field = None):
- mask = self._get_cut_mask(grid) * grid.child_mask
vals = grid.get_vertex_centered_data(field)
if fluxing_field is None:
ff = np.ones(vals.shape, dtype="float64")
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/frontends/enzo/io.py
--- a/yt/frontends/enzo/io.py
+++ b/yt/frontends/enzo/io.py
@@ -14,6 +14,8 @@
#-----------------------------------------------------------------------------
import os
+import random
+from contextlib import contextmanager
from yt.utilities.io_handler import \
BaseIOHandler, _axis_ids
@@ -130,12 +132,25 @@
raise RuntimeError
g = chunks[0].objs[0]
f = h5py.File(g.filename.encode('ascii'), 'r')
+ if g.id in self._cached_fields:
+ gf = self._cached_fields[g.id]
+ rv.update(gf)
+ if len(rv) == len(fields): return rv
gds = f.get("/Grid%08i" % g.id)
- for ftype, fname in fields:
+ for field in fields:
+ if field in rv:
+ self._hits += 1
+ continue
+ self._misses += 1
+ ftype, fname = field
if fname in gds:
rv[(ftype, fname)] = gds.get(fname).value.swapaxes(0,2)
else:
rv[(ftype, fname)] = np.zeros(g.ActiveDimensions)
+ if self._cache_on:
+ for gid in rv:
+ self._cached_fields.setdefault(gid, {})
+ self._cached_fields[gid].update(rv[gid])
f.close()
return rv
if size is None:
@@ -155,10 +170,16 @@
if g.filename is None: continue
if fid is None:
fid = h5py.h5f.open(g.filename.encode('ascii'), h5py.h5f.ACC_RDONLY)
+ gf = self._cached_fields.get(g.id, {})
data = np.empty(g.ActiveDimensions[::-1], dtype="float64")
data_view = data.swapaxes(0,2)
nd = 0
for field in fields:
+ if field in gf:
+ nd = g.select(selector, gf[field], rv[field], ind)
+ self._hits += 1
+ continue
+ self._misses += 1
ftype, fname = field
try:
node = "/Grid%08i/%s" % (g.id, fname)
@@ -167,11 +188,52 @@
if fname == "Dark_Matter_Density": continue
raise
dg.read(h5py.h5s.ALL, h5py.h5s.ALL, data)
+ if self._cache_on:
+ self._cached_fields.setdefault(g.id, {})
+ # Copy because it's a view into an empty temp array
+ self._cached_fields[g.id][field] = data_view.copy()
nd = g.select(selector, data_view, rv[field], ind) # caches
ind += nd
if fid: fid.close()
return rv
+ @contextmanager
+ def preload(self, chunk, fields, max_size):
+ if len(fields) == 0:
+ yield self
+ return
+ old_cache_on = self._cache_on
+ old_cached_fields = self._cached_fields
+ self._cached_fields = cf = {}
+ self._cache_on = True
+ for gid in old_cached_fields:
+ # Will not copy numpy arrays, which is good!
+ cf[gid] = old_cached_fields[gid].copy()
+ self._hits = self._misses = 0
+ self._cached_fields = self._read_chunk_data(chunk, fields)
+ mylog.debug("(1st) Hits = % 10i Misses = % 10i",
+ self._hits, self._misses)
+ self._hits = self._misses = 0
+ yield self
+ mylog.debug("(2nd) Hits = % 10i Misses = % 10i",
+ self._hits, self._misses)
+ self._cached_fields = old_cached_fields
+ self._cache_on = old_cache_on
+ # Randomly remove some grids from the cache. Note that we're doing
+ # this on a grid basis, not a field basis. Performance will be
+ # slightly non-deterministic as a result of this, but it should roughly
+ # be statistically alright, assuming (as we do) that this will get
+ # called during largely unbalanced stuff.
+ if len(self._cached_fields) > max_size:
+ to_remove = random.sample(self._cached_fields.keys(),
+ len(self._cached_fields) - max_size)
+ mylog.debug("Purging from cache %s", len(to_remove))
+ for k in to_remove:
+ self._cached_fields.pop(k)
+ else:
+ mylog.warning("Cache size % 10i (max % 10i)",
+ len(self._cached_fields), max_size)
+
def _read_chunk_data(self, chunk, fields):
fid = fn = None
rv = {}
@@ -190,6 +252,8 @@
if len(fluid_fields) == 0: return rv
for g in chunk.objs:
rv[g.id] = gf = {}
+ if g.id in self._cached_fields:
+ rv[g.id].update(self._cached_fields[g.id])
if g.filename is None: continue
elif g.filename != fn:
if fid is not None: fid.close()
@@ -200,6 +264,10 @@
data = np.empty(g.ActiveDimensions[::-1], dtype="float64")
data_view = data.swapaxes(0,2)
for field in fluid_fields:
+ if field in gf:
+ self._hits += 1
+ continue
+ self._misses += 1
ftype, fname = field
try:
node = "/Grid%08i/%s" % (g.id, fname)
@@ -210,6 +278,10 @@
dg.read(h5py.h5s.ALL, h5py.h5s.ALL, data)
gf[field] = data_view.copy()
if fid: fid.close()
+ if self._cache_on:
+ for gid in rv:
+ self._cached_fields.setdefault(gid, {})
+ self._cached_fields[gid].update(rv[gid])
return rv
class IOHandlerPackedHDF5GhostZones(IOHandlerPackedHDF5):
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/geometry/geometry_handler.py
--- a/yt/geometry/geometry_handler.py
+++ b/yt/geometry/geometry_handler.py
@@ -252,7 +252,6 @@
chunk_size)
return fields_to_return, fields_to_generate
-
def _chunk(self, dobj, chunking_style, ngz = 0, **kwargs):
# A chunk is either None or (grids, size)
if dobj._current_chunk is None:
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/geometry/grid_geometry_handler.py
--- a/yt/geometry/grid_geometry_handler.py
+++ b/yt/geometry/grid_geometry_handler.py
@@ -323,9 +323,13 @@
yield YTDataChunk(dobj, "spatial", [g], size, cache = False)
_grid_chunksize = 1000
- def _chunk_io(self, dobj, cache = True, local_only = False):
+ def _chunk_io(self, dobj, cache = True, local_only = False,
+ preload_fields = None):
# local_only is only useful for inline datasets and requires
# implementation by subclasses.
+ if preload_fields is None:
+ preload_fields = []
+ preload_fields, _ = self._split_fields(preload_fields)
gfiles = defaultdict(list)
gobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
for g in gobjs:
@@ -338,7 +342,10 @@
for grids in (gs[pos:pos + size] for pos
in xrange(0, len(gs), size)):
- yield YTDataChunk(dobj, "io", grids,
+ dc = YTDataChunk(dobj, "io", grids,
self._count_selection(dobj, grids),
cache = cache)
-
+ # We allow four full chunks to be included.
+ with self.io.preload(dc, preload_fields,
+ 4.0 * self._grid_chunksize):
+ yield dc
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/utilities/io_handler.py
--- a/yt/utilities/io_handler.py
+++ b/yt/utilities/io_handler.py
@@ -14,6 +14,7 @@
#-----------------------------------------------------------------------------
from collections import defaultdict
+from contextlib import contextmanager
from yt.funcs import mylog
import cPickle
@@ -37,6 +38,9 @@
_vector_fields = ()
_dataset_type = None
_particle_reader = False
+ _cache_on = False
+ _misses = 0
+ _hits = 0
def __init__(self, ds):
self.queue = defaultdict(dict)
@@ -44,12 +48,14 @@
self._last_selector_id = None
self._last_selector_counts = None
self._array_fields = {}
+ self._cached_fields = {}
# We need a function for reading a list of sets
# and a function for *popping* from a queue all the appropriate sets
- def preload(self, grids, sets):
- pass
+ @contextmanager
+ def preload(self, chunk, fields, max_size):
+ yield self
def pop(self, grid, field):
if grid.id in self.queue and field in self.queue[grid.id]:
diff -r ea716eba60f6a36dfee7274b9503822c70af0ffe -r 20d4fcc40283217c698762168f98fbe9eb2e29ec yt/utilities/parallel_tools/parallel_analysis_interface.py
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -814,12 +814,8 @@
return data
def preload(self, grids, fields, io_handler):
- # This will preload if it detects we are parallel capable and
- # if so, we load *everything* that we need. Use with some care.
- if len(fields) == 0: return
- mylog.debug("Preloading %s from %s grids", fields, len(grids))
- if not self._distributed: return
- io_handler.preload(grids, fields)
+ # This is non-functional.
+ return
@parallel_passthrough
def mpi_allreduce(self, data, dtype=None, op='sum'):
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this email because you have the commit notification service enabled
for this repository.
More information about the yt-svn mailing list is available from the
list's information page.