# Source code for deode.tasks.impacts

"""Impact model classes."""

import contextlib
import os
import tempfile
from dataclasses import dataclass
from pathlib import Path

from git import GitCommandError, Repo

from deode.config_parser import BasicConfig
from deode.datetime_utils import as_datetime, as_timedelta
from deode.general_utils import recursive_substitute
from deode.host_actions import HostNotFoundError, SelectHost
from deode.logs import logger
from deode.os_utils import deodemakedirs
from deode.scheduler import EcflowServer
from deode.tasks.base import Task
from deode.tasks.batch import BatchJob
from deode.toolbox import Platform


@dataclass()
class BaseImpactModel:
    """Abstract class for impact models.

    Acts as a factory: instantiating ``BaseImpactModel(name, ...)`` returns
    an instance of the subclass whose class attribute ``name`` matches.
    """

    name: str
    config: dict
    platform: Platform

    def __new__(cls, name: str, *_args, **_kwargs):
        """Create a new instance of a subclass based on the field_type."""
        # Pick the first direct subclass whose registry key matches `name`.
        matching = next(
            (sc for sc in BaseImpactModel.__subclasses__() if sc.name == name),
            None,
        )
        if matching is None:
            raise ValueError(f"No valid BaseImpactModel subclass found for name: {name}")
        return super(BaseImpactModel, matching).__new__(matching)
[docs] def run(self): """Starts a plugin suite. Returns: path where impact model can run Raises: KeyError: Didn't found a handle to run """ # Fetch path from config if self.config.get("git", {}).get("active", False): path = self.process_git(self.config["git"]) else: path = self.platform.substitute(self.config.get("path")) if path: logger.info("Change directory to: {}", path) os.chdir(path) else: logger.debug("Path is empty or none; so don't change directory.") # Get the runner to run this plugin poetry = self.config.get("poetry") runner = f"{poetry} run" if poetry is not None else self.config.get("runner") args = self.config.get("arguments", []) if isinstance(args, str): args = [args] if args else [] if runner is not None: logger.info("Run argument(s) via: {}", runner) for arg in args: cmd = self.platform.substitute(f"{runner} {arg}") BatchJob(os.environ, wrapper="").run(cmd) elif len(args) > 0: raise KeyError(f"Didn't found a runner to run {self.name}") # else: just return the path, let the impact model implement the actual run return path
    def execute(self):
        """Prepares and runs the impact model commands.

        Substitutes macros in the ``communicate`` section of the config,
        writes it to the configured ``config_name`` file (if any), and then
        delegates to :meth:`run`.
        """
        logger.info("Impact model:{} ", str(self))

        # Recursively substitute variables in communicate config.
        # NOTE: mutates self.config["communicate"] in place.
        for key, value in self.config["communicate"].items():
            if isinstance(value, dict):
                self.config["communicate"][key] = recursive_substitute(
                    value, self.platform, pos=[key]
                )
            else:
                self.config["communicate"][key] = self.platform.substitute(value)

        # Write the config file
        com = self.platform.resolve_macros(self.config["communicate"])
        config_name = self.platform.substitute(self.config.get("config_name"))
        if config_name is not None:
            logger.info(" communication keys: {}", com)
            # Write the config file only when there is something to communicate
            if len(com) > 0:
                # Make sure the target directory exists with the right group
                deodemakedirs(
                    Path(config_name).parent,
                    unixgroup=self.platform.get_value("platform.unix_group"),
                    exist_ok=True,
                )
                BasicConfig(com).save_as(config_name)
        elif len(com) > 0:
            # Keys exist but there is nowhere to write them; warn and continue.
            logger.warning("Found keys to communicate but config_name not set.")

        # Execute the impact model specific command
        self.run()
    def process_git(self, git_config):
        """Process git-configuration block where plugin should be located.

        Clones the plugin repository if no pre-stored checkout exists,
        registers the plugin directory in the plugin's communication config
        file, and (for fresh checkouts) injects the checkout location back
        into the main deode config file.

        Args:
            git_config: git config section for impact model

        Returns:
            plugin_dir: Path where plugin is located
        """
        logger.info(f"Git source is active for {self.name}")
        plugin_dir = self.platform.substitute(git_config.get("dir"))
        # A checkout is needed when no dir is configured or it doesn't exist yet.
        do_git_checkout = plugin_dir is None or not os.path.isdir(plugin_dir)
        unixgroup = self.platform.get_value("platform.unix_group")
        if do_git_checkout:
            logger.info("No pre-stored git.dir found; checkout new repository...")
            # Checkout new git repository
            path = self.platform.substitute(self.config["path"])
            # Create already path if it doesn't exist
            deodemakedirs(path, unixgroup=unixgroup)
            if plugin_dir is None:
                # NOTE(review): NamedTemporaryFile is only used to generate a
                # unique name; with delete=True the file vanishes when the
                # handle is garbage-collected, leaving a small race before
                # deodemakedirs below — tempfile.mkdtemp would be atomic.
                plugin_dir = tempfile.NamedTemporaryFile(
                    prefix=f"{self.name}_", dir=path, delete=True
                ).name
            deodemakedirs(plugin_dir, unixgroup=unixgroup)

            # Check-out repo
            remote_url = git_config["remote_url"]
            branch = git_config["branch"]
            logger.info(f"Clone {branch} from {remote_url} in {plugin_dir}")
            try:
                Repo.clone_from(remote_url, plugin_dir, branch=branch)
            except GitCommandError as ex:
                # GitPython failed; fall back to the plain git executable.
                logger.info(
                    f"Catched exception {ex} when checking out repo; "
                    "retry using plain git command."
                )
                git_cmd = f""" git clone --single-branch --branch {branch} {remote_url} {plugin_dir}; """
                BatchJob(os.environ, wrapper="").run(git_cmd)
        else:
            logger.info(f"Found installed {self.name} in {plugin_dir}; use that.")

        # Add plugin_dir to plugin's plugin_registry
        config_name = self.platform.substitute(self.config.get("config_name"))
        if config_name is not None:
            # presumably config_name was already written by execute(); verify
            # against caller ordering — from_file will fail if it is missing.
            plugin_comm = BasicConfig.from_file(config_name)
            key = f"general.plugin_registry.plugins.{self.name}"
            plugin_comm = plugin_comm.update(key, plugin_dir)
            # Write plugin dir to appropriate key as specified by plugin_dir_key
            plugin_dir_key = self.config.get("plugin_dir_key")
            if plugin_dir_key is not None:
                plugin_comm = plugin_comm.update(plugin_dir_key, plugin_dir)
            plugin_comm.save_as(config_name)

        if do_git_checkout:
            # Inject checkout folder into main config file so later tasks can
            # find (and possibly remove) the fresh checkout.
            deode_config_file = self.platform.substitute("@CONFIG@")
            deode_cfg = BasicConfig.from_file(deode_config_file)
            remove_prefix = f"impact.{self.name}.git"
            deode_cfg = deode_cfg.update(f"{remove_prefix}.remove_dir", do_git_checkout)
            deode_cfg = deode_cfg.update(f"{remove_prefix}.dir", plugin_dir)
            logger.warning(
                f"Updated config file {deode_config_file} to inject {remove_prefix} keys"
            )
            deode_cfg.save_as(deode_config_file)

        return plugin_dir
def __str__(self): return self.name
@dataclass()
class Ehype(BaseImpactModel):
    """EHYPE specific methods."""

    # Registry key matched by BaseImpactModel.__new__. Unannotated on purpose:
    # it stays a plain class attribute and does not become a dataclass field.
    name = "ehype"
@dataclass()
class EPSUpscaling(BaseImpactModel):
    """EPS upscaling specific methods."""

    # Registry key matched by BaseImpactModel.__new__ (class attribute, not a field).
    name = "eps_upscaling"
@dataclass()
class Nwp2Windpower(BaseImpactModel):
    """NWP2windpower specific methods."""

    # Registry key matched by BaseImpactModel.__new__ (class attribute, not a field).
    name = "nwp2windpower"
@dataclass()
class Verification(BaseImpactModel):
    """Verification specific methods."""

    # Registry key matched by BaseImpactModel.__new__ (class attribute, not a field).
    name = "verification"
@dataclass()
class AQModels(BaseImpactModel):
    """Air quality (AQ) model specific methods."""

    # Registry key matched by BaseImpactModel.__new__ (class attribute, not a field).
    name = "aq"
class ImpactModels(Task):
    """Create info to and start impact models."""

    def __init__(self, config, taskname=None):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
            taskname (str): Indicating name of task in suite
        """
        super().__init__(config, __class__.__name__)
        self.impact = get_impact(config, taskname)

    def execute(self):
        """Instantiate and execute every configured impact model."""
        for model_name, model_config in self.impact.items():
            BaseImpactModel(
                name=model_name,
                config=model_config,
                platform=self.platform,
            ).execute()
def get_fdb_info(config):
    """Build a fdb request.

    Args:
        config (deode.ParsedConfig): Configuration

    Returns:
        fdb_req (dict): fdb request with appropriate keys, or None when
        no ``expver`` is configured.
    """
    fdb_req = {
        key: config.get(f"fdb.grib_set.{key}")
        for key in ("class", "dataset", "expver", "georef", "stream")
    }
    # No experiment version configured means no fdb request at all.
    if fdb_req["expver"] is None:
        return None

    extra_keys = {
        "number": "eps.general.members",
        "basetime": "general.times.basetime",
        "forecast_range": "general.times.forecast_range",
    }
    fdb_req.update({key: config.get(path) for key, path in extra_keys.items()})

    # Handle members: only ensemble (enfo) streams carry member numbers.
    if fdb_req["stream"] == "enfo":
        fdb_req["number"] = list(fdb_req["number"])
    else:
        del fdb_req["number"]

    # Date/time: replace the raw basetime with date/time strings.
    basetime = as_datetime(fdb_req.pop("basetime"))
    fdb_req["date"] = basetime.strftime("%Y%m%d")
    fdb_req["time"] = basetime.strftime("%H%M")

    # Assumes hourly data!!!
    fc_range = as_timedelta(fdb_req.pop("forecast_range"))
    total_hours = int(fc_range.total_seconds() / 3600)
    fdb_req["step"] = list(range(total_hours + 1))

    return fdb_req
def get_impact(config, taskname):
    """Gather impact settings.

    Selects the impact models that are active, installed on the platform,
    and relevant for ``taskname``; merges platform and impact configuration
    into one settings dict per model and attaches fdb/ecflow information.

    Args:
        config (deode.ParsedConfig): Configuration
        taskname (str): Indicating name of task in suite

    Returns:
        impact (dict): Impact model settings
    """
    _impact = config.get("impact", BasicConfig({})).dict()
    installed_impact = config.get("platform.impact", {})
    impact = {}
    fdb_keys = get_fdb_info(config)

    # Resolve ecf_host/ecf_port if used; silently skipped when host lookup fails.
    with contextlib.suppress(HostNotFoundError):
        ecf_host = config.get("scheduler.ecfvars.ecf_host")
        ecf_port = config.get("scheduler.ecfvars.ecf_port")
        if ecf_host is not None and ecf_port is not None:
            pl = Platform(config)
            ecf_host = pl.substitute(ecf_host)
            ecf_host = pl.evaluate(ecf_host, object_=SelectHost)
            ecf_port = pl.substitute(ecf_port)
            ecf_port = pl.evaluate(ecf_port, object_=EcflowServer)
            # Store the resolved values in a copy so later Platform(config)
            # calls can substitute them.
            config = config.copy(
                update={
                    "scheduler": {
                        "ecfvars": {
                            "ecf_host_resolved": ecf_host,
                            "ecf_port_resolved": ecf_port,
                        }
                    }
                }
            )

    for name, impact_model in _impact.items():
        # A model is selected when it is active, installed on this platform,
        # and either has a section for this task or names it as its task.
        if (
            impact_model.get("active", False)
            and name in installed_impact
            and (taskname in impact_model or impact_model.get("task") == taskname)
        ):
            impact[name] = impact_model.get(taskname, {})
            # Fetch path, poetry etc from platform.impact and load config from
            # impact.*; earlier sources win (existing keys are not overwritten).
            for source in [installed_impact.get(name, {}), impact_model]:
                if isinstance(source, (BasicConfig, dict)):
                    for conf, value in source.items():
                        if conf not in impact[name]:
                            impact[name][conf] = value
            if "communicate" not in impact[name]:
                impact[name]["communicate"] = {}
            if fdb_keys is not None:
                impact[name]["communicate"]["fdb_request"] = fdb_keys

            # Update user ecf host and port with resolved values
            with contextlib.suppress(KeyError):
                user_ecf_host = impact[name]["communicate"].get("user_ecf_host")
                user_ecf_port = impact[name]["communicate"].get("user_ecf_port")
                if user_ecf_host is not None and user_ecf_port is not None:
                    pl = Platform(config)
                    impact[name]["communicate"]["user_ecf_host"] = pl.substitute(
                        user_ecf_host
                    )
                    impact[name]["communicate"]["user_ecf_port"] = pl.substitute(
                        user_ecf_port
                    )

    return impact
class StartImpactModels(ImpactModels):
    """Starts plugins and impact models."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        # Pass this class name as the taskname used to select impact settings.
        super().__init__(config, __class__.__name__)