Source code for deode.tasks.c903light

"""C903Light."""

from deode.boundary_utils import Boundary
from deode.datetime_utils import as_datetime
from deode.logs import logger
from deode.tasks.base import Task


[docs] class C903Light(Task): """C903Light, just symlink the target file to one produced elsewhere.""" def __init__(self, config): """Construct C903Light object. Args: config (deode.ParsedConfig): Configuration """ Task.__init__(self, config, __class__.__name__) self.basetime = as_datetime(self.config["general.times.basetime"]) self.boundary = Boundary(config) self.target = ( f"{self.platform.get_system_value('intp_bddir')}" + "/" + f"{self.config['file_templates.interpolated_boundaries.archive']}" ) self.name = ( f"{self.name}_{self.boundary.min_index}-{self.boundary.max_index}" ).upper()
[docs] def execute(self): """Run task. Define run sequence. """ # Get target file name duo = self.config.get("task.args.duo", "0:0") doer, i = [int(j) for j in duo.split(":", 1)] me = int(self.config.get("task.args.me", "0")) # Loop over boundary files for this batch for bd_index, bd_time in self.boundary.bd_index_time_dict.items(): validtime = as_datetime(bd_time) target = self.target.replace("@NNN@", f"{bd_index:03}") # Find the path to the actual file real_target = self.platform.substitute( target, basetime=self.boundary.bd_basetime, validtime=validtime ) real_target = real_target.replace(f"mbr{me:03d}", f"mbr{doer:03d}", 1) if i > 0: real_target += f"_slaf{i}" if "target_suffix" in self.config["task.args"]: target += self.config["task.args.target_suffix"] logger.info(f"real_target={real_target}") self.fmanager.input(real_target, target) logger.info(f" target={target}")