# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. module:: plugin_runner
   :platform: Unix
   :synopsis: Plugin list runner, which passes control to the transport layer.

.. moduleauthor:: Nicola Wadeson <scientificsoftware@diamond.ac.uk>
"""
import logging
import time
import savu.core.utils as cu
import savu.plugins.utils as pu
from savu.data.experiment_collection import Experiment
from savu.data.stats.statistics import Statistics
from savu.core.iterative_plugin_runner import IteratePluginGroup
from savu.core.iterate_plugin_group_utils import check_if_in_iterative_loop, \
check_if_end_plugin_in_iterate_group
class PluginRunner(object):
    """ Plugin list runner, which passes control to the transport layer.

    The transport class named by ``options["transport"]`` is added as a base
    of the instance at construction time (via ``cu.add_base``), so every
    ``_transport_*`` method called below is provided by that transport.
    """

    def __init__(self, options, name='PluginRunner'):
        """ Attach the transport layer and create the experiment.

        :param options: run options; ``options["transport"]`` selects the
            transport module ``savu.core.transports.<transport>_transport``.
        :param name: unused here; kept for interface compatibility.
        """
        class_name = "savu.core.transports." + options["transport"] \
            + "_transport"
        cu.add_base(self, cu.import_class(class_name))
        # explicit two-argument form kept on purpose: the bases of the
        # instance have just been modified by cu.add_base
        super(PluginRunner, self).__init__()

        #  ********* transport function ***********
        self._transport_initialise(options)
        self.options = options
        # add all relevant locations to the path
        pu.get_plugins_paths()
        self.exp = Experiment(options)

    def _run_plugin_list(self):
        """ Create an experiment and run the plugin list.

        :returns: the experiment object (``self.exp``) after the run.
        """
        t0 = time.time()
        self.exp._setup(self)
        Statistics._setup_class(self.exp)

        plugin_list = self.exp.meta_data.plugin_list
        logging.info('Running the plugin list check')
        self._run_plugin_list_setup(plugin_list)

        exp_coll = self.exp._get_collection()
        n_plugins = plugin_list._get_n_processing_plugins()

        #  ********* transport function ***********
        logging.info('Running transport_pre_plugin_list_run()')
        self._transport_pre_plugin_list_run()

        cp = self.exp.checkpoint
        checkpoint_plugin = cp.get_checkpoint_plugin()
        # start from the plugin index reported by the checkpoint
        for i in range(checkpoint_plugin, n_plugins):
            self.exp._set_experiment_for_current_plugin(i)
            memory_before = cu.get_memory_usage_linux()

            # now that nPlugin has been reset, need to check if we're at a
            # plugin index that corresponds to a plugin inside the group to
            # iterate over or not
            current_iterate_plugin_group = check_if_in_iterative_loop(self.exp)

            if current_iterate_plugin_group is None:
                # not in an iterative loop, run as normal
                plugin = self.__run_plugin(exp_coll['plugin_dict'][i])
                plugin_name = plugin.name
            else:
                # in an iterative loop, run differently
                plugin_name = \
                    current_iterate_plugin_group._execute_iteration_0(
                        self.exp, self)

            self.exp._barrier(msg='PluginRunner: plugin complete.')

            memory_after = cu.get_memory_usage_linux()
            logging.debug(
                "%s memory usage before: %s MB, after: %s MB, change: %s MB",
                plugin_name, memory_before, memory_after,
                memory_after - memory_before)

            #  ********* transport functions ***********
            # end the plugin run if savu has been killed
            if self._transport_kill_signal():
                self._transport_cleanup(i + 1)
                break
            self.exp._barrier(msg='PluginRunner: No kill signal... continue.')
            Statistics._count()
            cp.output_plugin_checkpoint()

        #  ********* transport function ***********
        logging.info('Running transport_post_plugin_list_run')
        self._transport_post_plugin_list_run()

        # terminate any remaining datasets (iterate over a snapshot of the
        # values, as in the original code)
        for data in list(self.exp.index['in_data'].values()):
            self._transport_terminate_dataset(data)

        t1 = time.time()
        Statistics.total_time = round(t1 - t0, 1)
        self.__output_final_message()

        email = self.exp.meta_data.get('email')
        if email:
            cu.send_email(email)

        Statistics._post_chain()
        return self.exp

    def __output_final_message(self):
        """ Print the closing banner, reporting whether the run completed or
        was interrupted by a kill signal, and the total run time.
        """
        kill = 'killsignal' in self.exp.meta_data.get_dictionary()
        msg = "interrupted by killsignal" if kill else "Complete"
        stars = 40 if kill else 23
        cu.user_message("*" * stars)
        cu.user_message("* Processing " + msg + " in "
                        + str(Statistics.total_time) + " seconds *")
        cu.user_message("*" * stars)

    def __run_plugin(self, plugin_dict, clean_up_plugin=True, plugin=None):
        """ Run a single plugin through the transport layer.

        :param plugin_dict: plugin list entry used to load the plugin when
            no ``plugin`` object is supplied.
        :param clean_up_plugin: if True call ``plugin._clean_up()`` after the
            run; otherwise only the plugin metadata is copied.
        :param plugin: an already loaded plugin object to reuse, or None to
            load one from ``plugin_dict``.
        :returns: the plugin object that was run.
        """
        # allow plugin objects to be reused for running iteratively
        if plugin is None:
            plugin = self._transport_load_plugin(self.exp, plugin_dict)

        plugin.stats_obj.start_time()

        iterate_plugin_group = check_if_in_iterative_loop(self.exp)
        # True only while iteration 0 of an iterative group is running
        first_iteration = iterate_plugin_group is not None and \
            iterate_plugin_group._ip_iteration == 0

        if first_iteration:
            iterate_plugin_group.add_plugin_to_iterate_group(plugin)

        is_end_plugin_in_iterative_loop = check_if_end_plugin_in_iterate_group(
            self.exp)

        if first_iteration and is_end_plugin_in_iterative_loop:
            # check if this end plugin is ALSO the start plugin
            if iterate_plugin_group.start_index == \
                    iterate_plugin_group.end_index:
                iterate_plugin_group.set_start_plugin(plugin)

            # set the end plugin in IteratePluginGroup
            iterate_plugin_group.set_end_plugin(plugin)
            # setup the 'iterating' key in IteratePluginGroup._ip_data_dict
            iterate_plugin_group.set_alternating_datasets()
            # setup the PluginData objects
            iterate_plugin_group.set_alternating_plugin_datasets()
            # setup the datasets for iteration 0 and 1 inside the
            # IteratePluginGroup object
            iterate_plugin_group.setup_datasets()
            # set the output datasets of the end plugin
            iterate_plugin_group._IteratePluginGroup__set_datasets()

        #  ********* transport function ***********
        self._transport_pre_plugin()
        cu.user_message("*Running the %s plugin*" % plugin.name)

        #  ******** transport 'process' function is called inside here ********
        plugin._run_plugin(self.exp, self)  # plugin driver

        self.exp._barrier(msg="Plugin returned from driver in Plugin Runner")
        cu._output_summary(self.exp.meta_data.get("mpi"), plugin)

        # if NOT in an iterative loop, clean up the PluginData associated with
        # the Data objects in the plugin object as normal
        #
        # if in an iterative loop, do not clean up the PluginData object
        # associated with the Data objects of the plugin, apart from for the
        # last iteration
        if clean_up_plugin:
            logging.debug(f"Cleaning up plugin {plugin.name}")
            plugin._clean_up()
        else:
            info_msg = f"Not cleaning up plugin {plugin.name}, as it is in a " \
                f"group to iterate over, will only copy metadata"
            # TODO: maybe other things in Plugin._clean_up() should also be
            # done?
            plugin._Plugin__copy_meta_data()
            logging.debug(info_msg)

        finalise = self.exp._finalise_experiment_for_current_plugin()

        #  ********* transport function ***********
        self._transport_post_plugin()

        for data in finalise['remove'] + finalise['replace']:
            #  ********* transport function ***********
            self._transport_terminate_dataset(data)

        self.exp._reorganise_datasets(finalise)
        plugin.stats_obj.stop_time()
        return plugin

    def _run_plugin_list_setup(self, plugin_list):
        """ Run the plugin list through the framework without executing the
        main processing.

        :param plugin_list: the plugin list object to check.
        """
        plugin_list._check_loaders()
        self.__check_gpu()

        n_loaders = self.exp.meta_data.plugin_list._get_n_loaders()
        n_plugins = plugin_list._get_n_processing_plugins()
        plist = plugin_list.plugin_list

        # set loaders
        for i in range(n_loaders):
            pu.plugin_loader(self.exp, plist[i])
            self.exp._set_initial_datasets()

        # run all plugin setup methods and store information in experiment
        # collection
        count = 0
        for plugin_dict in plist[n_loaders:n_loaders + n_plugins]:
            self.__plugin_setup(plugin_dict, count)
            count += 1

        plugin_list._add_missing_savers(self.exp)

        #  ********* transport function ***********
        self._transport_update_plugin_list()

        # check added savers: everything after the entries set up above
        for plugin_dict in plist[n_loaders + count:]:
            self.__plugin_setup(plugin_dict, count)
            count += 1

        self.exp._reset_datasets()
        self.exp._finalise_setup(plugin_list)
        cu.user_message("Plugin list check complete!")

    def __plugin_setup(self, plugin_dict, count):
        """ Load one plugin in check mode, record its citations in
        ``plugin_dict['cite']`` and merge its output data into the input data.

        :param plugin_dict: plugin list entry to set up.
        :param count: value stored under the "nPlugin" metadata key.
        """
        self.exp.meta_data.set("nPlugin", count)
        plugin = pu.plugin_loader(self.exp, plugin_dict, check=True)
        plugin._revert_preview(plugin.get_in_datasets())
        plugin_dict['cite'] = plugin.tools.get_citations()
        plugin._clean_up()
        self.exp._merge_out_data_to_in(plugin_dict)

    def __check_gpu(self):
        """ Check if the process list contains GPU processes and determine if
        GPUs exist. Add GPU processes to the processes list if required.

        :raises Exception: if pyNVML cannot be imported, or if the GPU query
            fails, finds no GPUs, or (outside of 'test_state') finds a GPU
            with running compute processes.
        """
        if not self.exp.meta_data.plugin_list._contains_gpu_processes():
            return

        try:
            import pynvml as pv
        except Exception as err:
            logging.debug("pyNVML module not found")
            raise Exception("pyNVML module not found") from err

        try:
            pv.nvmlInit()
            count = int(pv.nvmlDeviceGetCount())
            if count == 0:
                raise Exception("No GPUs found")
            logging.debug("%s GPUs have been found.", count)

            if not self.exp.meta_data.get('test_state'):
                for i in range(count):
                    handle = pv.nvmlDeviceGetHandleByIndex(i)
                    if pv.nvmlDeviceGetComputeRunningProcesses(handle):
                        # adjacent literals rather than a backslash inside
                        # the string, which leaked source indentation into
                        # the message
                        raise Exception(
                            "Unfortunately, GPU %i is busy. Try "
                            "resubmitting the job to the queue." % i)
        except Exception as err:
            # interpolate the cause into the message (it was previously
            # passed as a second, unformatted exception argument)
            raise Exception(f"Unable to run GPU plugins: {err}") from err
        self.__set_gpu_processes(count)

    def __set_gpu_processes(self, count):
        """ Rename CPU processes to GPU processes when none are present.

        If no entry of the 'processes' metadata list contains 'GPU', the
        entries 'CPU0', 'CPU1', ... are replaced in place by 'GPU0', 'GPU1',
        ... for the first ``min(count, len(processes))`` indices, and the
        list is stored back under the 'processes' key.

        :param count: number of GPUs found.
        :raises ValueError: if an expected 'CPU<i>' entry is missing from the
            list (from ``list.index``).
        """
        processes = self.exp.meta_data.get('processes')
        if any('GPU' in name for name in processes):
            return

        logging.debug("GPU processes missing. GPUs found so adding them.")
        for i in range(min(count, len(processes))):
            processes[processes.index('CPU' + str(i))] = 'GPU' + str(i)
        self.exp.meta_data.set('processes', processes)