Source code for savu.core.transports.hdf5_transport

# Copyright 2015 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. module:: hdf5_transport
   :platform: Unix
   :synopsis: Transport specific plugin list runner, passes the data to and \
       from the plugin.

.. moduleauthor:: Mark Basham <scientificsoftware@diamond.ac.uk>

"""

import os
import logging

from savu.core.transport_setup import MPI_setup
from savu.plugins.savers.utils.hdf5_utils import Hdf5Utils
from savu.core.transports.base_transport import BaseTransport
from savu.core.iterate_plugin_group_utils import check_if_in_iterative_loop, \
    check_if_end_plugin_in_iterate_group
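
# Judging from the hooks below, the framework drives this transport roughly
# as follows: _transport_initialise and _transport_update_plugin_list during
# setup, _transport_pre_plugin_list_run to create the output hdf5 files
# before any processing, _transport_pre_plugin/_transport_post_plugin around
# each plugin, _transport_kill_signal and _transport_checkpoint polled during
# the run, and _transport_terminate_dataset/_transport_cleanup to close
# files at the end.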


class Hdf5Transport(BaseTransport):

    def __init__(self):
        super(Hdf5Transport, self).__init__()
        os.environ['savu_mode'] = 'hdf5'
        self.count = 0

    def _transport_initialise(self, options):
        MPI_setup(options)
        self.exp_coll = None
        self.data_flow = []
        self.files = []

    def _transport_update_plugin_list(self):
        plugin_list = self.exp.meta_data.plugin_list
        saver_idx = plugin_list._get_savers_index()
        remove = []

        # check the saver plugins and turn them off if they are hdf5
        for idx in saver_idx:
            # skip saver indices that fall outside the plugin list
            if idx < len(plugin_list.plugin_list):
                if plugin_list.plugin_list[idx]['name'] == 'Hdf5Saver':
                    remove.append(idx)
        for idx in sorted(remove, reverse=True):
            plugin_list._remove(idx)

    def _transport_pre_plugin_list_run(self):
        # run through the experiment (no processing) and create output files
        self.hdf5 = Hdf5Utils(self.exp)
        self.exp_coll = self.exp._get_collection()
        self.data_flow = self.exp.meta_data.plugin_list._get_dataset_flow()
        for i in range(len(self.exp_coll['datasets'])):
            self.exp._set_experiment_for_current_plugin(i)
            self.files.append(
                self._get_filenames(self.exp_coll['plugin_dict'][i]))
            self._set_file_details(self.files[i])
            self._setup_h5_files()  # creates the hdf5 files

    def _transport_pre_plugin(self):
        count = self.exp.meta_data.get('nPlugin')
        self._set_file_details(self.files[count])

    def _transport_post_plugin(self):
        iterate_group = check_if_in_iterative_loop(self.exp)
        for data in list(self.exp.index['out_data'].values()):
            if not data.remove:
                msg = self.__class__.__name__ + "_transport_post_plugin."
                self.exp._barrier(msg=msg)
                if self.exp.meta_data.get('process') == \
                        len(self.exp.meta_data.get('processes')) - 1:
                    if self.exp.meta_data.get("pre_run") == True:
                        self._populate_pre_run_nexus_file(data)
                    else:
                        self._populate_nexus_file(
                            data, iterate_group=iterate_group)
                    if iterate_group is None:
                        # not currently in an iterative loop, so link the
                        # output h5 file as normal
                        self.hdf5._link_datafile_to_nexus_file(data)
                    else:
                        self._transport_post_plugin_in_iterative_loop(
                            data, iterate_group)
                self.exp._barrier(msg=msg)
                if iterate_group is not None:
                    # reopen the file with write permission still present
                    self.hdf5._reopen_file(data, 'r+')
                else:
                    # reopen the file as read-only
                    self.hdf5._reopen_file(data, 'r')
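
    # Within an iterative loop only one backing h5 file should end up linked
    # into the NeXus file: results alternate between the original dataset
    # and its 'clone', so the final result sits in the original dataset
    # after an odd number of fixed iterations and in the clone after an even
    # number, and the link is made exactly once, on iteration 0 of the end
    # plugin.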
    def _transport_post_plugin_in_iterative_loop(self, data, iterate_group):
        """
        Handle the transport-related tasks after a plugin in an iterative
        loop has finished running
        """
        is_end_plugin = check_if_end_plugin_in_iterate_group(self.exp)
        is_odd_iterations = iterate_group._ip_fixed_iterations % 2 == 1
        is_clone_data = 'clone' in data.get_name(orig=False)

        if is_end_plugin:
            if iterate_group._ip_iteration == 0:
                if is_odd_iterations and is_clone_data:
                    logging.debug(
                        f"Not linking clone data "
                        f"{data.get_name(orig=False)}, as there is an odd "
                        f"number of iterations")
                elif not is_odd_iterations and not is_clone_data:
                    logging.debug(
                        f"Not linking original data "
                        f"{data.get_name(orig=False)}, as there is an even "
                        f"number of iterations")
                else:
                    # link the output h5 file as normal
                    self.hdf5._link_datafile_to_nexus_file(
                        data, iterate_group=iterate_group)
            elif iterate_group._ip_iteration > 0:
                # don't link the output h5 file, because it was already
                # linked when iteration 0 completed
                logging.debug(
                    f"Not linking intermediate h5 file, on iteration "
                    f"{iterate_group._ip_iteration}")
        else:
            # the output of non-end plugins can be linked more simply
            if iterate_group._ip_iteration == 0:
                # link the output h5 file as normal
                self.hdf5._link_datafile_to_nexus_file(
                    data, iterate_group=iterate_group)
            elif iterate_group._ip_iteration > 0:
                # don't link the output h5 file, because it was already
                # linked when iteration 0 completed
                logging.debug(
                    f"Not linking intermediate h5 file, on iteration "
                    f"{iterate_group._ip_iteration}")

    def _transport_terminate_dataset(self, data):
        self.hdf5._close_file(data)

    def _transport_checkpoint(self):
        """ The framework has determined that it is time to checkpoint.
        What should this transport mechanism do? """
        cp = self.exp.checkpoint
        with self.hdf5._open_backing_h5(cp._file, 'a', mpi=False) as f:
            self._metadata_dump(f, 'in_data')
            self._metadata_dump(f, 'out_data')

    def _metadata_dump(self, f, gname):
        if gname in list(f.keys()):
            del f[gname]
        for data in list(self.exp.index[gname].values()):
            name = data.get_name()
            entry = f.require_group(gname + '/' + name)
            self._output_metadata(data, entry, name, dump=True)

    def _transport_kill_signal(self):
        """ An opportunity to send a kill signal to the framework.
        Return True or False. """
        path = self.exp.meta_data.get('out_path')
        killsignal = os.path.join(path, 'killsignal')
        if os.path.exists(killsignal):
            # jump to the end of the plugin run!
            self.exp.meta_data.set('killsignal', True)
            logging.debug("***************** killsignal sent ****************")
            return True
        return False

    def _transport_cleanup(self, i):
        """ Any remaining cleanup after a kill signal has been sent """
        n_plugins = len(self.exp_coll['datasets'])
        for i in range(i, n_plugins):
            self.exp._set_experiment_for_current_plugin(i)
            for data in list(self.exp.index['out_data'].values()):
                self.hdf5._close_file(data)
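
# A minimal sketch of stopping a live run through the kill-signal mechanism
# above: _transport_kill_signal() polls the run's 'out_path' directory for a
# file named 'killsignal', so creating that file from outside the run asks
# the framework to jump to the end of the plugin run. The path below is a
# hypothetical output directory:
#
#     import os
#     out_path = '/path/to/savu/output'  # hypothetical: the run's out_path
#     open(os.path.join(out_path, 'killsignal'), 'a').close()
#
# Similarly, _metadata_dump() rewrites checkpoint metadata from scratch on
# each checkpoint by deleting any stale group and recreating it with
# require_group; the underlying h5py idiom, with hypothetical file, group
# and attribute names, is:
#
#     import h5py
#     with h5py.File('checkpoint.h5', 'a') as f:    # hypothetical file
#         if 'in_data' in f:
#             del f['in_data']                      # drop the stale group
#         entry = f.require_group('in_data/tomo')   # recreate it fresh
#         entry.attrs['name'] = 'tomo'              # hypothetical metadata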