# Source code for deode.suites.deode

"""Ecflow suites."""

from pathlib import Path

from ..os_utils import deodemakedirs
from .base import EcflowSuiteTask, SuiteDefinition
from .deode_suite_components import StaticDataFamily, TimeDependentFamily


class DeodeSuiteDefinition(SuiteDefinition):
    """Definition of suite for the Deode Workflow.

    The constructor wires the suite together from individual ecFlow
    components, each one enabled by a ``suite_control.*`` config switch:

    1. ``PreCleaning`` task (``do_cleaning``).
    2. Static data family followed by ``CollectLogs`` and, optionally,
       ``ArchiveStatic`` (``create_static_data`` / ``do_archiving``).
    3. Time dependent family (``create_time_dependent_suite``).
    4. ``PostMortem`` task (``do_cleaning``).

    Every component is triggered by the component that precedes it, so the
    trigger variables below are updated as components are added.
    """

    def __init__(
        self,
        config,
        dry_run: bool = False,
    ) -> None:
        """Construct the definition.

        Args:
            config (deode.ParsedConfig): Configuration file
            dry_run (bool, optional): Dry run not using ecflow. Defaults to False.

        Raises:
            ModuleNotFoundError: If ecflow is not loaded and not dry_run

        """
        # Call the base class constructor
        SuiteDefinition.__init__(self, config, dry_run=dry_run)

        # Construct directories
        unix_group = self.platform.get_platform_value("unix_group")
        deodemakedirs(self.joboutdir, unixgroup=unix_group)

        # Get the default input template path (as a POSIX string)
        input_template = (
            Path(__file__).parent.resolve() / "../templates/ecflow/default.py"
        ).as_posix()

        # Set max_ecf_tasks from config; -1 means "no limit configured"
        try:
            max_ecf_tasks = self.config["submission.max_ecf_tasks"]
        except KeyError:
            max_ecf_tasks = -1

        # Add limits to suite (skipped when there is no ecFlow node)
        if max_ecf_tasks > 0 and self.suite.ecf_node is not None:
            self.suite.ecf_node.add_limit("max_ecf_tasks", max_ecf_tasks)
            # NOTE(review): in the ecFlow API the third add_inlimit argument
            # appears to be the number of tokens consumed per job -- confirm
            # that passing the full limit value here is intended.
            self.suite.ecf_node.add_inlimit(
                "max_ecf_tasks", f"/{self.name}", max_ecf_tasks
            )

        # Adjust parameters depending on suite_control.mode:
        # a restart neither prepares nor re-creates the static data.
        self.do_prep = True
        create_static_data = config["suite_control.create_static_data"]
        if config["suite_control.mode"] == "restart":
            self.do_prep = False
            create_static_data = False

        # Construct the suite from individual ecFlow components
        final_cleaning_trigger = None
        time_dependent_trigger_node = None
        initial_cleaning = None

        if config["suite_control.do_cleaning"]:
            initial_cleaning = EcflowSuiteTask(
                "PreCleaning",
                self.suite,
                config,
                self.task_settings,
                self.ecf_files,
                input_template=input_template,
                ecf_files_remotely=self.ecf_files_remotely,
            )
            # Update triggers for final cleaning and time dependent nodes
            final_cleaning_trigger = [initial_cleaning]
            time_dependent_trigger_node = initial_cleaning

        if create_static_data:
            static_data = StaticDataFamily(
                self.suite,
                config,
                self.task_settings,
                input_template,
                self.ecf_files,
                trigger=initial_cleaning,
                ecf_files_remotely=self.ecf_files_remotely,
            )
            # Update trigger for time dependent node
            time_dependent_trigger_node = static_data

            # Prepare arguments for CollectLogs
            task_logs = config["system.climdir"]
            args = ";".join(
                [
                    f"joboutdir={self.ecf_out}/{self.name}/StaticData",
                    "tarname=StaticData",
                    f"task_logs={task_logs}",
                ]
            )
            variables = {"ARGS": args}

            collect_logs = EcflowSuiteTask(
                "CollectLogs",
                self.suite,
                config,
                self.task_settings,
                self.ecf_files,
                input_template=input_template,
                trigger=static_data,
                variables=variables,
                ecf_files_remotely=self.ecf_files_remotely,
            )
            # Update triggers for final cleaning node
            final_cleaning_trigger = [collect_logs]

            # Archiving is chained after CollectLogs, hence only possible
            # when the static data branch is active.
            if config["suite_control.do_archiving"]:
                # NOTE(review): unlike the other tasks, ecf_files_remotely is
                # not forwarded here -- confirm this is deliberate.
                archive_static = EcflowSuiteTask(
                    "ArchiveStatic",
                    self.suite,
                    config,
                    self.task_settings,
                    self.ecf_files,
                    input_template=input_template,
                    variables=None,
                    trigger=collect_logs,
                )
                # Update triggers for final cleaning node
                final_cleaning_trigger = [archive_static]

        last_time_dependent_part = None
        if config["suite_control.create_time_dependent_suite"]:
            time_dependent_family = TimeDependentFamily(
                self.suite,
                config,
                self.task_settings,
                input_template,
                self.ecf_files,
                self.ecf_out,
                self.name,
                time_dependent_trigger_node,
                ecf_files_remotely=self.ecf_files_remotely,
                do_prep=self.do_prep,
                dry_run=dry_run,
            )
            # Update trigger for next time dependent node
            last_time_dependent_part = time_dependent_family.last_node

        if config["suite_control.do_cleaning"]:
            # final_cleaning_trigger is always a list here: the PreCleaning
            # branch above ran under the same config switch.
            if last_time_dependent_part is not None:
                # Update triggers for final cleaning node
                final_cleaning_trigger.append(last_time_dependent_part)

            EcflowSuiteTask(
                "PostMortem",
                self.suite,
                config,
                self.task_settings,
                self.ecf_files,
                input_template=input_template,
                trigger=final_cleaning_trigger,
                ecf_files_remotely=self.ecf_files_remotely,
            )