[yt-svn] commit/yt: ngoldbaum: Merged in brittonsmith/yt (pull request #1665)

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Thu Aug 20 09:16:22 PDT 2015


1 new commit in yt:

https://bitbucket.org/yt_analysis/yt/commits/1e76952e015d/
Changeset:   1e76952e015d
Branch:      yt
User:        ngoldbaum
Date:        2015-08-20 16:16:10+00:00
Summary:     Merged in brittonsmith/yt (pull request #1665)

Multi-level parallelism for absorption spectrum generator
Affected #:  2 files

diff -r e0b16d0403a7812a8a302f87327ab0fe5ff07728 -r 1e76952e015dfb14db5d97333c182edc658d4aaa doc/source/analyzing/analysis_modules/absorption_spectrum.rst
--- a/doc/source/analyzing/analysis_modules/absorption_spectrum.rst
+++ b/doc/source/analyzing/analysis_modules/absorption_spectrum.rst
@@ -109,6 +109,16 @@
 
 .. note:: To write out a fits file, you must install the `astropy <http://www.astropy.org>`_ python library in order to access the astropy.io.fits module.  You can usually do this by simply running `pip install astropy` at the command line.
 
+Generating Spectra in Parallel
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The spectrum generator can be run in parallel simply by following the procedures 
+laid out in :ref:`parallel-computation` for running yt scripts in parallel.  
+Spectrum generation is parallelized using a multi-level strategy where each 
+absorption line is deposited by a different processor.  If the number of available 
+processors is greater than the number of lines, then the deposition of 
+individual lines will be divided over multiple processors.
+
 Fitting an Absorption Spectrum
 ------------------------------
 

diff -r e0b16d0403a7812a8a302f87327ab0fe5ff07728 -r 1e76952e015dfb14db5d97333c182edc658d4aaa yt/analysis_modules/absorption_spectrum/absorption_spectrum.py
--- a/yt/analysis_modules/absorption_spectrum/absorption_spectrum.py
+++ b/yt/analysis_modules/absorption_spectrum/absorption_spectrum.py
@@ -26,6 +26,10 @@
     boltzmann_constant_cgs, \
     speed_of_light_cgs
 from yt.utilities.on_demand_imports import _astropy
+from yt.utilities.parallel_tools.parallel_analysis_interface import \
+    _get_comm, \
+    parallel_objects, \
+    parallel_root_only
 
 pyfits = _astropy.pyfits
 
@@ -108,9 +112,9 @@
                                     'normalization': normalization,
                                     'index': index})
 
-    def make_spectrum(self, input_file, output_file='spectrum.h5',
-                      line_list_file='lines.txt',
-                      use_peculiar_velocity=True):
+    def make_spectrum(self, input_file, output_file="spectrum.h5",
+                      line_list_file="lines.txt",
+                      use_peculiar_velocity=True, njobs="auto"):
         """
         Make spectrum from ray data using the line list.
 
@@ -119,11 +123,35 @@
 
         input_file : string
            path to input ray data.
-        output_file : string
-           path for output file.  File formats are chosen based on the filename extension.
-           ``.h5`` for hdf5, ``.fits`` for fits, and everything else is ASCII.
-        use_peculiar_velocity : bool
+        output_file : optional, string
+           path for output file.  File formats are chosen based on the 
+           filename extension.  ``.h5`` for hdf5, ``.fits`` for fits, 
+           and everything else is ASCII.
+           Default: "spectrum.h5"
+        line_list_file : optional, string
+           path to file in which the list of all deposited lines 
+           will be saved.  If set to None, the line list will not 
+           be saved.  Note, when running in parallel, combining the 
+           line lists can be quite slow, so it is recommended to set 
+           this to None when running in parallel unless you really
+           want them.
+           Default: "lines.txt"
+        use_peculiar_velocity : optional, bool
            if True, include line of sight velocity for shifting lines.
+           Default: True
+        njobs : optional, int or "auto"
+           the number of process groups into which the loop over
+           absorption lines will be divided.  If set to -1, each 
+           absorption line will be deposited by exactly one processor.
+           If njobs is set to a value less than the total number of 
+           available processors (N), then the deposition of an 
+           individual line will be parallelized over (N / njobs)
+           processors.  If set to "auto", it will first try to 
+           parallelize over the list of lines and only parallelize 
+           the line deposition if there are more processors than
+           lines.  This is the optimal strategy for parallelizing 
+           spectrum generation.
+           Default: "auto"
         """
 
         input_fields = ['dl', 'redshift', 'temperature']
@@ -145,7 +173,12 @@
         self.tau_field = np.zeros(self.lambda_bins.size)
         self.spectrum_line_list = []
 
-        self._add_lines_to_spectrum(field_data, use_peculiar_velocity)
+        if njobs == "auto":
+            comm = _get_comm(())
+            njobs = min(comm.size, len(self.line_list))
+        
+        self._add_lines_to_spectrum(field_data, use_peculiar_velocity,
+                                    line_list_file is not None, njobs=njobs)
         self._add_continua_to_spectrum(field_data, use_peculiar_velocity)
 
         self.flux_field = np.exp(-self.tau_field)
@@ -156,7 +189,8 @@
             self._write_spectrum_fits(output_file)
         else:
             self._write_spectrum_ascii(output_file)
-        self._write_spectrum_line_list(line_list_file)
+        if line_list_file is not None:
+            self._write_spectrum_line_list(line_list_file)
 
         del field_data
         return (self.lambda_bins, self.flux_field)
@@ -196,7 +230,8 @@
                 pbar.update(i)
             pbar.finish()
 
-    def _add_lines_to_spectrum(self, field_data, use_peculiar_velocity):
+    def _add_lines_to_spectrum(self, field_data, use_peculiar_velocity,
+                               save_line_list, njobs=-1):
         """
         Add the absorption lines to the spectrum.
         """
@@ -205,7 +240,7 @@
         # Widen wavelength window until optical depth reaches a max value at the ends.
         max_tau = 0.001
 
-        for line in self.line_list:
+        for line in parallel_objects(self.line_list, njobs=njobs):
             column_density = field_data[line['field_name']] * field_data['dl']
             delta_lambda = line['wavelength'] * field_data['redshift']
             if use_peculiar_velocity:
@@ -238,7 +273,7 @@
                                    (right_index - left_index > 1))[0]
             pbar = get_pbar("Adding line - %s [%f A]: " % (line['label'], line['wavelength']),
                             valid_lines.size)
-            for i, lixel in enumerate(valid_lines):
+            for i, lixel in parallel_objects(enumerate(valid_lines), njobs=-1):
                 my_bin_ratio = spectrum_bin_ratio
                 while True:
                     lambda_bins, line_tau = \
@@ -261,7 +296,7 @@
                                           my_bin_ratio *
                                           width_ratio[lixel]).astype(int).clip(0, self.n_lambda)
                 self.tau_field[left_index[lixel]:right_index[lixel]] += line_tau
-                if line['label_threshold'] is not None and \
+                if save_line_list and line['label_threshold'] is not None and \
                         column_density[lixel] >= line['label_threshold']:
                     if use_peculiar_velocity:
                         peculiar_velocity = field_data['velocity_los'][lixel].in_units("km/s")
@@ -280,6 +315,13 @@
             del column_density, delta_lambda, thermal_b, \
                 center_bins, width_ratio, left_index, right_index
 
+        comm = _get_comm(())
+        self.tau_field = comm.mpi_allreduce(self.tau_field, op="sum")
+        if save_line_list:
+            self.spectrum_line_list = comm.par_combine_object(
+                self.spectrum_line_list, "cat", datatype="list")
+
+    @parallel_root_only
     def _write_spectrum_line_list(self, filename):
         """
         Write out list of spectral lines.
@@ -295,6 +337,7 @@
                                                 line['redshift'], line['v_pec']))
         f.close()
 
+    @parallel_root_only
     def _write_spectrum_ascii(self, filename):
         """
         Write spectrum to an ascii file.
@@ -307,6 +350,7 @@
                                     self.tau_field[i], self.flux_field[i]))
         f.close()
 
+    @parallel_root_only
     def _write_spectrum_fits(self, filename):
         """
         Write spectrum to a fits file.
@@ -318,6 +362,7 @@
         tbhdu = pyfits.BinTableHDU.from_columns(cols)
         tbhdu.writeto(filename, clobber=True)
 
+    @parallel_root_only
     def _write_spectrum_hdf5(self, filename):
         """
         Write spectrum to an hdf5 file.

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.


More information about the yt-svn mailing list