Source code for deode.templates.ecflow.default

"""Default ecflow container."""

import os

from deode.config_parser import ConfigParserDefaults, ParsedConfig
from deode.derived_variables import derived_variables
from deode.host_actions import DeodeHost
from deode.logs import LogDefaults, LoggerHandlers, logger
from deode.scheduler import EcflowClient, EcflowServer, EcflowTask
from deode.submission import ProcessorLayout
from deode.tasks.discover_task import get_task

logger.enable("deode")


[docs] def parse_ecflow_vars(): """Parse the ecflow variables.""" return { "ECF_HOST": os.environ["ECF_HOST"], "ECF_PORT": os.environ["ECF_PORT"], "ECF_NAME": os.environ["ECF_NAME"], "ECF_PASS": os.environ["ECF_PASS"], "ECF_TRYNO": os.environ["ECF_TRYNO"], "ECF_RID": os.environ["ECF_RID"], "ECF_TIMEOUT": os.environ["ECF_TIMEOUT"], "BASETIME": os.environ["BASETIME"], "VALIDTIME": os.environ["VALIDTIME"], "LOGLEVEL": os.environ["LOGLEVEL"], "ARGS": os.environ["ARGS"], "WRAPPER": os.environ["WRAPPER"], "NPROC": os.environ["NPROC"], "NPROC_IO": os.environ["NPROC_IO"], "NPROCX": os.environ["NPROCX"], "NPROCY": os.environ["NPROCY"], "CONFIG": os.environ["CONFIG"], "DEODE_HOME": os.environ["DEODE_HOME"], "KEEP_WORKDIRS": os.environ["KEEP_WORKDIRS"], }
[docs] def default_main(**kwargs): """Ecflow container default method.""" config = kwargs.get("CONFIG") deode_host = DeodeHost().detect_deode_host() config = ParsedConfig.from_file( config, json_schema=ConfigParserDefaults.MAIN_CONFIG_JSON_SCHEMA, host=deode_host ) # Reset loglevel according to (in order of priority): # (a) Configs in ECFLOW UI # (b) What was originally set in the config file # (c) The default `LogDefaults.level` if none of the above is found. loglevel = kwargs.get("LOGLEVEL", config.get("loglevel", LogDefaults.LEVEL)).upper() logger.configure(handlers=LoggerHandlers(default_level=loglevel)) args = kwargs.get("ARGS") args_dict = {} if args != "": for arg in args.split(";"): parts = arg.split("=") if len(parts) == 2: args_dict.update({parts[0]: parts[1]}) else: logger.warning("Could not convert ARGS:{} to dict, skip it", arg) # Update config based on ecflow settings config = config.copy( update={ "submission": {"task": {"wrapper": kwargs.get("WRAPPER")}}, "task": {"args": args_dict}, "general": { "times": { "validtime": kwargs.get("VALIDTIME"), "basetime": kwargs.get("BASETIME"), }, "keep_workdirs": bool(int(kwargs.get("KEEP_WORKDIRS"))), "loglevel": loglevel, }, "platform": {"deode_home": kwargs.get("DEODE_HOME")}, } ) # TODO Add wrapper server = EcflowServer(config) ecf_name = kwargs.get("ECF_NAME") ecf_pass = kwargs.get("ECF_PASS") ecf_tryno = kwargs.get("ECF_TRYNO") ecf_rid = kwargs.get("ECF_RID") ecf_timeout = kwargs.get("ECF_TIMEOUT") task = EcflowTask(ecf_name, ecf_tryno, ecf_pass, ecf_rid, ecf_timeout=ecf_timeout) # This will also handle call to sys.exit(), i.e. Client.__exit__ will still be called. with EcflowClient(server, task): processor_layout = ProcessorLayout(kwargs) update = derived_variables(config, processor_layout=processor_layout) config = config.copy(update=update) # TODO Add wrapper to config logger.info("Running task {}", task.ecf_name) get_task(task.ecf_task, config).run() logger.info("Finished task {}", task.ecf_name)
if __name__ == "__main__": # Get ecflow variables kwargs_main = parse_ecflow_vars() default_main(**kwargs_main)