"""Ecflow suites base class."""
import os
from typing import List, Optional, Union
from ..logs import LogDefaults, logger
from ..submission import TaskSettings
from ..toolbox import Platform
try:
import ecflow
except ImportError:
ecflow = None
def _get_name(cname, cls, attrname="__plugin_name__"):
"""Get name.
Args:
cname (_type_): cname
cls (_type_): cls
attrname (str, optional): _description_. Defaults to "__plugin_name__".
Returns:
_type_: Name
"""
# __dict__ vs. getattr: do not inherit the attribute from a parent class
name = getattr(cls, "__dict__", {}).get(attrname, None)
if name is not None:
return name
name = cname.lower()
return name
[docs]
class SuiteDefinition(object):
"""Definition of suite."""
def __init__(self, config, dry_run=False):
"""Construct the definition.
Args:
config (ParsedConfig): Parsed configuration
dry_run (bool, optional): Dry run not using ecflow. Defaults to False.
Raises:
ModuleNotFoundError: If ecflow is not loaded and not dry_run
"""
if ecflow is None and not dry_run:
raise ModuleNotFoundError("Ecflow not found")
self.task_settings = TaskSettings(config)
self.name = config["general.case"]
self.config = config
self.platform = Platform(config)
self.config = config
ecf_out = self.config["scheduler.ecfvars.ecf_out"]
ecf_files = self.config["scheduler.ecfvars.ecf_files"]
ecf_user = self.config["scheduler.ecfvars.ecf_user"]
joboutdir = self.config["scheduler.ecfvars.ecf_jobout"]
ecf_files_remotely = self.config["scheduler.ecfvars.ecf_files_remotely"]
ecf_home = self.config["scheduler.ecfvars.ecf_home"]
ecf_ssl = self.config["scheduler.ecfvars.ecf_ssl"]
ecf_host = self.config["scheduler.ecfvars.ecf_host"]
ecf_tries = self.config["scheduler.ecfvars.ecf_tries"]
self.ecf_user = ecf_user
self.ecf_host = ecf_host
self.ecf_ssl = ecf_ssl
self.joboutdir = joboutdir
try:
ecf_include = self.config["scheduler.ecfvars.ecf_include"]
except KeyError:
ecf_include = ecf_files
self.ecf_include = ecf_include
self.ecf_files = ecf_files
if ecf_home is None:
ecf_home = joboutdir
self.ecf_home = ecf_home
if ecf_out is None:
ecf_out = joboutdir
self.ecf_out = ecf_out
self.ecf_micro = "%"
ecf_jobout = (
joboutdir
+ f"/{self.ecf_micro}ECF_NAME{self.ecf_micro}."
+ f"{self.ecf_micro}ECF_TRYNO{self.ecf_micro}"
)
self.ecf_jobout = ecf_jobout
self.ecf_files_remotely = ecf_files_remotely
if ecf_files_remotely is None:
self.ecf_files_remotely = self.ecf_files
# Commands started from the scheduler does not have full environment
ecf_job_cmd = (
f"{self.ecf_micro}TROIKA{self.ecf_micro} "
f"-c {self.ecf_micro}TROIKA_CONFIG{self.ecf_micro} submit "
f"-o {self.ecf_micro}ECF_JOBOUT{self.ecf_micro} "
f"{self.ecf_micro}SCHOST{self.ecf_micro} "
f"{self.ecf_micro}ECF_JOB{self.ecf_micro}"
)
# %ECF_JOB%"
self.ecf_job_cmd = ecf_job_cmd
ecf_status_cmd = (
f"{self.ecf_micro}TROIKA{self.ecf_micro} "
f"-c {self.ecf_micro}TROIKA_CONFIG{self.ecf_micro} monitor "
f"{self.ecf_micro}SCHOST{self.ecf_micro} "
f"{self.ecf_micro}ECF_JOB{self.ecf_micro}"
)
self.ecf_status_cmd = ecf_status_cmd
ecf_kill_cmd = (
f"{self.ecf_micro}TROIKA{self.ecf_micro} "
f"-vv -c {self.ecf_micro}TROIKA_CONFIG{self.ecf_micro} kill "
f"{self.ecf_micro}SCHOST{self.ecf_micro} "
f"{self.ecf_micro}ECF_JOB{self.ecf_micro}"
)
self.ecf_kill_cmd = ecf_kill_cmd
platform = Platform(config)
try:
troika = platform.substitute(config["troika.troika"])
except KeyError:
troika = "troika"
if troika is None:
troika = "troika"
troika_config = config["troika.config_file"]
config_file = config.metadata["source_file_path"]
deode_home = platform.get_platform_value("DEODE_HOME")
keep_workdirs = "1" if config["general.keep_workdirs"] else "0"
loglevel = config.get("general.loglevel", LogDefaults.LEVEL).upper()
starttime = config.get("general.times.start")
variables = {
"ECF_USER": self.ecf_user,
"ECFTYPES": "fc",
"ECF_EXTN": ".bash",
"ECF_TRIES": ecf_tries,
"ECF_FILES": self.ecf_files_remotely,
"ECF_INCLUDE": self.ecf_include,
"ECF_SSL": self.ecf_ssl,
"ECF_HOME": self.ecf_home,
"ECF_KILL_CMD": self.ecf_kill_cmd,
"ECF_JOB_CMD": self.ecf_job_cmd,
"ECF_STATUS_CMD": self.ecf_status_cmd,
"ECF_OUT": self.ecf_out,
"ECF_JOBOUT": self.ecf_jobout,
"ECF_TIMEOUT": 20,
"ECF_LOGHOST": self.ecf_host,
"ARGS": "",
"LOGLEVEL": loglevel,
"CONFIG": str(config_file),
"TROIKA": troika,
"TROIKA_CONFIG": troika_config,
"BASETIME": starttime,
"VALIDTIME": starttime,
"DEODE_HOME": deode_home,
"NPROC": "",
"NPROC_IO": "",
"NPROCX": "",
"NPROCY": "",
"KEEP_WORKDIRS": keep_workdirs,
}
self.suite = EcflowSuite(
self.name,
ecf_files,
variables=variables,
dry_run=dry_run,
ecf_files_remotely=self.ecf_files_remotely,
)
[docs]
def save_as_defs(self, def_file):
"""Save definition file.
Args:
def_file (str): Name of definition file
"""
logger.debug("Saving def file {}", def_file)
self.suite.save_as_defs(def_file)
[docs]
class EcflowNode:
"""A Node class is the abstract base class for Suite, Family and Task.
Every Node instance has a name, and a path relative to a suite.
The trigger of a node can either be another EcflowNode, a list of EcflowNode
objects or an EcflowSuiteTriggers object.
"""
def __init__(
self,
name,
node_type,
parent,
ecf_files,
variables=None,
trigger: Optional[
Union["EcflowSuiteTriggers", List["EcflowNode"], "EcflowNode"]
] = None,
def_status=None,
ecf_files_remotely=None,
cron=None,
limit=None,
):
"""Construct the EcflowNode.
Args:
name (str): Name of node
node_type (str): Node type
parent (EcflowNode): Parent node
ecf_files (str): Location of ecf files
variables (dict, optional): Variables to map. Defaults to None
trigger (Union[EcflowSuiteTriggers, List[EcflowNode], EcflowNode]):
Trigger. Defaults to None
def_status (str, ecflow.Defstatus): Def status. Defaults to None
ecf_files_remotely(str, optional): Remote file prefix
cron (EcflowSuiteCron): Cron. Defauts to None
limit (EcflowSuiteLimit): Limit. Defaults to None
Raises:
NotImplementedError: Node type not implemented
TypeError: "Triggers must be an EcflowSuiteTriggers object"
TypeError: "defstatus must be either str or an ecflow.Defstatus object"
"""
self.name = name
self.node_type = node_type
has_node = True
if parent is None:
has_node = False
if (
has_node
and node_type != "suite"
and hasattr(parent, "ecf_node")
and parent.ecf_node is None
):
has_node = False
if not has_node:
self.ecf_node = None
path = ""
else:
if self.node_type == "family":
self.ecf_node = parent.ecf_node.add_family(self.name)
elif self.node_type == "task":
self.ecf_node = parent.ecf_node.add_task(self.name)
elif self.node_type == "suite":
self.ecf_node = parent.add_suite(self.name)
else:
raise NotImplementedError
path = self.ecf_node.get_abs_node_path()
self.path = path
logger.debug("path={} ecf_files_remotely={}", self.path, ecf_files_remotely)
if ecf_files_remotely is None:
ecf_files_remotely = ecf_files
self.ecf_local_container_path = ecf_files + self.path
self.ecf_remote_container_path = ecf_files_remotely + self.path
logger.debug(
"path={} local_container={} remote_container={}",
self.path,
self.ecf_local_container_path,
self.ecf_remote_container_path,
)
if variables is not None:
for key, value in variables.items():
logger.debug("key={} value={}", key, value)
if self.ecf_node is not None:
self.ecf_node.add_variable(key, value)
if trigger is not None:
# Add trigger to the node from an EcflowSuiteTriggers object
if isinstance(trigger, EcflowSuiteTriggers):
if trigger.trigger_string is not None:
if self.ecf_node is not None:
self.ecf_node.add_trigger(trigger.trigger_string)
else:
logger.warning("Empty trigger")
elif isinstance(trigger, list):
if len(trigger) == 0:
logger.warning("Empty trigger list")
# Resolve the trigger list into an EcflowSuiteTriggers object,
# and add the trigger string to the node
elif all(isinstance(node, EcflowNode) for node in trigger):
trigger = EcflowSuiteTriggers(
[EcflowSuiteTrigger(node) for node in trigger]
)
if trigger.trigger_string is not None and self.ecf_node is not None:
self.ecf_node.add_trigger(trigger.trigger_string)
else:
raise TypeError(
"When parsing a list of trigger, the "
+ "triggers must be EcflowNode objects"
)
# Turn the EcflowNode trigger into an EcflowSuiteTriggers object
# and add the trigger string to the node
elif isinstance(trigger, EcflowNode):
trigger = EcflowSuiteTriggers([EcflowSuiteTrigger(trigger)])
if trigger.trigger_string is not None and self.ecf_node is not None:
self.ecf_node.add_trigger(trigger.trigger_string)
else:
raise TypeError(
"Triggers must be an EcflowSuiteTriggers, List[EcflowNode]"
+ " or a EcflowNode object"
)
self.trigger = trigger
if cron is not None:
if isinstance(cron, EcflowSuiteCron):
self.ecf_node.add_cron(cron.cron)
else:
raise TypeError("Cron must be an EcflowSuiteCron object")
if limit is not None:
if isinstance(limit, EcflowSuiteLimit):
if self.node_type == "family":
self.ecf_node.add_inlimit(limit.limit_name)
else:
raise NotImplementedError(
"Limit is implemented only for Suite and Family!"
)
else:
raise TypeError("Limit should be an EcflowSuitenLimit object")
if def_status is not None and self.ecf_node is not None:
if isinstance(def_status, str):
self.ecf_node.add_defstatus(ecflow.Defstatus(def_status))
elif isinstance(def_status, ecflow.Defstatus):
self.ecf_node.add_defstatus(def_status)
else:
raise TypeError(
"defstatus must be either str or an ecflow.Defstatus object"
)
[docs]
class EcflowNodeContainer(EcflowNode):
"""Ecflow node container."""
def __init__(
self,
name,
node_type,
parent,
ecf_files,
variables=None,
trigger=None,
def_status=None,
ecf_files_remotely=None,
cron=None,
limit=None,
):
"""Construct EcflowNodeContainer.
Args:
name (str): Name of the node container.
node_type (str): What kind of node.
parent (EcflowNode): Parent to this node.
ecf_files (str): Location of ecf files
variables (dict, optional): Variables to map. Defaults to None
trigger (EcflowSuiteTriggers): Trigger. Defaults to None
def_status (str, ecflow.Defstatus): Def status. Defaults to None
ecf_files_remotely(str, optional): ECF_FILES on ecflow server
cron (EcflowSuiteCron): Cron. Defauts to None
limit (EcflowSuiteLimit): Limit. Default None
"""
EcflowNode.__init__(
self,
name,
node_type,
parent,
variables=variables,
ecf_files=ecf_files,
trigger=trigger,
def_status=def_status,
ecf_files_remotely=ecf_files_remotely,
cron=cron,
limit=limit,
)
[docs]
class EcflowSuite(EcflowNodeContainer):
"""EcflowSuite."""
def __init__(
self,
name,
ecf_files,
variables=None,
dry_run=False,
def_status=None,
ecf_files_remotely=None,
):
"""Construct the Ecflow suite.
Args:
name (str): Name of suite
ecf_files (str): Location of ecf files
variables (dict, optional): Variables to map. Defaults to None
dry_run (bool, optional): Dry run not using ecflow. Defaults to False.
def_status (str, ecflow.Defstatus): Def status. Defaults to None
ecf_files_remotely(str, optional): ECF_FILES on ecflow server
"""
if dry_run:
self.defs = None
else:
self.defs = ecflow.Defs({})
EcflowNodeContainer.__init__(
self,
name,
"suite",
self.defs,
ecf_files,
variables=variables,
def_status=def_status,
ecf_files_remotely=ecf_files_remotely,
)
[docs]
def save_as_defs(self, def_file):
"""Save defintion file.
Args:
def_file (str): Name of the definition file.
"""
if self.defs is not None:
self.defs.save_as_defs(def_file)
logger.info("def file saved to {}", def_file)
[docs]
class EcflowSuiteFamily(EcflowNodeContainer):
"""A family in ecflow."""
def __init__(
self,
name,
parent,
ecf_files,
variables=None,
trigger=None,
def_status=None,
ecf_files_remotely=None,
cron=None,
limit=None,
):
"""Construct the family.
Args:
name (str): Name of the family.
parent (EcflowNodeContainer): Parent node.
ecf_files (str): Location of ecf files
variables (dict, optional): Variables to map. Defaults to None
trigger (EcflowSuiteTriggers): Trigger. Defaults to None
def_status (str, ecflow.Defstatus): Def status. Defaults to None
ecf_files_remotely(str, optional): ECF_FILES on ecflow server
cron (EcflowSuiteCron): Cron. Defaut None
limit (EcflowSuiteLimit): Limit. Default None
"""
EcflowNodeContainer.__init__(
self,
name,
"family",
parent,
ecf_files,
variables=variables,
trigger=trigger,
def_status=def_status,
ecf_files_remotely=ecf_files_remotely,
cron=cron,
limit=limit,
)
logger.debug(self.ecf_remote_container_path)
if self.ecf_node is not None:
self.ecf_node.add_variable("ECF_FILES", self.ecf_remote_container_path)
[docs]
class EcflowSuiteTask(EcflowNode):
"""A task in an ecflow suite/family."""
def __init__(
self,
name,
parent,
config,
task_settings,
ecf_files,
input_template=None,
parse=True,
variables=None,
ecf_micro="%",
trigger=None,
def_status=None,
ecf_files_remotely=None,
cron=None,
):
"""Constuct the EcflowSuiteTask.
Args:
name (str): Name of task
parent (EcflowNode): Parent node.
ecf_files (str): Path to ecflow containers
task_settings (TaskSettings): Submission configuration
config (deode.ParsedConfig): Configuration file
task_settings (deode.TaskSettings): Task settings
input_template(str, optional): Input template
parse (bool, optional): To parse template file or not
variables (dict, optional): Variables to map. Defaults to None
ecf_micro (str, optional): ECF_MICRO. Defaults to %
trigger (EcflowSuiteTriggers): Trigger. Defaults to None
def_status (str, ecflow.Defstatus): Def status. Defaults to None
ecf_files_remotely(str, optional): ECF_FILES on ecflow server
cron (EcflowSuiteCron): Cron. Defaut None
Raises:
ValueError: If input template is to be parsed but it is not passed.
FileNotFoundError: If the task container is not found.
"""
EcflowNode.__init__(
self,
name,
"task",
parent,
ecf_files,
variables=variables,
trigger=trigger,
def_status=def_status,
ecf_files_remotely=ecf_files_remotely,
cron=cron,
)
logger.debug(parent.path)
logger.debug(parent.ecf_local_container_path)
task_container = parent.ecf_local_container_path + "/" + name + ".bash"
if parse:
if input_template is None:
raise ValueError("Must pass input template if it is to be parsed")
variables = task_settings.get_settings(name)
logger.debug("vars {}", variables)
for var, value_ in variables.items():
value = value_
if isinstance(value, int):
value = str(value)
logger.debug("var={} value={}", var, value)
if self.ecf_node is not None:
self.ecf_node.add_variable(var, value)
task_settings.parse_job(
task=name,
config=config,
input_template_job=input_template,
task_job=task_container,
variables=variables,
ecf_micro=ecf_micro,
)
elif not os.path.exists(task_container):
raise FileNotFoundError(f"Container {task_container} is missing!")
[docs]
class EcflowSuiteTriggers:
"""Triggers to an ecflow suite."""
def __init__(self, triggers, mode="AND"):
"""Construct EcflowSuiteTriggers.
Args:
triggers (list): List of EcflowSuiteTrigger objects.
mode (str, optional): Cat mode. Defaults to "AND".
"""
trigger_string = self.create_string(triggers, mode)
self.trigger_string = trigger_string
[docs]
@staticmethod
def create_string(triggers, mode):
"""Create the trigger string.
Args:
triggers (list): List of trigger objects
mode (str): Concatenation type.
Raises:
ValueError: If there are no triggers to be processed
TypeError: If trigger is not an EcflowSuiteTrigger object
Returns:
str: The trigger string based on trigger objects.
"""
if not isinstance(triggers, list):
triggers = [triggers]
if len(triggers) == 0:
raise ValueError("No triggers to be processed")
trigger_string = "("
first = True
for trigger in triggers:
if trigger is not None:
cat = ""
if not first:
cat = " " + mode + " "
if isinstance(trigger, EcflowSuiteTriggers):
trigger_string = trigger_string + cat + trigger.trigger_string
elif isinstance(trigger, EcflowSuiteTrigger):
trigger_string = (
trigger_string + cat + trigger.node.path + " == " + trigger.mode
)
else:
raise TypeError("Trigger must be an EcflowSuiteTrigger object")
first = False
trigger_string = trigger_string + ")"
# If no triggers were found/set
if first:
trigger_string = None
return trigger_string
[docs]
def add_triggers(self, triggers, mode="AND"):
"""Add triggers.
Args:
triggers (EcflowSuiteTriggers): The triggers
mode (str, optional): Cat mode. Defaults to "AND".
"""
cat_string = " " + mode + " "
trigger_string = self.create_string(triggers, mode)
if trigger_string is not None:
self.trigger_string = self.trigger_string + cat_string + trigger_string
[docs]
class EcflowSuiteTrigger:
"""EcFlow Trigger in a suite."""
def __init__(self, node, mode="complete"):
"""Create a EcFlow trigger object.
Args:
node (scheduler.EcflowNode): The node to trigger on
mode (str, optional): Trigger type. Defaults to "complete".
"""
self.node = node
self.mode = mode
[docs]
class EcflowSuiteCron:
"""EcFlow Cron in a suite."""
def __init__(self, days_of_week, time):
"""Create a EcFlow cron object.
Args:
days_of_week (list of int): 0-6, Sunday-Saturday
time (datatime): time to start
"""
time_str = time.strftime("%H:%M")
days_of_week = list(days_of_week)
logger.info("days: {}, time: {}", days_of_week, time_str)
self.cron = ecflow.Cron(time_str, days_of_week=days_of_week)
[docs]
class EcflowSuiteLimit:
"""Ecflow limit for active jobs in family."""
def __init__(self, limit_name, max_jobs):
"""Create a EcFlow limit object.
Args:
limit_name (str): Name of the limit
max_jobs (int): number of maximal active jobs
"""
self.limit_name = limit_name
self.max_jobs = max_jobs