# Source code for savu.data.chunking

# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: chunking
   :platform: Unix
   :synopsis: A class to optimise hdf5 chunking
.. moduleauthor:: Nicola Wadeson <scientificsoftware@diamond.ac.uk>
"""

import copy
import logging
from math import gcd
import numpy as np


class Chunking(object):
    """ A class to optimise hdf5 chunking for tomography data.

    Chooses a chunk shape for a dataset based on the core/slice dimensions
    of the current plugin pattern and the next plugin pattern, keeping the
    chunk byte-size as close as possible to (but below) ``chunk_max``.
    """

    def __init__(self, exp, patternDict):
        """
        :param exp: experiment object; ``exp.meta_data`` is read for the
            'processes' (list of process names) and 'lustre' entries.
        :param patternDict: dict with 'current' and 'next' entries, each
            mapping a pattern name to a pattern description dict containing
            at least 'core_dims' and 'slice_dims'.
        """
        self.pattern_dict = patternDict
        self.current = \
            patternDict['current'][list(patternDict['current'].keys())[0]]
        if patternDict['next']:
            self.next = \
                patternDict['next'][list(patternDict['next'].keys())[0]]
        else:
            # no next pattern: chunk for the current pattern only
            self.next = self.current
        try:
            self.next_pattern = list(patternDict['next'].keys())[0]
        except AttributeError:
            # patternDict['next'] is None: fall back to the current pattern
            self.next_pattern = list(patternDict['current'].keys())[0]
        self.exp = exp
        self.core = None    # combined core dims of current and next pattern
        self.slice1 = None  # fastest-changing slice dim of each pattern
        self.other = None   # remaining (slower) slice dims
        self.default_chunk_max = 1000000  # target max chunk size in bytes

    def __lustre_workaround(self, chunks, shape):
        """ Ensure the file holds at least as many chunks as there are
        processes (halving one chunk dimension if not), to avoid
        file-creation contention on lustre filesystems.

        :returns: possibly-adjusted chunk shape as a tuple.
        :raises Exception: if no chunk dimension can be halved.
        """
        # NOTE: np.float was removed in numpy 1.20; builtin float is the
        # documented equivalent
        nChunks_to_create_file = \
            np.ceil(np.prod(np.array(shape)/np.array(chunks, dtype=float)))
        # 'processes' holds a list of process names (see
        # __max_frames_per_process), so compare against its length
        nProcesses = len(self.exp.meta_data.get('processes'))
        dims = list(range(len(shape)))
        chunks = list(chunks)
        if nChunks_to_create_file < nProcesses:
            # prefer a dimension that is not already fully chunked
            idx = [i for i in dims if shape[i] - chunks[i] > 0
                   and chunks[i] > 1]
            idx = idx if idx else [i for i in dims if chunks[i] > 1]
            if not idx:
                raise Exception('There is an error in the lustre workaround')
            chunks[idx[0]] = int(np.ceil(chunks[idx[0]] / 2.0))
        # always return a tuple so the caller can use the result directly
        return tuple(chunks)

    def _calculate_chunking(self, shape, ttype, chunk_max=None):
        """ Calculate appropriate chunk sizes for this dataset.

        :param shape: full dataset shape.
        :param ttype: data type (anything ``np.dtype`` accepts).
        :param chunk_max: optional chunk byte-size limit (defaults to
            ``self.default_chunk_max``).
        :returns: chunk shape tuple, or True when chunking should be left
            to the writer (rank < 3, or a zero-sized dimension).
        """
        self.chunk_max = chunk_max if chunk_max else self.default_chunk_max
        logging.debug("shape = %s", shape)
        if len(shape) < 3:
            return True
        chunks = [1]*len(shape)
        adjust = self.__set_adjust_params(shape)
        self.__set_chunks(chunks, shape, adjust)
        if 0 in chunks:
            return True
        chunks = self.__adjust_chunk_size(chunks, ttype, shape, adjust)
        # temporary work around for lustre
        if self.exp.meta_data.get('lustre') is True:
            chunks = self.__lustre_workaround(chunks, shape)
        logging.debug("chunk size %s", chunks)
        return tuple(chunks)

    def __set_adjust_params(self, shape):
        """ Set adjustable dimension parameters (the dimension number,
        increment and max value).

        The 'inc' entries are string arithmetic suffixes (e.g. '+1', '/2')
        applied via eval when growing/shrinking a chunk dimension.
        """
        adjust_dim = self.__get_adjustable_dims()
        array_len = len(adjust_dim)
        adjust_max = [1]*array_len
        for i in range(array_len):
            adjust_max[i] = shape[adjust_dim[i]]
        inc_dict = {'up': [1]*array_len, 'down': [1]*array_len}
        bounds = {'min': [1]*array_len, 'max': adjust_max}
        return {'dim': adjust_dim, 'inc': inc_dict, 'bounds': bounds}

    def __get_adjustable_dims(self):
        """ Get all core dimensions and the fastest changing slice
        dimension (all potentially adjustable). """
        self.core = \
            list(self.current['core_dims']) + list(self.next['core_dims'])
        c_sl = list(self.current['slice_dims'])
        n_sl = list(self.next['slice_dims'])
        self.slice1 = [c_sl[0]] + [n_sl[0]]
        self.other = c_sl[1:] + n_sl[1:]
        return list(set(self.core + self.slice1))

    def __set_chunks(self, chunks, shape, adjust):
        """ Calculate initial chunk values.

        Each adjustable dimension is classified by how it appears in the
        current/next patterns (core, fastest slice, other slice) and the
        matching initialiser is dispatched.
        """
        chunk_functions = {0: self.__slice_slice, 1: self.__slice_other,
                           2: self.__core_core, 3: self.__core_other,
                           4: self.__core_slice}
        adj_idx = 0
        for dim in adjust['dim']:
            # core=4, slice1=2, other=1; minus 2 maps each combination to
            # a unique key in chunk_functions
            count = (dim in self.core)*4 + (dim in self.slice1)*2 + \
                (dim in self.other) - 2
            chunks[dim] = chunk_functions[count](dim, adj_idx, adjust, shape)
            adj_idx += 1
        return chunks

    def __set_volume_bounds(self, adjust, dim, chunks):
        # NOTE(review): currently unused (its only call site was removed);
        # kept for parity with the original implementation
        adjust['bounds']['min'][dim] = \
            eval(str(62) + adjust['inc']['down'][dim])
        chunks[dim] = int(min(adjust['bounds']['max'][dim], 62))

    def __core_core(self, dim, adj_idx, adjust, shape):
        """ dim is a core dimension in both patterns: chunk it fully. """
        adjust['inc']['up'][adj_idx] = '+1'
        adjust['inc']['down'][adj_idx] = '/2'
        adjust['bounds']['max'][adj_idx] = shape[dim]
        return shape[dim]

    def __core_slice(self, dim, adj_idx, adjust, shape):
        """ dim is core in one pattern and the fastest slice dimension in
        the other: chunk by the transfer frame count. """
        max_frames = self.__get_max_frames_dict()[dim]
        adjust['inc']['up'][adj_idx] = '+' + str(max_frames)
        adjust['inc']['down'][adj_idx] = '/2'
        # which is the slice dimension: current or next?
        ddict = \
            self.current if dim in self.current['slice_dims'] else self.next
        shape, allslices = self.__get_shape(shape, ddict)
        adjust['bounds']['max'][adj_idx] = self.__max_frames_per_process(
            shape, max_frames, allslices=allslices)
        return min(max_frames, shape)

    def __core_other(self, dim, adj_idx, adjust, shape):
        """ dim is core in one pattern and a slow slice dimension in the
        other: start with a chunk of 1. """
        adjust['inc']['up'][adj_idx] = '+1'
        adjust['inc']['down'][adj_idx] = '-1'
        adjust['bounds']['max'][adj_idx] = shape[dim]
        return 1

    def __slice_slice(self, dim, adj_idx, adjust, shape):
        """ dim is the fastest slice dimension in both patterns: chunk by
        the (combined) transfer frame count. """
        max_frames = self.__get_max_frames_dict()[dim]
        adjust['inc']['up'][adj_idx] = '+' + str(max_frames)
        adjust['inc']['down'][adj_idx] = '/2'
        # bound by whichever pattern has the smaller slice space
        shape1 = np.prod([shape[s] for s in self.current['slice_dims']])
        shape2 = np.prod([shape[s] for s in self.next['slice_dims']])
        ddict = self.current if shape1 < shape2 else self.next
        shape, allslices = self.__get_shape(shape, ddict)
        adjust['bounds']['max'][adj_idx] = self.__max_frames_per_process(
            shape, max_frames, allslices=allslices)
        return min(max_frames, shape)

    def __slice_other(self, dim, adj_idx, adjust, shape):
        """ dim is the fastest slice dimension in one pattern and a slow
        slice dimension in the other: start with a chunk of 1. """
        adjust['inc']['up'][adj_idx] = '+1'
        adjust['inc']['down'][adj_idx] = '-1'
        adjust['bounds']['max'][adj_idx] = shape[dim]
        return 1

    def __get_max_frames_dict(self):
        """ Map each fastest slice dimension to its transfer frame count.

        When current and next patterns share the slice dimension, the
        least common multiple of the two transfer sizes is used.
        """
        current_sdir = self.current['slice_dims'][0]
        next_sdir = self.next['slice_dims'][0]
        mft = 'max_frames_transfer'
        if current_sdir == next_sdir:
            c_max = self.current[mft]
            n_max = self.next[mft]
            least_common_multiple = (c_max*n_max) // gcd(c_max, n_max)
            ddict = {current_sdir: least_common_multiple}
        else:
            ddict = {self.current['slice_dims'][0]: self.current[mft],
                     self.next['slice_dims'][0]: self.next[mft]}
        return ddict

    def __get_shape(self, shape, ddict):
        """ Get shape taking into account padding.

        :returns: (fastest slice dimension length, total number of slices
            including any transfer padding).
        """
        shape = [shape[s] for s in ddict['slice_dims']]
        if 'transfer_shape' not in list(ddict.keys()):
            return shape[0], np.prod(shape)
        size_list = [ddict['transfer_shape'][s] for s in ddict['slice_dims']]
        # round each slice dimension up to a whole number of transfers
        trans_per_dim = np.ceil(np.array(shape)/np.array(
            size_list, dtype=np.float32))
        full_shape = np.prod(trans_per_dim*size_list)
        return shape[0], full_shape

    def __max_frames_per_process(self, shape, nFrames, allslices=None):
        """ Calculate the max possible frames per process. """
        nSlices = allslices if allslices else shape
        total_plugin_runs = np.ceil(float(nSlices) / nFrames)
        frame_list = np.arange(total_plugin_runs)
        nProcs = len(self.exp.meta_data.get('processes'))
        frame_list_per_proc = np.array_split(frame_list, nProcs)
        flist_len = [len(f) for f in frame_list_per_proc if len(f) > 0]
        # median load across the non-empty processes
        runs_per_proc = int(np.median(np.array(flist_len)))
        return int(min(runs_per_proc*nFrames, shape))

    def __adjust_chunk_size(self, chunks, ttype, shape, adjust):
        """ Adjust the chunk size to as close to chunk_max as possible. """
        chunks = np.array(chunks)
        chunk_size = np.prod(chunks)*np.dtype(ttype).itemsize
        cache_size = self.chunk_max
        if (chunk_size > cache_size):
            self.__decrease_chunks(chunks, ttype, adjust)
        else:
            chunks = self.__increase_chunks(chunks, ttype, shape, adjust)
        return tuple(chunks)

    def __decrease_chunks(self, chunks, ttype, adjust):
        """ Decrease the chunk size (in place) to below but as close to
        chunk_max as possible. """
        while ((np.prod(chunks)*np.dtype(ttype).itemsize) > self.chunk_max):
            idx = self.__get_idx_decrease(chunks, adjust)
            # guard against the -1 "nothing adjustable" sentinel before
            # looking it up, mirroring __increase_chunks (index(-1) would
            # raise ValueError)
            if idx == -1:
                break
            dim = adjust['dim'].index(idx)
            chunks[idx] = int(np.ceil(eval(str(float(chunks[idx])) +
                                      adjust['inc']['down'][dim])))

    def __increase_chunks(self, chunks, ttype, shape, adjust):
        """ Increase the chunk size as close to chunk_max as possible. """
        next_chunks = copy.copy(chunks)
        while ((np.prod(next_chunks)*np.dtype(ttype).itemsize) <=
                self.chunk_max):
            chunks = copy.copy(next_chunks)
            idx = self.__get_idx_increase(next_chunks, adjust)
            if idx == -1:
                break
            dim = adjust['dim'].index(idx)
            next_chunks[idx] = \
                eval(str(next_chunks[idx]) + adjust['inc']['up'][dim])
        return chunks

    def __get_idx_decrease(self, chunks, adjust):
        """ Determine the chunk dimension to decrease. """
        # a dimension is exhausted when the next decrement would pass its
        # minimum bound
        self.check = lambda a, b, c, i: \
            True if (eval(str(a) + b[i])) < c['min'][i] else False
        self.__check_adjust_dims(adjust, chunks, 'down')
        return self.__get_idx_order(adjust, chunks, 'down')

    def __get_idx_increase(self, chunks, adjust):
        """ Determine the chunk dimension to increase. """
        # a dimension is exhausted when the next increment would pass its
        # maximum bound
        self.check = lambda a, b, c, i: \
            True if (eval(str(a) + b[i])) > c['max'][i] else False
        self.__check_adjust_dims(adjust, chunks, 'up')
        return self.__get_idx_order(adjust, chunks, 'up')

    def __get_idx_order(self, adjust, chunks, direction):
        """ Pick the next dimension to adjust: slice dims before core dims
        when shrinking, core dims before slice dims when growing; -1 when
        nothing is adjustable. """
        process_order = [self.slice1, self.core]
        sl = slice(None, None, -1)
        if direction == 'up':
            sl = slice(None, None, 1)
            process_order = process_order[::-1]
        avail = list(set(adjust['dim']).intersection(process_order[0]))
        idx_order = np.argsort(chunks[avail])[sl]
        if not idx_order.size:
            avail = list(set(adjust['dim']).intersection(process_order[1]))
            idx_order = np.argsort(chunks[avail])[sl]
        return avail[idx_order[0]] if idx_order.size else -1

    def __check_adjust_dims(self, adjust, chunks, up_down):
        """ Remove (in place) every adjustable dimension whose next
        increment/decrement would overrun its bound. """
        nDel = 0
        for i in range(len(adjust['dim'])):
            # compensate for entries already deleted this pass
            i -= nDel
            dim = adjust['dim'][i]
            if self.check(chunks[dim], adjust['inc'][up_down],
                          adjust['bounds'], i):
                del adjust['dim'][i]
                del adjust['inc']['up'][i]
                del adjust['inc']['down'][i]
                del adjust['bounds']['max'][i]
                del adjust['bounds']['min'][i]
                nDel += 1