"""Ecflow suites."""
from pathlib import Path
from ..os_utils import deodemakedirs
from .base import EcflowSuiteTask, SuiteDefinition
from .deode_suite_components import StaticDataFamily, TimeDependentFamily
[docs]
class DeodeSuiteDefinition(SuiteDefinition):
    """Suite definition that assembles the ecFlow nodes of the Deode Workflow."""

    def __init__(
        self,
        config,
        dry_run=False,
    ):
        """Build the suite from the configuration.

        Nodes are added to the suite in this order, each one only when the
        matching ``suite_control`` setting enables it: ``PreCleaning``, the
        static data family followed by ``CollectLogs`` (and optionally
        ``ArchiveStatic``), the time dependent family and finally
        ``PostMortem``. Static data is never created in ``restart`` mode.

        ``PostMortem`` is triggered by the most recently created task among
        ``PreCleaning`` / ``CollectLogs`` / ``ArchiveStatic`` and, when the
        time dependent family exists, by its last node as well.

        Args:
            config (deode.ParsedConfig): Configuration file
            dry_run (bool, optional): Dry run not using ecflow. Defaults to False.

        Raises:
            ModuleNotFoundError: If ecflow is not loaded and not dry_run

        """
        SuiteDefinition.__init__(self, config, dry_run=dry_run)

        # Create the job output directory with the platform's unix group
        group = self.platform.get_platform_value("unix_group")
        deodemakedirs(self.joboutdir, unixgroup=group)

        # Default template used for every task created below
        package_dir = Path(__file__).parent.resolve()
        input_template = (package_dir / "../templates/ecflow/default.py").as_posix()

        def add_task(name, **task_kwargs):
            """Create a suite-level task sharing the common constructor arguments."""
            return EcflowSuiteTask(
                name,
                self.suite,
                config,
                self.task_settings,
                self.ecf_files,
                input_template=input_template,
                **task_kwargs,
            )

        # A missing submission.max_ecf_tasks entry means "no limit"
        try:
            task_limit = self.config["submission.max_ecf_tasks"]
        except KeyError:
            task_limit = -1

        # Add the limit to the suite node, if a positive limit is configured
        # and the suite actually has an ecFlow node
        if task_limit > 0:
            suite_node = self.suite.ecf_node
            if suite_node is not None:
                suite_node.add_limit("max_ecf_tasks", task_limit)
                suite_node.add_inlimit("max_ecf_tasks", f"/{self.name}", task_limit)

        # suite_control.mode == "restart" switches off both the preparation
        # step and the creation of static data
        static_data_requested = config["suite_control.create_static_data"]
        is_restart = config["suite_control.mode"] == "restart"
        self.do_prep = not is_restart
        build_static_data = static_data_requested and not is_restart

        # Initial cleaning; when present it gates everything that follows
        do_cleaning = config["suite_control.do_cleaning"]
        pre_cleaning = None
        postmortem_triggers = None
        if do_cleaning:
            pre_cleaning = add_task(
                "PreCleaning",
                ecf_files_remotely=self.ecf_files_remotely,
            )
            postmortem_triggers = [pre_cleaning]
        time_dependent_trigger = pre_cleaning

        if build_static_data:
            static_data = StaticDataFamily(
                self.suite,
                config,
                self.task_settings,
                input_template,
                self.ecf_files,
                trigger=pre_cleaning,
                ecf_files_remotely=self.ecf_files_remotely,
            )
            # The time dependent part now waits for the static data instead
            time_dependent_trigger = static_data

            # CollectLogs receives its settings as a ";"-separated ARGS string
            clim_dir = config["system.climdir"]
            collect_logs_args = ";".join(
                (
                    f"joboutdir={self.ecf_out}/{self.name}/StaticData",
                    "tarname=StaticData",
                    f"task_logs={clim_dir}",
                )
            )
            collect_logs = add_task(
                "CollectLogs",
                trigger=static_data,
                variables={"ARGS": collect_logs_args},
                ecf_files_remotely=self.ecf_files_remotely,
            )
            postmortem_triggers = [collect_logs]

            if config["suite_control.do_archiving"]:
                # NOTE(review): unlike every other task created here,
                # ArchiveStatic is not given ecf_files_remotely - confirm
                # whether that is intentional.
                archive_static = add_task(
                    "ArchiveStatic",
                    variables=None,
                    trigger=collect_logs,
                )
                postmortem_triggers = [archive_static]

        last_time_dependent_node = None
        if config["suite_control.create_time_dependent_suite"]:
            time_dependent = TimeDependentFamily(
                self.suite,
                config,
                self.task_settings,
                input_template,
                self.ecf_files,
                self.ecf_out,
                self.name,
                time_dependent_trigger,
                ecf_files_remotely=self.ecf_files_remotely,
                do_prep=self.do_prep,
                dry_run=dry_run,
            )
            last_time_dependent_node = time_dependent.last_node

        if do_cleaning:
            # postmortem_triggers is always a list here: it was seeded with
            # PreCleaning in the do_cleaning branch above
            if last_time_dependent_node is not None:
                postmortem_triggers.append(last_time_dependent_node)

            add_task(
                "PostMortem",
                trigger=postmortem_triggers,
                ecf_files_remotely=self.ecf_files_remotely,
            )