"""Base site class."""
import atexit
import contextlib
import os
import shutil
import socket
from ..config_parser import ConfigParserDefaults
from ..logs import logger
from ..os_utils import deodemakedirs
from ..toolbox import FileManager
def _get_name(cname, cls, suffix, attrname="__plugin_name__"):
"""Get name.
Args:
cname (_type_): cname
cls (_type_): cls
suffix (str): suffix
attrname (str, optional): _description_. Defaults to "__plugin_name__".
Returns:
_type_: Name
"""
# __dict__ vs. getattr: do not inherit the attribute from a parent class
name = getattr(cls, "__dict__", {}).get(attrname, None)
if name is not None:
return name
name = cname.lower()
if name.endswith(suffix):
name = name[: -len(suffix)]
return name
[docs]
class Task(object):
"""Base Task class."""
def __init__(self, config, name):
"""Construct base task.
Args:
config (deode.ParsedConfig): Configuration
name (str): Task name
Raises:
ValueError: "You must set wrk"
"""
self.config = config
if "." in name:
name = name.split(".")[-1]
self.name = name
self.fmanager = FileManager(self.config)
self.platform = self.fmanager.platform
self.wrapper = self.config["submission.task.wrapper"]
self.wrk = self.platform.get_system_value("wrk")
if self.wrk is None:
raise ValueError("You must set wrk", self.wrk)
wdir = f"{self.wrk}/{socket.gethostname()}{os.getpid()!s}"
self.wdir = wdir
self.unix_group = self.platform.get_value("platform.unix_group")
logger.info("Task {} running in {}", self.name, self.wdir)
self._set_eccodes_environment()
def _set_eccodes_environment(self):
"""Set correct path for ECCODES tables.
Respect ECCODES_DEINITION_PATH if set or combine local settings with library
ones in different ways depending on version
"""
eccodes_definition_path = os.getenv("ECCODES_DEFINITION_PATH")
if eccodes_definition_path is not None:
logger.info("Use ECCODES_DEFINITION_PATH {}", eccodes_definition_path)
return
# Path to local tables
deode_eccodes_definition_path = str(
ConfigParserDefaults.DATA_DIRECTORY / "eccodes/definitions"
)
try:
eccodes_version = tuple(
[int(x) for x in os.getenv("ECCODES_VERSION").split(".")]
)
except AttributeError:
eccodes_version = (2, 30, 0)
eccodes_definition_path = deode_eccodes_definition_path
if eccodes_version < (2, 30, 0):
try:
eccodes_dir = os.environ["ECCODES_DIR"]
eccodes_definition_path = ":".join(
[
eccodes_definition_path,
f"{eccodes_dir}/share/eccodes/definitions",
]
)
except KeyError:
pass
os.environ["ECCODES_DEFINITION_PATH"] = str(eccodes_definition_path)
logger.info(
"Set ECCODES_DEFINITION_PATH to {}", os.environ["ECCODES_DEFINITION_PATH"]
)
[docs]
def archive_logs(self, files, target=None):
"""Archive files in a log directory.
Args:
files (str,list): File(s) to be archived
target (str): Target directory for archiving
"""
if target is None:
target = self.wrk
logdir = os.path.join(target, "logs", self.name)
deodemakedirs(logdir, unixgroup=self.unix_group)
if isinstance(files, str):
self.fmanager.output(files, logdir, provider_id="copy")
else:
for f in files:
self.fmanager.output(f, logdir, provider_id="copy")
[docs]
def create_wrkdir(self):
"""Create a cycle working directory."""
deodemakedirs(self.wrk, unixgroup=self.unix_group)
[docs]
def create_wdir(self):
"""Create task working directory and check for unix group and set permissions."""
deodemakedirs(self.wdir, unixgroup=self.unix_group)
[docs]
def change_to_wdir(self):
"""Change to task working dir."""
os.chdir(self.wdir)
[docs]
def remove_wdir(self):
"""Remove working directory."""
os.chdir(self.wrk)
shutil.rmtree(self.wdir)
logger.debug("Remove {}", self.wdir)
[docs]
def rename_wdir(self, prefix="Failed_task_"):
"""Rename failed working directory."""
if os.path.isdir(self.wdir):
fdir = f"{self.wrk}/{prefix}{self.name}"
if os.path.exists(fdir):
logger.debug("{} exists. Remove it", fdir)
shutil.rmtree(fdir)
pid = os.path.basename(self.wdir)
fdir = f"{fdir}_{pid}"
shutil.move(self.wdir, fdir)
logger.info("Renamed {} to {}", self.wdir, fdir)
[docs]
def get_binary(self, binary_name):
"""Determine binary path from task or system config section.
Args:
binary_name (str): Name of binary
Returns:
bindir (str): full path to binary
"""
binary = binary_name
with contextlib.suppress(KeyError):
binary = self.config[f"submission.task_exceptions.{self.name}.binary"]
try:
bindir = self.config[f"submission.task_exceptions.{self.name}.bindir"]
except KeyError:
try:
binaries = self.config[
f"submission.task_exceptions.{self.name}.binaries.{binary_name}"
]
logger.debug("binaries:{}", binaries)
with contextlib.suppress(KeyError):
binary = binaries["binary"]
with contextlib.suppress(KeyError):
bindir = binaries["bindir"]
except KeyError:
bindir = self.config["submission.bindir"]
bindir = os.path.realpath(bindir)
logger.debug("binary:{}", binary)
logger.debug("bindir:{}", bindir)
return f"{bindir}/{binary}"
[docs]
def execute(self):
"""Do nothing for base execute task."""
logger.debug("Using empty base class execute")
[docs]
def prep(self):
"""Do default preparation before execution.
E.g. clean
"""
logger.debug("Base class prep")
self.create_wdir()
self.change_to_wdir()
atexit.register(self.rename_wdir)
[docs]
def post(self):
"""Do default postfix.
E.g. clean
"""
logger.debug("Base class post")
# Clean workdir
if self.config["general.keep_workdirs"]:
self.rename_wdir(prefix="Finished_task_")
else:
self.remove_wdir()
[docs]
def run(self):
"""Run task.
Define run sequence.
"""
self.prep()
self.execute()
self.post()
[docs]
def get_task_setting(self, setting):
"""Get task setting.
Args:
setting (str): Setting to find in task.{self.name}
Returns:
value : Found setting
"""
task_subsection_name_in_config = _get_name(
self.__class__.__name__,
self.__class__,
Task.__name__.lower(),
attrname="__type_name__",
)
setting_to_be_retrieved = f"task.{task_subsection_name_in_config}.{setting}"
try:
value = self.config[setting_to_be_retrieved]
except KeyError:
logger.exception(
"Task setting '{}' not found in config.", setting_to_be_retrieved
)
return None
logger.debug("Setting = {} value ={}", setting_to_be_retrieved, value)
return value
[docs]
class UnitTest(Task):
"""Base Task class."""
def __init__(self, config):
"""Construct test task.
Args:
config (deode.ParsedConfig): Configuration
"""
Task.__init__(self, config, __class__.__name__)