Source code for plugins.filters.poly_background_estimator

# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: poly_background_estimator
   :platform: Unix
   :synopsis: A plugin to find peaks in spectra

.. moduleauthor:: Aaron Parsons <scientificsoftware@diamond.ac.uk>

"""

from savu.plugins.driver.cpu_plugin import CpuPlugin
from savu.plugins.utils import register_plugin
from savu.plugins.filters.base_filter import BaseFilter
import numpy as np
np.seterr(divide='ignore', invalid='ignore')

[docs]def division_zero(x,y): try: return x/y except ZeroDivisionError: return 0
[docs]@register_plugin class PolyBackgroundEstimator(BaseFilter, CpuPlugin): def __init__(self): super(PolyBackgroundEstimator, self).__init__("PolyBackgroundEstimator")
[docs] def process_frames(self, data): data = data[0] x = self.axis n = self.parameters['n'] if self.parameters['weights'] == '1/data': weights = division_zero(1.0, data) elif self.parameters['weights'] == '1/data^2': weights = division_zero(1.0,data**2) pvalue = self.parameters['pvalue'] MaxIterations = self.parameters['MaxIterations'] zu, _c, _poly, _weight, _index = \ self.poly_background_estimator(x, data, n, weights, MaxIterations, pvalue, fixed=True) #print "The shape of zu is:"+str(zu.shape) return zu
[docs] def setup(self): in_dataset, out_datasets = self.get_datasets() out = out_datasets[0] out.create_dataset(in_dataset[0]) in_pData, out_pData = self.get_plugin_datasets() in_pData[0].plugin_data_setup('SPECTRUM', self.get_max_frames()) out_pData[0].plugin_data_setup('SPECTRUM', self.get_max_frames()) in_meta = self.get_in_meta_data()[0] # get the axis alabel = \ list(in_dataset[0].data_info.get('axis_labels')[-1].keys())[0] self.axis = in_meta.get(alabel)
[docs] def get_max_frames(self): return 'single'
def __generate_parameters(self, n, weight, xdata, ydata): """ generates polynomials based on S. Steenstrup J. Appl. Cryst. (1981). 14, 226--229 "A Simple Procedure for Fitting a Background to a Certain Class of \ Measured Spectra". The polynomial parameters are based on weights \ supplied as part of the fitting fit. """ Npoints = xdata.size poly = np.zeros((n, Npoints), dtype=np.float64, order='F') gamma = np.zeros(n, dtype=np.float64, order='F') alpha = np.zeros(n, dtype=np.float64, order='F') beta = np.zeros(n, dtype=np.float64, order='F') a = np.zeros(n, dtype=np.float64, order='F') alpha[0] = (weight * xdata).sum() / weight.sum() beta[0] = 0.0 poly[0] = 1.0 poly[1] = xdata - alpha[0] for j in range(n): if j > 1: poly[j] = (xdata - alpha[j - 1]) * \ poly[j - 1] - beta[j - 1] * poly[j - 2] p = poly[j] g = (weight * p * p).sum() gamma[j] = g a[j] = (weight * ydata * p / g).sum() if j > 0: alpha[j] = (weight * xdata * p * p).sum() / g beta[j] = \ (weight * xdata * p * poly[j - 1]).sum() / gamma[j - 1] return alpha, gamma, beta, a, poly
[docs] def poly_background_estimator(self, xdata, ydata, n=2, weights=None, maxIterations=12, pvalue=0.9, fixed=False): """ Background estimator based on orthogonal polynomials Input: xdata,ydata (numpy arrays of same length) pvalue : ratio of variance in poly to poly value at which to stop. 0.9 default Output: background,polynomial weights, polynomials S. Steenstrup J. Appl. Cryst. (1981). 14, 226--229 "A Simple Procedure for Fitting a Background to a Certain Class of \ Measured Spectra" """ m = 0 ud = False Npoints = xdata.size ydata = np.clip(ydata, 0.0001, ydata.max()) weight = 1.0 / ydata # zu = np.zeros(Npoints, dtype=np.float64) myiter = 0 vartest = True c_old = [] if(fixed): maxIterations = n+1 n = 2 myiter = 2 AIC_OLD = 1.0e34 while myiter < maxIterations: myiter += 1 _alpha, gamma, _beta, c, poly = \ self.__generate_parameters(n, weight, xdata, ydata) zu = (c[:, np.newaxis] * poly).sum(axis=0) y_diff = ydata - zu rss = (y_diff*y_diff).sum() Eu = (weight * y_diff * y_diff).sum() f = Npoints - n - m if (Eu < (f + m + np.sqrt(2.0 * f))): break sigma = np.sqrt(Eu / (f * gamma)) m = 0.0 index = ydata > (zu + 2.0 * np.sqrt(np.abs(zu))) weight = 1.0 / np.abs(zu) tw = ydata[index] - zu[index] weight[index] = 1.0 / (tw * tw) m = Npoints - index.sum() if(fixed): n += 1 else: if myiter > 1: clen = min(len(c_old), len(c)) vartest = True for i in range(clen): if((c[i]-sigma[i]) < c_old[i] and c_old[i] < (c[i] + sigma[i])): vartest = True else: vartest = False break if vartest: if abs(sigma[n-1] / c[n-1]) < pvalue: if ud: break n = n + 1 else: if n > 3: n = n - 1 ud = True c_old = c #print "finished", index.sum(), len(zu), Npoints, n, m return zu, c, poly, weight, index