"""Module to handle submissions."""
import collections.abc
import os
import subprocess
import sys
from .derived_variables import derived_variables
from .logs import logger
from .os_utils import deodemakedirs
from .plugin import DeodePluginRegistryFromConfig
from .tasks.discover_task import available_tasks
from .toolbox import FileManager, Platform
class ProcessorLayout:
    """Hold and normalise processor layout information."""

    def __init__(self, kwargs: dict):
        """Construct object.

        Use integers internally: values read as strings (e.g. from
        environment variables or batch settings) are converted, and
        empty strings are mapped to None.

        Args:
            kwargs (dict): Processor information with keys WRAPPER,
                NPROC, NPROC_IO, NPROCX, NPROCY (all optional).
        """
        self.wrapper = kwargs.get("WRAPPER")
        self.nproc = self._normalise(kwargs.get("NPROC"))
        self.nproc_io = self._normalise(kwargs.get("NPROC_IO"))
        self.nprocx = self._normalise(kwargs.get("NPROCX"))
        self.nprocy = self._normalise(kwargs.get("NPROCY"))

    @staticmethod
    def _normalise(value):
        """Normalise a processor count.

        Maps "" to None and numeric strings to int; any other value
        (None, int, ...) is passed through unchanged.

        Args:
            value: Raw processor count.

        Returns:
            int or None or original value: Normalised count.
        """
        if value == "":
            return None
        if isinstance(value, str):
            return int(value)
        return value

    def get_proc_dict(self):
        """Generate a processor dict.

        Returns:
            dict: nproc, nproc_io, nprocx, nprocy values (int or None).
        """
        return {
            "nproc": self.nproc,
            "nproc_io": self.nproc_io,
            "nprocx": self.nprocx,
            "nprocy": self.nprocy,
        }

    def get_wrapper(self):
        """Get and potentially parse the wrapper.

        Substitutes the @NPROC@ placeholder with the processor count
        when both wrapper and nproc are set.

        Returns:
            str or None: The (possibly substituted) wrapper command.
        """
        wrapper = self.wrapper
        if wrapper is not None and self.nproc is not None:
            wrapper = wrapper.replace("@NPROC@", str(self.nproc))
        return wrapper
class TaskSettings:
    """Set the task specific settings."""

    def __init__(self, config):
        """Construct the task specific settings.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        self.config = config
        self.submission_defs = self.config["submission"].dict()
        self.job_type = None
        self.processor_layout = None
        self.fmanager = FileManager(self.config)
        self.platform = self.fmanager.platform
        self.unix_group = self.platform.get_value("platform.unix_group")

    @staticmethod
    def update_task_setting(dic, upd):
        """Update task settings dictionary.

        Nested mappings are merged recursively; scalar values overwrite.

        Args:
            dic (dict): Dictionary to update
            upd (dict): Values to update

        Returns:
            dict: updated dictionary
        """
        for key, val in upd.items():
            if isinstance(val, collections.abc.Mapping):
                dic[key] = TaskSettings.update_task_setting(dic.get(key, {}), val)
            else:
                logger.debug("key={} value={}", key, val)
                dic[key] = val
        return dic

    def parse_submission_defs(self, task):
        """Parse the submission definitions.

        Determines the submit type for the task (falling back to the
        configured default), merges the type settings and any
        task-specific exceptions, and as side effects records the job
        type (SCHOST) and the task's processor layout on this object.

        Args:
            task (str): The name of the task

        Returns:
            dict: Parsed settings

        Raises:
            RuntimeError: Undefined submit type
        """
        task_settings = {"BATCH": {}, "ENV": {}, "MODULES": {}}
        all_defs = self.submission_defs
        all_types = all_defs.get("types", {})
        submit_types = list(all_types)
        default_submit_type = all_defs["default_submit_type"]
        if default_submit_type not in submit_types:
            raise RuntimeError(
                f"Default submit type: {default_submit_type} is not defined"
            )
        logger.debug("default_submit_type={}", default_submit_type)
        # The last submit type listing this task wins
        task_submit_type = None
        for s_t in submit_types:
            if s_t in all_types and "tasks" in all_types[s_t]:
                for tname in all_types[s_t]["tasks"]:
                    if tname == task:
                        task_submit_type = s_t
        if task_submit_type is None:
            task_submit_type = default_submit_type
        logger.debug("task_submit_type for task {}: {}", task, task_submit_type)
        # Update task_settings with the selected submit type
        task_settings = self.update_task_setting(
            task_settings, all_types[task_submit_type]
        )
        if (
            "BATCH" in task_settings
            and "NAME" in task_settings["BATCH"]
            and "@TASK_NAME@" in task_settings["BATCH"]["NAME"]
        ):
            task_settings["BATCH"]["NAME"] = task_settings["BATCH"]["NAME"].replace(
                "@TASK_NAME@", task
            )
        # Task specific exceptions take precedence over the submit type
        if "task_exceptions" in all_defs and task in all_defs["task_exceptions"]:
            logger.debug("Task task_exceptions for task {}", task)
            task_settings = self.update_task_setting(
                task_settings, all_defs["task_exceptions"][task]
            )
        if "SCHOST" in task_settings:
            self.job_type = task_settings["SCHOST"]
        # Set task specific processor layout
        self.processor_layout = ProcessorLayout(task_settings)
        logger.debug("Task settings for task {}: {}", task, task_settings)
        return task_settings

    def get_task_settings(self, task, key=None, variables=None, ecf_micro="%"):
        """Get task settings, optionally restricted to one section.

        Args:
            task (str): Task name.
            key (str, optional): Section to extract (e.g. "BATCH",
                "ENV", "MODULES"). Defaults to None, meaning all
                parsed settings are returned.
            variables (optional): Setting names that should be emitted
                as scheduler variable references instead of literal
                values. Defaults to None.
            ecf_micro (str, optional): ecFlow micro character used to
                wrap variable references. Defaults to "%".

        Returns:
            dict or value or None: All settings when key is None, the
            (possibly substituted) settings of the requested section,
            or None when the section is unknown.
        """
        task_settings = self.parse_submission_defs(task)
        if key is None:
            return task_settings
        if key in task_settings:
            m_task_settings = {}
            logger.debug(type(task_settings[key]))
            if isinstance(task_settings[key], dict):
                for setting, value_ in task_settings[key].items():
                    value = value_
                    logger.debug("{} {} variables: {}", setting, value, variables)
                    if variables is not None and setting in variables:
                        value = f"{ecf_micro}{setting}{ecf_micro}"
                        logger.debug(value)
                    if isinstance(value, str):
                        value = value.replace("@TASK_NAME@", task)
                    m_task_settings.update({setting: value})
                logger.debug(m_task_settings)
                return m_task_settings
            value = task_settings[key]
            # NOTE(review): the dict branch above substitutes the setting
            # name itself, while this branch substitutes variables[key] —
            # confirm which form the scheduler expects.
            if variables is not None and key in variables:
                value = f"{ecf_micro}{variables[key]}{ecf_micro}"
            return value
        return None

    def recursive_items(self, dictionary):
        """Recursively iterate over a nested dict.

        Yields every (key, value) pair; for nested dicts the pair for
        the dict itself is yielded before its contents.

        Args:
            dictionary (dict): Dictionary to traverse.

        Yields:
            tuple: (key, value) pairs.
        """
        for key, value in dictionary.items():
            if isinstance(value, dict):
                yield (key, value)
                yield from self.recursive_items(value)
            else:
                yield (key, value)

    def get_settings(self, task):
        """Get the settings as a flat dict.

        Collects every scalar (int/str) setting from the nested parsed
        task settings into a single flat mapping.

        Args:
            task (str): Task name.

        Returns:
            dict: Flat mapping of setting name to value.
        """
        settings = {}
        task_settings = self.parse_submission_defs(task)
        keys = []
        for key, value in self.recursive_items(task_settings):
            if isinstance(value, (int, str)):
                logger.debug(key)
                keys.append(key)
        logger.debug(keys)
        for key, value in self.recursive_items(task_settings):
            logger.debug("key={} value={}", key, value)
            if key in keys:
                logger.debug("update {} {}", key, value)
                settings.update({key: value})
        return settings

    def parse_job(
        self,
        task,
        config,
        input_template_job,
        task_job,
        variables=None,
        ecf_micro="%",
        scheduler="ecflow",
    ):
        """Read default job and change interpreter.

        Writes a bash job file containing batch directives, scheduler
        exports (for ecflow), module commands, environment exports and
        finally the interpreter invocation of the template job.

        Args:
            task (str): Task name
            config (deode.ParsedConfig): The configuration
            input_template_job (str): Input container template.
            task_job (str): Task container (job file to write).
            variables (optional): Setting names to emit as scheduler
                variable references. Defaults to None.
            ecf_micro (str, optional): ecFlow micro character.
                Defaults to "%".
            scheduler (str, optional): Scheduler. Defaults to ecflow;
                None means a stand-alone (non-scheduled) job.

        Raises:
            RuntimeError: In case of missing module env file or a
                malformed module command.
        """
        interpreter = self.get_task_settings(task, "INTERPRETER")
        logger.debug(interpreter)
        if interpreter is None:
            # Fall back to the running python interpreter
            interpreter = f"{sys.executable}"
        logger.debug(interpreter)
        dir_name = os.path.dirname(os.path.realpath(task_job))
        deodemakedirs(dir_name, unixgroup=self.unix_group)
        with open(task_job, mode="w", encoding="utf-8") as file_handler:
            file_handler.write("#!/bin/bash\n")
            # Batch settings
            batch_settings = self.get_task_settings(
                task, "BATCH", variables=variables, ecf_micro=ecf_micro
            )
            # Add account if not set. Batch settings use upper-case keys
            # ("NAME", "ACCOUNT"), so test the key actually written below;
            # the previous lower-case check never matched and could emit a
            # duplicate account directive.
            if "ACCOUNT" not in batch_settings and "account" in config["submission"]:
                batch_settings["ACCOUNT"] = config["submission.account"]
            logger.debug("batch settings {}", batch_settings)
            for b_setting in batch_settings.values():
                file_handler.write(f"{b_setting}\n")
            if scheduler == "ecflow":
                # Variables the ecflow server substitutes into the job
                ecf_vars = [
                    "ECF_HOST",
                    "ECF_PORT",
                    "ECF_NAME",
                    "ECF_PASS",
                    "ECF_TRYNO",
                    "ECF_RID",
                    "ECF_TIMEOUT",
                    "BASETIME",
                    "VALIDTIME",
                    "LOGLEVEL",
                    "ARGS",
                    "WRAPPER",
                    "NPROC",
                    "NPROC_IO",
                    "NPROCX",
                    "NPROCY",
                    "CONFIG",
                    "DEODE_HOME",
                    "KEEP_WORKDIRS",
                ]
                for ecf_var in ecf_vars:
                    file_handler.write(f'export {ecf_var}="%{ecf_var}%"\n')
            # Module settings
            module_settings = self.get_task_settings(
                task, "MODULES", variables=variables, ecf_micro=ecf_micro
            )
            logger.debug("module settings {}", module_settings)
            if module_settings is not None and len(module_settings) > 0:
                env_file = (
                    f"{self.submission_defs['module_initfile']}"
                    if "module_initfile" in self.submission_defs
                    else f"{self.submission_defs['module_initpath']}/bash"
                )
                if not os.path.isfile(env_file):
                    raise RuntimeError(
                        f"Environment file {env_file} is not a file or does not exists"
                    )
                file_handler.write(f". {env_file}\n")
                for module_args in module_settings.values():
                    # Skip empty sections
                    if len(module_args) == 0:
                        continue
                    # Expect e.g. ["load", "name"] or ["use", "path", ...]
                    if len(module_args) < 2 or len(module_args) > 3:
                        raise RuntimeError(f"Module command has the wrong length:{module_args}")
                    cmd = "module " + " ".join([f"{x}" for x in module_args])
                    file_handler.write(f"{cmd}\n")
            # Environment settings
            env_settings = self.get_task_settings(
                task, "ENV", variables=variables, ecf_micro=ecf_micro
            )
            logger.debug(env_settings)
            for key, val in env_settings.items():
                file_handler.write(f'export {key}="{val}"\n')
            if scheduler is None:
                # Stand-alone job: the task discovers its own context
                file_handler.write(f'export STAND_ALONE_TASK_NAME="{task}"\n')
                platform = Platform(config)
                deode_home = platform.get_platform_value("DEODE_HOME")
                file_handler.write(f'export STAND_ALONE_DEODE_HOME="{deode_home}"\n')
                config_file = config.metadata["source_file_path"]
                file_handler.write(f'export STAND_ALONE_TASK_CONFIG="{config_file!s}"\n')
            file_handler.write(f"{interpreter} {input_template_job} || exit 1\n")
        # Make file executable for user
        os.chmod(task_job, 0o744)
class NoSchedulerSubmission:
    """Create and submit job without a scheduler."""

    def __init__(self, task_settings):
        """Construct the task specific settings.

        Args:
            task_settings (TaskSettings): Submission definitions
        """
        self.task_settings = task_settings

    def submit(self, task, config, template_job, task_job, output, troika="troika"):
        """Submit task.

        Creates the job file for the task and submits it through the
        troika command line tool.

        Args:
            task (str): Task name
            config (deode.ParsedConfig): Config
            template_job (str): Task template job file
            task_job (str): Task job file
            output (str): Output file
            troika (str, optional): troika binary. Defaults to "troika".

        Raises:
            NotImplementedError: If the task is not a known task.
            RuntimeError: Submission failure.
        """
        reg = DeodePluginRegistryFromConfig(config)
        known_tasks = available_tasks(reg)
        name = task.lower()
        if name not in known_tasks:
            raise NotImplementedError(f"Task {name} not implemented")
        platform = Platform(config)
        # Update derived variables (nproc etc)
        settings = self.task_settings.get_settings(task)
        processor_layout = ProcessorLayout(settings)
        update = derived_variables(config, processor_layout=processor_layout)
        config = config.copy(update=update)
        troika_config = platform.get_value("troika.config_file")
        self.task_settings.parse_job(
            task=task,
            config=config,
            input_template_job=template_job,
            task_job=task_job,
            scheduler=None,
        )
        cmd = (
            f"{troika} -c {troika_config} submit {self.task_settings.job_type} "
            f"{task_job} -o {output}"
        )
        try:
            # List-form invocation, no shell involved
            subprocess.check_call(cmd.split())  # noqa S603
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(f"Submission failed with {exc!r}") from exc