# Copyright 2018 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. module:: stage_motion
   :platform: Unix
   :synopsis: Calculate stage motion from motor positions.

.. moduleauthor:: Nicola Wadeson <scientificsoftware@diamond.ac.uk>
"""
import copy
import numpy as np
import itertools
import time
import logging
from typing import Union
from savu.plugins.plugin import Plugin
from savu.plugins.utils import register_plugin
from savu.plugins.driver.cpu_plugin import CpuPlugin
from savu.core.utils import ensure_string
from pmacparser.pmac_parser import PMACParser
@register_plugin
class StageMotion(Plugin, CpuPlugin):
    """Run a PMAC kinematic program on motor positions to obtain Q values.

    The kinematic program and its static variables are read from the input
    dataset's meta data.  For every block of frames the motor positions are
    handed to a ``PMACParser`` as ``P<n>`` variables, and the resulting
    ``Q1`` .. ``Q9`` variables are written to the output.  When min/max
    processing is enabled, every combination of the three input arrays is
    evaluated and the per-frame minimum and maximum of each Q value are
    returned alongside the mean.
    """

    # Number of Q variables (Q1..Q9) read back from the parser; also the
    # length of the 'Qvalues' dimension inserted into the output shape.
    NUM_OUTPUT_Q_VARS = 9
    # Position of the mean array within the list passed to process_frames.
    MEAN_INDEX = 0
    # Dataset count reported by nOutput_datasets.
    NUM_DATASETS = 1

    def __init__(self):
        super(StageMotion, self).__init__("StageMotion")
        self.parser = None
        # Variables handed to the parser: static values from the meta data
        # plus the per-frame 'P<n>' motor values set in process_frames.
        self.variables = {}
        # Axis label values of the 'motor' axis, set in pre_process.
        self.pvals = None
        self.num_datasets = self.NUM_DATASETS
        self.use_min_max = False

    def _enable_min_max_if_requested(self):
        """Switch ``self.use_min_max`` on when min/max output is requested.

        The ``use_min_max`` plugin parameter takes priority.  Only when it
        is ``None`` is the ``use_minmax`` entry of the experiment meta data
        consulted instead.  The flag is only ever turned on here, never
        turned back off.
        """
        configured = self.parameters['use_min_max']
        if configured is None:
            requested = self.exp.meta_data.get('use_minmax')
        else:
            requested = configured
        if requested:
            self.use_min_max = True

    def _check_parameters(self):
        """Log both min/max settings and resolve ``self.use_min_max``."""
        logging.debug("Config use min max: %s",
                      self.parameters['use_min_max'])
        logging.debug("Experiment meta data use min max: %s",
                      self.exp.meta_data.get('use_minmax'))
        self._enable_min_max_if_requested()

    def pre_process(self):
        """Build the parser and cache everything process_frames needs.

        Sets ``self.out_pshape``, ``self.parser``, ``self.motor_dim`` and
        ``self.pvals``, and loads the static variables from the input meta
        data into ``self.variables``.
        """
        self._enable_min_max_if_requested()

        # Shape of a single block of output frames.
        out_pdata = self.get_plugin_out_datasets()[0]
        self.out_pshape = out_pdata.get_shape()

        # The kinematic program and its static variables travel with the
        # input data.
        meta = self.get_in_meta_data()[0].get_dictionary()
        program_from_data = meta['program']
        variables_from_data = meta['variables']

        code_lines = [ensure_string(line) for line in program_from_data]

        # Read each field once and walk the two in lockstep.
        # NOTE(review): assumes 'variables' is a record-like table whose
        # 'Name' and 'Value' fields have one entry per variable - confirm.
        names = variables_from_data['Name']
        values = variables_from_data['Value']
        for raw_name, value in zip(names, values):
            self.variables[ensure_string(raw_name)] = value

        self.parser = PMACParser(code_lines)

        # Dimension of the data passed to process_frames that carries the
        # axis label 'motor'.
        p_data = self.get_plugin_in_datasets()[0]
        self.motor_dim = p_data.get_data_dimension_by_axis_label('motor')

        # Get the axis label values associated with the axis label 'motor';
        # they become the numeric suffix of the 'P<n>' parser variables.
        data = self.get_in_datasets()[0]
        self.pvals = data.meta_data.get('motor')

    def process_frames(self, data):
        """Evaluate the kinematic program for one block of frames.

        :param data: list of input arrays.  Element ``MEAN_INDEX`` holds the
            mean motor values; with min/max enabled two further arrays are
            expected (MIN and MAX according to the original comments).  The
            last dimension of each array indexes the P values.
        :returns: ``[mean]`` or, with min/max enabled, ``[mean, min, max]``;
            each an array of shape ``self.out_pshape``.
        """
        if self.use_min_max:
            return self._process_min_mean_max(data)
        return self._process_mean_only(data)

    def _process_mean_only(self, data):
        """Run the parser on the mean input only and return ``[mean]``."""
        mean_output = np.zeros(self.out_pshape)
        data_mean = data[self.MEAN_INDEX]

        for column, pval in enumerate(self.pvals):
            self.variables['P' + str(pval)] = data_mean[:, column]

        output_vars = self.parser.parse(self.variables)

        # Q variables the program did not produce are left at zero.
        for i in range(1, self.NUM_OUTPUT_Q_VARS + 1):
            key = 'Q' + str(i)
            if key in output_vars:
                mean_output[i - 1] = output_vars[key]
        return [mean_output]

    def _process_min_mean_max(self, data):
        """Run the parser on every input combination; return three arrays.

        Every P value can independently come from any of the three input
        arrays, giving ``3 ** num_p`` combinations.  They are all packed
        into one array so the parser is called only once.
        """
        mean_output = np.zeros(self.out_pshape)
        min_output = np.zeros(self.out_pshape)
        max_output = np.zeros(self.out_pshape)

        # The unpacking also asserts that the input is two-dimensional.
        _, num_p = np.array(data[0]).shape

        # One row per combination; entry j of a row says which of the three
        # input arrays supplies P value j.  The first row is all zeros, so
        # it takes every P value from data[0].
        combos = np.array(list(itertools.product([0, 1, 2], repeat=num_p)))

        # Rearrange the input so the last two dimensions are the input
        # array index followed by the P value index.
        stacked = np.dstack(np.transpose(data))
        # Select, for every frame and combination, the chosen input per P.
        expanded = stacked[:, combos, np.arange(num_p)]

        for column, pval in enumerate(self.pvals):
            self.variables['P' + str(pval)] = expanded[:, :, column]

        parse_result = self.parser.parse(self.variables)

        # Q variables the program did not produce are left at zero.
        for i in range(1, self.NUM_OUTPUT_Q_VARS + 1):
            key = 'Q' + str(i)
            if key not in parse_result:
                continue
            q_result = parse_result[key]
            if getattr(q_result, 'size', 1) > 1:
                # Column MEAN_INDEX (0) is the all-zeros combination, i.e.
                # the result computed from data[0] alone.
                # NOTE(review): relies on the parser preserving the
                # (frame, combination) layout of its inputs - confirm.
                mean_output[i - 1] = q_result[:, self.MEAN_INDEX]
                max_output[i - 1] = np.amax(q_result, axis=1)
                min_output[i - 1] = np.amin(q_result, axis=1)
            else:
                # Scalar result: identical for every combination.
                mean_output[i - 1] = q_result
                max_output[i - 1] = q_result
                min_output[i - 1] = q_result
        return [mean_output, min_output, max_output]

    def post_process(self):
        """Nothing to do once all frames have been processed."""
        pass

    def setup(self):
        """Create the output datasets and register the processing pattern.

        The output shape is the input shape with the 'motor' dimension
        removed and a 'Qvalues' dimension of length ``NUM_OUTPUT_Q_VARS``
        inserted at position 0.  One output dataset is created, or three
        when min/max processing is enabled.
        """
        in_datasets, out_datasets = self.get_datasets()
        self.motor_data_dim = \
            in_datasets[0].get_data_dimension_by_axis_label('motor')

        # Description of the additional output dimension.
        dim_len = self.NUM_OUTPUT_Q_VARS
        dim_pos = 0
        dim_axis_name = 'Qvalues'
        dim_axis_unit = 'units'
        new_axis = '.'.join(['~' + str(dim_pos), dim_axis_name, dim_axis_unit])
        # NOTE(review): presumably the framework's notation for "drop the
        # motor axis of the input and insert new_axis" - confirm.
        axis_labels = {in_datasets[0]: [str(self.motor_data_dim), new_axis]}

        self.shape = self.__get_output_shape(in_datasets[0], dim_pos, dim_len)
        pattern = self.__update_pattern_information(in_datasets[0], dim_pos)

        self._enable_min_max_if_requested()
        number_of_datasets_in_use = 3 if self.use_min_max else 1
        logging.debug("Number of datasets: %s", number_of_datasets_in_use)

        # Populate the output datasets with the required information.
        for i in range(number_of_datasets_in_use):
            out_datasets[i].create_dataset(axis_labels=axis_labels,
                                           shape=self.shape)
            out_datasets[i].add_pattern('MOTOR_POSITION', **pattern)

        # ================== populate plugin datasets ======================
        in_pData, out_pData = self.get_plugin_datasets()
        for i in range(number_of_datasets_in_use):
            in_pData[i].plugin_data_setup('MOTOR_POSITION', 'multiple')
            out_pData[i].plugin_data_setup('MOTOR_POSITION', 'multiple')
        # ==================================================================

    def __update_pattern_information(self, dObj, dim_pos):
        """Return the MOTOR_POSITION pattern adapted to the output layout.

        Works on a deep copy of ``dObj``'s pattern: the new dimension at
        ``dim_pos`` becomes the only core dimension, and every slice
        dimension at or after ``dim_pos`` is shifted up by one.
        """
        pattern = copy.deepcopy(dObj.get_data_patterns()['MOTOR_POSITION'])
        pattern['core_dims'] = (dim_pos,)
        pattern['slice_dims'] = tuple(
            dim if dim < dim_pos else dim + 1
            for dim in pattern['slice_dims'])
        return pattern

    def __get_output_shape(self, dObj, dim_pos, dim_len):
        """Return ``dObj``'s shape with the motor dimension replaced.

        The dimension ``self.motor_data_dim`` is removed and a dimension of
        length ``dim_len`` is inserted at ``dim_pos``.
        """
        shape = list(dObj.get_shape())
        del shape[self.motor_data_dim]
        shape.insert(dim_pos, dim_len)
        return tuple(shape)

    def nOutput_datasets(self):
        """Return the number of output datasets reported to the framework.

        With min/max enabled, the names in the ``extra_out_datasets``
        parameter are appended to ``out_datasets``.  Only names not already
        present are added, so calling this method more than once does not
        duplicate them.
        """
        if self.use_min_max:
            out_names = self.parameters['out_datasets']
            missing = [name for name in self.parameters['extra_out_datasets']
                       if name not in out_names]
            out_names.extend(missing)
        # NOTE(review): num_datasets is never changed from NUM_DATASETS in
        # this class, so 1 is returned even when three output datasets are
        # configured - confirm this is what the framework expects.
        return self.num_datasets