Source code for plugins.reconstructions.astra_recons.astra_recon_gpu

# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: astra_recon_gpu
   :platform: Unix
   :synopsis: Wrapper around the Astra toolbox for gpu reconstruction using vector geometry
.. moduleauthor:: Mark Basham <scientificsoftware@diamond.ac.uk>

"""
import astra
import numpy as np

from savu.plugins.reconstructions.astra_recons.base_astra_vector_recon \
    import BaseAstraVectorRecon
from savu.plugins.reconstructions.base_recon import BaseRecon
from savu.plugins.driver.gpu_plugin import GpuPlugin
from savu.plugins.utils import register_plugin
from savu.core.iterate_plugin_group_utils import \
    check_if_end_plugin_in_iterate_group


[docs]@register_plugin class AstraReconGpu(BaseAstraVectorRecon, GpuPlugin): def __init__(self): super(AstraReconGpu, self).__init__("AstraReconGpu") self.GPU_index = None self.res = False self.start = 0
[docs] def set_options(self, cfg): if 'option' not in cfg.keys(): cfg['option'] = {} cfg['option']['GPUindex'] = self.parameters['GPU_index'] return cfg
# total number of output datasets
[docs] def nOutput_datasets(self): alg = self.parameters['algorithm'] if self.parameters['res_norm'] is True and 'FBP' not in alg \ and check_if_end_plugin_in_iterate_group(self.exp): err_str = "The res_norm output dataset has not yet been " \ "implemented for when AstraReconGpu is at the end of an " \ "iterative loop" raise ValueError(err_str) elif self.parameters['res_norm'] is True and 'FBP' not in alg: self.res = True self.parameters['out_datasets'].append('res_norm') return 2 elif check_if_end_plugin_in_iterate_group(self.exp): return 2 else: return 1
[docs] def astra_setup(self): options_list = ["FBP_CUDA", "SIRT_CUDA", "SART_CUDA", "CGLS_CUDA", "FP_CUDA", "BP_CUDA", "BP3D_CUDA", "FBP3D_CUDA", "SIRT3D_CUDA", "CGLS3D_CUDA"] if not options_list.count(self.parameters['algorithm']): raise Exception("Unknown Astra GPU algorithm.")
[docs] def astra_2D_vector_recon(self, data): sino = data[0] cor, angles, vol_shape, init = self.get_frame_params() skip = self.parameters['skip_projections'] skip_idx = self.get_skipping_indices(skip) if skip_idx is not None: max_idx = sino.shape[0] skip_idx = np.unique(np.clip(skip_idx, 0, max_idx - 1)) use_idx = np.setdiff1d(np.arange(max_idx), skip_idx) sino = sino[use_idx] angles = angles[use_idx] if self.res: res = np.zeros(self.len_res) # create volume geom vol_geom = astra.create_vol_geom(vol_shape) # create projection geom det_width = sino.shape[self.dim_detX] half_det_width = 0.5 * det_width cor_astra_scalar = half_det_width - cor # set parallel beam vector geometry vectors = self.vec_geom_init2D(np.deg2rad(angles), 1.0, cor_astra_scalar - 0.5) try: # vector geometry (astra > 1.9v) proj_geom = astra.create_proj_geom('parallel_vec', det_width, vectors) except: print('Warning: using scalar geometry since the Astra version <1.9 does not support the vector one for 2D') proj_geom = astra.create_proj_geom('parallel', 1.0, det_width, angles) sino = np.transpose(sino, (self.dim_rot, self.dim_detX)) # Create a data object to hold the sinogram data sino_id = astra.data2d.create('-sino', proj_geom, sino) # create reconstruction id if init is not None: rec_id = astra.data2d.create('-vol', vol_geom, init) else: rec_id = astra.data2d.create('-vol', vol_geom) # if self.mask_id: # self.mask_id = astra.data2d.create('-vol', vol_geom, self.mask) # setup configuration options cfg = self.set_config(rec_id, sino_id, proj_geom, vol_geom) # create algorithm id alg_id = astra.algorithm.create(cfg) # run algorithm if self.res: for j in range(self.iters): # Run a single iteration astra.algorithm.run(alg_id, 1) res[j] = astra.algorithm.get_res_norm(alg_id) else: astra.algorithm.run(alg_id, self.iters) # get reconstruction matrix if self.manual_mask is not False: recon = self.manual_mask * astra.data2d.get(rec_id) else: recon = astra.data2d.get(rec_id) # delete geometry self.delete(alg_id, sino_id, rec_id, False) return [recon, res] if self.res else recon
[docs] def astra_3D_vector_recon(self, data): proj_data3d = data[0] # get 3d block of projection data cor, angles, vol_shape, init = self.get_frame_params() skip = self.parameters['skip_projections'] skip_idx = self.get_skipping_indices(skip) if skip_idx is not None: max_idx = proj_data3d.shape[0] skip_idx = np.unique(np.clip(skip_idx, 0, max_idx - 1)) use_idx = np.setdiff1d(np.arange(max_idx), skip_idx) proj_data3d = proj_data3d[use_idx] angles = angles[use_idx] projection_shifts2d = self.get_frame_shifts() half_det_width = 0.5 * proj_data3d.shape[self.sino_dim_detX] cor_astra_scalar = half_det_width - np.mean(cor) # works with scalar CoR only atm recon = np.zeros(vol_shape) recon = np.expand_dims(recon, axis=self.slice_dir) if self.res: res = np.zeros((self.vol_shape[self.slice_dir], self.iters)) # create volume geometry vol_geom = \ astra.create_vol_geom(vol_shape[0], vol_shape[2], vol_shape[1]) # define astra vector geometry for 3d case vectors3d = self.vec_geom_init3D(np.deg2rad(angles + 90.0), 1.0, 1.0, cor_astra_scalar - 0.5, projection_shifts2d) proj_geom = astra.create_proj_geom('parallel3d_vec', proj_data3d.shape[self.sino_dim_detY], proj_data3d.shape[self.sino_dim_detX], vectors3d) proj_data3d = np.swapaxes(proj_data3d, 0, 1) if self.parameters['algorithm'] == "FBP3D_CUDA": # pre-filter projection data proj_data3d = self.filtersinc3d(proj_data3d) # create projection data id proj_id = astra.data3d.create("-sino", proj_geom, proj_data3d) # create reconstruction id if init is not None: rec_id = astra.data3d.create('-vol', vol_geom, init) else: rec_id = astra.data3d.create('-vol', vol_geom) # setup configuration options cfg = self.set_config(rec_id, proj_id, proj_geom, vol_geom) if self.parameters['algorithm'] == "FBP3D_CUDA": cfg['type'] = 'BP3D_CUDA' # create algorithm id alg_id = astra.algorithm.create(cfg) # run algorithm if self.res: for j in range(self.iters): # Run a single iteration astra.algorithm.run(alg_id, 1) res[j] = astra.algorithm.get_res_norm(alg_id) else: astra.algorithm.run(alg_id, self.iters) # get reconstruction matrix # if self.manual_mask: # recon = self.mask*astra.data3d.get(rec_id) # else: # recon = astra.data3d.get(rec_id) recon = np.transpose(astra.data3d.get(rec_id), (2, 0, 1)) # delete geometry self.delete(alg_id, proj_id, rec_id, False) self.start += 1 if self.res: return [recon, res] else: return recon
[docs] def rotation_matrix2D(self, theta): # define 2D rotation matrix return np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
[docs] def rotation_matrix3D(self, theta): # define 3D rotation matrix return np.array([[np.cos(theta), -np.sin(theta), 0.0], [np.sin(theta), np.cos(theta), 0.0], [0.0, 0.0, 1.0]])
[docs] def vec_geom_init2D(self, angles_rad, DetectorSpacingX, CenterRotOffset): # define 2D vector geometry s0 = [0.0, -1.0] # source d0 = [CenterRotOffset, 0.0] # detector u0 = [DetectorSpacingX, 0.0] # detector coordinates vectors = np.zeros([angles_rad.size, 6]) for i in range(0, angles_rad.size): theta = angles_rad[i] vec_temp = np.dot(self.rotation_matrix2D(theta), s0) vectors[i, 0:2] = vec_temp[:] # ray position vec_temp = np.dot(self.rotation_matrix2D(theta), d0) vectors[i, 2:4] = vec_temp[:] # center of detector position vec_temp = np.dot(self.rotation_matrix2D(theta), u0) vectors[i, 4:6] = vec_temp[:] # detector pixel (0,0) to (0,1). return vectors
[docs] def vec_geom_init3D(self, angles_rad, DetectorSpacingX, DetectorSpacingY, CenterRotOffset, projection_shifts2d): # define 3D vector geometry s0 = [0.0, -1.0, 0.0] # source u0 = [DetectorSpacingX, 0.0, 0.0] # detector coordinates v0 = [0.0, 0.0, DetectorSpacingY] # detector coordinates vectors = np.zeros([angles_rad.size, 12]) for i in range(0, angles_rad.size): d0 = [CenterRotOffset - projection_shifts2d[i, 0], 0.0, -projection_shifts2d[i, 1] - 0.5] # detector theta = angles_rad[i] vec_temp = np.dot(self.rotation_matrix3D(theta), s0) vectors[i, 0:3] = vec_temp[:] # ray position vec_temp = np.dot(self.rotation_matrix3D(theta), d0) vectors[i, 3:6] = vec_temp[:] # center of detector position vec_temp = np.dot(self.rotation_matrix3D(theta), u0) vectors[i, 6:9] = vec_temp[:] # detector pixel (0,0) to (0,1). vec_temp = np.dot(self.rotation_matrix3D(theta), v0) vectors[i, 9:12] = vec_temp[:] # Vector from detector pixel (0,0) to (1,0) return vectors
[docs] def filtersinc3d(self, projection3d): import scipy.fftpack # applies a sinc filter to 3D projection data # Data format [DetectorVert, Projections, DetectorHoriz] # adopted from Matlabs code by Waqas Akram # "a": This parameter varies the filter magnitude response. # When "a" is very small (a<<1), the response approximates |w| # As "a" is increased, the filter response starts to # roll off at high frequencies. a = 1.1 [DetectorsLengthV, projectionsNum, DetectorsLengthH] = np.shape(projection3d) w = np.linspace(-np.pi, np.pi - (2 * np.pi) / DetectorsLengthH, DetectorsLengthH, dtype='float32') rn1 = np.abs(2.0 / a * np.sin(a * w / 2.0)) rn2 = np.sin(a * w / 2.0) rd = (a * w) / 2.0 rd_c = np.zeros([1, DetectorsLengthH]) rd_c[0, :] = rd r = rn1 * (np.dot(rn2, np.linalg.pinv(rd_c))) ** 2 multiplier = (1.0 / projectionsNum) f = scipy.fftpack.fftshift(r) filtered = np.zeros(np.shape(projection3d)) for j in range(0, DetectorsLengthV): for i in range(0, projectionsNum): IMG = scipy.fftpack.fft(projection3d[j, i, :]) filtered[j, i, :] = multiplier * np.real(scipy.fftpack.ifft(IMG * f)) return np.float32(filtered)