# Source code for savu.core.transports.dosna_transport

# Copyright 2015 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. module:: dosna_transport
   :platform: Unix
   :synopsis: Transports data between plugins using DosNa (which provides \
   several storage backends), with HDF5 used by the loaders and the final \
   plugin.

.. moduleauthor:: Emilio Perez Juarez <scientificsoftware@diamond.ac.uk>

"""

import logging

from savu.core.transport_setup import MPI_setup
from savu.core.transports.base_transport import BaseTransport
from savu.core.transports.hdf5_transport import Hdf5Transport
from savu.data.chunking import Chunking
from savu.plugins.savers.utils.hdf5_utils import Hdf5Utils

import dosna as dn

# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)

# Fallback values used by DosnaTransport._transport_initialise when the
# "dosna_connection", "dosna_backend" or "dosna_engine" option is missing or
# empty.
DEFAULT_CONNECTION, DEFAULT_BACKEND, DEFAULT_ENGINE = (
    "savu-data", "ceph", "mpi")

class DosnaTransport(BaseTransport):
    """Transport implementation to use DosNa for managing storage and
    chunking.

    Datasets created by loaders use the HDF5 transport (see
    ``_transport_initialise``). Output datasets of the processing plugins
    are created as DosNa datasets on a shared connection. The final
    processing plugin is switched back to the HDF5 transport (see
    ``_transport_pre_plugin``).
    """

    def __init__(self):
        super(DosnaTransport, self).__init__()
        # Open DosNa connection: created in _transport_initialise, released
        # in _transport_post_plugin_list_run.
        self.dosna_connection = None
        self.global_data = True
        # Helper HDF5 transport used to finish the final plugin.
        self.h5trans = Hdf5Transport()
        self.data_flow = None
        # Number of _transport_post_plugin calls so far (plugins completed).
        self.count = 0
        # Hdf5Utils instance, created in _transport_pre_plugin_list_run.
        self.hdf5 = None
        # True while a temporary (HDF5) transport is active. It starts True
        # so the first _transport_post_plugin call resets to 'basic'.
        self.hdf5_flag = True
        # File details per processing plugin, appended in plugin order.
        self.files = []
        # Last entry of the plugin list, used to set up the final HDF5 files.
        self.final_dict = None
        # DosNa datasets created here; deleted at the end of the run.
        self.dataset_cache = []
        self.n_plugins = 0

    def _transport_initialise(self, options):
        """Set up MPI and open the DosNa connection.

        :param options: run options dict. ``dosna_backend``,
            ``dosna_engine`` and ``dosna_connection`` fall back to the module
            defaults when missing or empty. ``dosna_connection_options`` is
            an optional iterable of ``"key=value"`` strings, forwarded to
            ``dn.Connection`` as keyword arguments. On return,
            ``options['transport']`` is set to ``'hdf5'``.
        """
        MPI_setup(options)
        backend = options.get("dosna_backend") or DEFAULT_BACKEND
        engine = options.get("dosna_engine") or DEFAULT_ENGINE
        dosna_connection_name = (options.get("dosna_connection")
                                 or DEFAULT_CONNECTION)
        # Connection options are optional: a missing/None entry means "no
        # extra options" rather than a TypeError when iterating. Split on
        # the first '=' only, so a value may itself contain '='.
        raw_options = options.get("dosna_connection_options") or []
        dosna_options = dict(item.split('=', 1) for item in raw_options)
        log.debug("DosNa is using backend %s engine %s and options %s",
                  backend, engine, dosna_options)
        dn.use(engine, backend)
        self.dosna_connection = dn.Connection(dosna_connection_name,
                                              **dosna_options)
        self.dosna_connection.connect()
        # initially reading from a hdf5 file so Hdf5TransportData will be used
        # for all datasets created in a loader
        options['transport'] = 'hdf5'

    def _transport_update_plugin_list(self):
        """Remove every Hdf5Saver plugin from the plugin list.

        Indices are removed in descending order, presumably so that one
        removal does not shift the indices still waiting to be removed.
        """
        plugin_list = self.exp.meta_data.plugin_list
        # check the saver plugins and turn off any that are hdf5
        remove = [idx for idx in plugin_list._get_savers_index()
                  if plugin_list.plugin_list[idx]['name'] == 'Hdf5Saver']
        for idx in sorted(remove, reverse=True):
            plugin_list._remove(idx)

    def _transport_pre_plugin_list_run(self):
        """Create the DosNa datasets for every processing plugin.

        For each processing plugin, this method:

        * selects that plugin in the experiment;
        * computes its file details and records them in ``self.files``;
        * creates DosNa datasets for its output data.

        Afterwards the transport is switched to 'dosna', unless there is
        only a single processing plugin.
        """
        # loaders have completed now revert back to DosnaTransport, so any
        # output datasets created by a plugin will use this.
        self.hdf5 = Hdf5Utils(self.exp)
        exp_coll = self.exp._get_collection()
        plist = self.exp.meta_data.plugin_list
        self.data_flow = plist._get_dataset_flow()
        self.n_plugins = plist._get_n_processing_plugins()
        self.final_dict = plist.plugin_list[-1]
        for plugin_index in range(self.n_plugins):
            self.exp._set_experiment_for_current_plugin(plugin_index)
            files = self._get_filenames(exp_coll['plugin_dict'][plugin_index])
            self.files.append(files)
            # Use the local value rather than self.files[plugin_index],
            # which would be a stale entry if this method ran more than once.
            self._set_file_details(files)
            self._setup_dosna_objects()  # creates the dosna objects
        # With a single processing plugin, that plugin is also the final one,
        # and _transport_pre_plugin switches it to the HDF5 transport.
        if self.n_plugins != 1:
            self.exp.meta_data.set('transport', 'dosna')

    def _transport_post_plugin_list_run(self):
        """Delete the cached DosNa datasets and close the connection.

        Does nothing if no connection is open. The connection is released
        even if deleting a dataset raises.
        """
        if not self.dosna_connection:
            return
        try:
            for dataset in self.dataset_cache:
                self.dosna_connection.del_dataset(dataset.name)
        finally:
            self.dataset_cache = []
            self.dosna_connection.disconnect()
            self.dosna_connection = None

    def _transport_terminate_dataset(self, data):
        """Close the HDF5 file for ``data`` if the transport is 'hdf5'."""
        if self.exp.meta_data.get('transport') == "hdf5":
            self.hdf5._close_file(data)

    @staticmethod
    def _extract_digits(data):
        """Return only the ASCII digit characters '0'-'9' of ``data``.

        >>> DosnaTransport._extract_digits("out_12_a3.h5")
        '123'
        """
        return "".join(char for char in data if '0' <= char <= '9')

    def _create_dosna_dataset(self, object_id, data, key, current_and_next):
        """Create a DosNa dataset to back ``data`` and cache it for cleanup.

        :param object_id: identifier (the output filename) whose ASCII
            digits form the suffix of the dataset name.
        :param data: output data object. Its ``data`` attribute is replaced
            by the new DosNa dataset, and its group name is recorded in
            ``data_info``.
        :param key: key of ``data`` in the experiment's out_data index.
        :param current_and_next: 0 to create the dataset without an
            explicit chunk size. Otherwise it is passed to ``Chunking`` to
            compute ``chunk_size``.
        """
        group_name = self.exp.meta_data.get(["group_name", key])
        data.data_info.set('group_name', group_name)
        try:
            group_name = group_name + '_' + data.name
        except AttributeError:
            # data has no name: keep the plain group name.
            pass
        shape = data.get_shape()
        dataset_name = "{}_{}".format(group_name,
                                      self._extract_digits(object_id))
        if current_and_next == 0:
            create_kwargs = {}
        else:
            chunking = Chunking(self.exp, current_and_next)
            create_kwargs = {
                'chunk_size': chunking._calculate_chunking(shape, data.dtype)}
        data.data = self.dosna_connection.create_dataset(
            dataset_name, shape, data.dtype, **create_kwargs)
        self.dataset_cache.append(data.data)

    def _setup_dosna_objects(self):
        """Create a DosNa dataset for each output dataset of the current
        plugin."""
        out_data_dict = self.exp.index["out_data"]
        if 'current_and_next' in self.exp.meta_data.get_dictionary():
            current_and_next = self.exp.meta_data.get('current_and_next')
        else:
            # The default must be indexable by the same keys as
            # out_data_dict. Those keys are presumably dataset names, which
            # the previous list default could not be indexed by.
            current_and_next = {key: 0 for key in out_data_dict}
        for key in list(out_data_dict.keys()):
            out_data = out_data_dict[key]
            filename = self.exp.meta_data.get(["filename", key])
            self._create_dosna_dataset(filename, out_data, key,
                                       current_and_next[key])

    def _transport_pre_plugin(self):
        """Switch to the HDF5 transport before the final processing
        plugin."""
        if self.count == self.n_plugins - 1:
            self.__set_hdf5_transport()

    def _transport_post_plugin(self):
        """Update the transport state after a processing plugin has run.

        * If a temporary transport was active, reset the transport to
          'basic'.
        * When the plugin with index ``n_plugins - 2`` finishes, set the
          transport to 'dosna'.
        * After the final plugin, run the helper HDF5 transport's
          post-plugin step.

        Always increments ``self.count``.
        """
        # revert back to basic if a temporary transport mechanism was used
        if self.hdf5_flag:
            self.__unset_hdf5_transport()
        if self.count == self.n_plugins - 2:
            self.exp.meta_data.set('transport', 'dosna')
        if self.count == self.n_plugins - 1:  # final plugin
            self.h5trans.exp = self.exp
            self.h5trans.hdf5 = Hdf5Utils(self.exp)
            self.h5trans._transport_post_plugin()
        self.count += 1

    def __set_hdf5_transport(self):
        """Set the transport to 'hdf5' and set up the final plugin's files."""
        self.hdf5_flag = True
        self.exp.meta_data.set('transport', 'hdf5')
        files = self._get_filenames(self.final_dict)
        self._set_file_details(files)
        self._setup_h5_files()

    def __unset_hdf5_transport(self):
        """Set the transport to 'basic' and clear the temporary flag."""
        self.exp.meta_data.set('transport', 'basic')
        self.hdf5_flag = False