Source code for plugins.loaders.full_field_loaders.nxtomo_loader

# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: nxtomo_loader
   :platform: Unix
   :synopsis: A class for loading standard tomography data in Nexus format.

.. moduleauthor:: Nicola Wadeson <scientificsoftware@diamond.ac.uk>

"""

import h5py
import logging
import numpy as np

from savu.plugins.loaders.base_loader import BaseLoader
from savu.plugins.utils import register_plugin
from savu.data.stats.statistics import Statistics

from savu.data.data_structures.data_types.data_plus_darks_and_flats \
    import ImageKey, NoImageKey


[docs]@register_plugin class NxtomoLoader(BaseLoader): """ """ def __init__(self, name='NxtomoLoader'): super(NxtomoLoader, self).__init__(name) self.warnings = []
[docs] def log_warning(self, msg): logging.warning(msg) self.warnings.append(msg)
[docs] def setup(self): exp = self.get_experiment() data_obj = exp.create_data_object('in_data', self.parameters['name']) data_obj.backing_file = self._get_data_file() data_obj.data = self._get_h5_entry( data_obj.backing_file, self.parameters['data_path']) exp.meta_data.set('data_path', self.parameters['data_path']) exp.meta_data.set('image_key_path', self.parameters['image_key_path']) self._setup_after_pre_run(data_obj) synthetic_path = f"{'/'.join(self.parameters['data_path'].split('/')[:-1])}/synthetic/synthetic" if synthetic_path in data_obj.backing_file: if data_obj.backing_file[synthetic_path][()] == True: self.exp.meta_data.set("synthetic", True) self._set_dark_and_flat(data_obj) self.nFrames = self.__get_nFrames(data_obj) if self.nFrames > 1: self.__setup_4d(data_obj) self.__setup_3d_to_4d(data_obj, self.nFrames) else: if len(data_obj.data.shape) == 3: self._setup_3d(data_obj) else: self.__setup_4d(data_obj) data_obj.set_original_shape(data_obj.data.shape) self._set_rotation_angles(data_obj) self._set_projection_shifts(data_obj) try: control = self._get_h5_path( data_obj.backing_file, 'entry1/tomo_entry/control/data') data_obj.meta_data.set("control", control[...]) except Exception: self.log_warning("No Control information available") nAngles = len(data_obj.meta_data.get('rotation_angle')) self.__check_angles(data_obj, nAngles) self.set_data_reduction_params(data_obj) data_obj.data._set_dark_and_flat()
def _setup_after_pre_run(self, data_obj): fsplit = self.exp.meta_data["data_path"].split("/") fsplit[-1] = "stats" stats_path = "/".join(fsplit) if stats_path in data_obj.backing_file: print("STATS FOUND IN INPUT DATA") stats_obj = Statistics() stats_obj.p_num = 1 stats_obj.pattern = "PROJECTION" stats = data_obj.backing_file[stats_path] stats_obj.stats_key = list(stats.attrs.get("stats_key")) stats_dict = stats_obj._array_to_dict(stats) Statistics.global_stats[1] = [stats_dict] Statistics.plugin_names[1] = "raw_data" for key in list(stats_dict.keys()): data_obj.meta_data.set(["stats", key], stats_dict[key]) stats_obj._write_stats_to_file(p_num=1, plugin_name="raw_data") fsplit = self.exp.meta_data["data_path"].split("/") fsplit[-1] = "preview" preview_path = "/".join(fsplit) if preview_path in data_obj.backing_file: print("PREVIEW FOUND IN INPUT DATA") preview_str = data_obj.backing_file[preview_path][()] preview = preview_str.split(",") for i, crop in enumerate(self.parameters["preview"]): if crop != ":": # Take preview dimensions from user parameter where they exist preview[i] = crop self.parameters["preview"] = preview def _get_h5_entry(self, filename, path): if path in filename: return filename[path] else: raise Exception("Path %s not found in %s" % (path, filename)) def __get_nFrames(self, dObj): if self.parameters['3d_to_4d'] is False: return 0 if self.parameters['3d_to_4d'] is True: try: # for backwards compatibility n_frames = eval(self.parameters["angles"], {"builtins": None, "np": np}) return np.array(n_frames).shape[0] except Exception: raise Exception("Please specify the angles, or the number of " "frames per scan (via 3d_to_4d param) in the loader.") if isinstance(self.parameters['3d_to_4d'], int): return self.parameters['3d_to_4d'] else: raise Exception("Unknown value for loader parameter '3d_to_4d', " "please specify an integer value") def _setup_3d(self, data_obj): logging.debug("Setting up 3d tomography data.") rot = 0 detY = 1 detX = 2 data_obj.set_axis_labels('rotation_angle.degrees', 'detector_y.pixel', 'detector_x.pixel') data_obj.add_pattern('PROJECTION', core_dims=(detX, detY), slice_dims=(rot,)) data_obj.add_pattern('SINOGRAM', core_dims=(detX, rot), slice_dims=(detY,)) data_obj.add_pattern('TANGENTOGRAM', core_dims=(rot, detY), slice_dims=(detX,)) def __setup_3d_to_4d(self, data_obj, n_frames): logging.debug("setting up 4d tomography data from 3d input.") from savu.data.data_structures.data_types.map_3dto4d_h5 \ import Map3dto4dh5 all_angles = data_obj.data.shape[0] if all_angles % n_frames != 0: self.log_warning("There are not a complete set of scans in this file, " "loading complete ones only") data_obj.data = Map3dto4dh5(data_obj, n_frames) data_obj.set_original_shape(data_obj.data.get_shape()) def __setup_4d(self, data_obj): logging.debug("setting up 4d tomography data.") rot = 0 detY = 1 detX = 2 scan = 3 data_obj.set_axis_labels('rotation_angle.degrees', 'detector_y.pixel', 'detector_x.pixel', 'scan.number') data_obj.add_pattern('PROJECTION', core_dims=(detX, detY), slice_dims=(rot, scan)) data_obj.add_pattern('SINOGRAM', core_dims=(detX, rot), slice_dims=(detY, scan)) data_obj.add_pattern('TANGENTOGRAM', core_dims=(rot, detY), slice_dims=(detX, scan)) data_obj.add_pattern('SINOMOVIE', core_dims=(detX, rot, scan), slice_dims=(detY,)) def _set_dark_and_flat(self, data_obj): flat = self.parameters['flat'][0] dark = self.parameters['dark'][0] if not flat or not dark: self.__find_dark_and_flat(data_obj, flat=flat, dark=dark) else: self.__set_separate_dark_and_flat(data_obj) def __find_dark_and_flat(self, data_obj, flat=None, dark=None): ignore = self.parameters['ignore_flats'] if \ self.parameters['ignore_flats'] else None if self.parameters['image_key_path'] is None: image_key_path = 'dummypath/' else: image_key_path = self.parameters['image_key_path'] try: image_key = data_obj.backing_file[image_key_path][...] data_obj.data = \ ImageKey(data_obj, image_key, 0, ignore=ignore) except KeyError as Argument: self.log_warning("An image key was not found due to following error:"+str(Argument)) try: data_obj.data = NoImageKey(data_obj, None, 0) entry = 'entry1/tomo_entry/instrument/detector/' data_obj.data._set_flat_path(entry + 'flatfield') data_obj.data._set_dark_path(entry + 'darkfield') except KeyError: self.log_warning("Dark/flat data was not found in input file.") data_obj.data._set_dark_and_flat() if dark: data_obj.data.update_dark(dark) if flat: data_obj.data.update_flat(flat) def __set_separate_dark_and_flat(self, data_obj): try: image_key = data_obj.backing_file[ 'entry1/tomo_entry/instrument/detector/image_key'][...] except: image_key = None data_obj.data = NoImageKey(data_obj, image_key, 0) self.__set_data(data_obj, 'flat', data_obj.data._set_flat_path) self.__set_data(data_obj, 'dark', data_obj.data._set_dark_path) def __set_data(self, data_obj, name, func): path, entry, scale = self.parameters[name] if path.split('/')[0] == 'Savu': import os savu_base_path = os.path.join(os.path.dirname( os.path.realpath(__file__)), '..', '..', '..', '..') path = os.path.join(savu_base_path, path.split('Savu')[1][1:]) ffile = h5py.File(path, 'r') try: image_key = \ ffile['entry1/tomo_entry/instrument/detector/image_key'][...] func(ffile[entry], imagekey=image_key) except: func(ffile[entry]) data_obj.data._set_scale(name, scale) def _set_rotation_angles(self, data_obj): angles = self.parameters['angles'] warn_ms = "No angles found so evenly distributing them between 0 and" \ " 180 degrees" if angles is None: angle_key = 'entry1/tomo_entry/data/rotation_angle' nxs_angles = self.__get_angles_from_nxs_file(data_obj, angle_key) if nxs_angles is None: self.log_warning(warn_ms) angles = np.linspace(0, 180, data_obj.get_shape()[0]) else: angles = nxs_angles else: try: angles = eval(angles) except Exception as e: logging.warning(e) try: angles = np.loadtxt(angles) except Exception as e: logging.debug(e) self.log_warning(warn_ms) angles = np.linspace(0, 180, data_obj.get_shape()[0]) data_obj.meta_data.set("rotation_angle", angles) return len(angles) def _set_projection_shifts(self, data_obj): proj_shifts = np.zeros((data_obj.get_shape()[0], 2)) # initialise a 2d array of projection shifts self.exp.meta_data.set('projection_shifts', proj_shifts) data_obj.meta_data.set("projection_shifts", proj_shifts) return len(proj_shifts) def __get_angles_from_nxs_file(self, data_obj, path): if path in data_obj.backing_file: idx = data_obj.data.get_image_key() == 0 if \ isinstance(data_obj.data, ImageKey) else slice(None) return data_obj.backing_file[path][idx] else: self.log_warning("No rotation angle entry found in input file.") return None def _get_data_file(self): data = self.exp.meta_data.get("data_file") return h5py.File(data, 'r') def __check_angles(self, data_obj, n_angles): rot_dim = data_obj.get_data_dimension_by_axis_label("rotation_angle") data_angles = data_obj.data.get_shape()[rot_dim] if data_angles != n_angles: if self.nFrames > 1: rot_angles = data_obj.meta_data.get("rotation_angle") try: full_rotations = n_angles // data_angles cleaned_size = full_rotations * data_angles if cleaned_size != n_angles: rot_angles = rot_angles[0:cleaned_size] self.log_warning( "the angle list has more values than expected in it") rot_angles = np.reshape( rot_angles, [full_rotations, data_angles]) data_obj.meta_data.set("rotation_angle", np.transpose(rot_angles)) return except: pass raise Exception("The number of angles %s does not match the data " "dimension length %s" % (n_angles, data_angles))
[docs] def executive_summary(self): """ Provide a summary to the user for the result of the plugin. e.g. - Warning, the sample may have shifted during data collection - Filter operated normally :returns: A list of string summaries """ if len(self.warnings) == 0: return ["Nothing to Report"] else: return self.warnings