Source code for deode.tasks.iomerge

"""IO merge task."""
import datetime
import glob
import os
from time import sleep, time

from ..datetime_utils import as_datetime, as_timedelta, oi2dt_list
from ..logs import logger
from .base import Task
from .batch import BatchJob


[docs] class IOmerge(Task): """IO merge task.""" def __init__(self, config): """Construct IOmerge object. Args: config (deode.ParsedConfig): Configuration """ Task.__init__(self, config, __class__.__name__) self.cycle = self.config["general.cycle"] self.cnmexp = self.config["general.cnmexp"] self.domain = self.config["domain.name"] self.basetime = as_datetime(self.config["general.times.basetime"]) self.cycle_length = as_timedelta(self.config["general.times.cycle_length"]) self.forecast_range = self.config["general.times.forecast_range"] self.deode_home = self.config["platform.deode_home"] self.output_settings = self.config["general.output_settings"] self.surfex = self.config["general.surfex"] self.n_io_merge = self.config["suite_control.n_io_merge"] self.ionr = int(config["task.args.ionr"]) self.archive = self.platform.get_system_value("archive") self.deode_home = self.config["platform.deode_home"] self.file_templates = self.config["file_templates"] self.nproc_io = int(self.config.get("task.args.nproc_io", "0")) self.iomerge = self.config["submission.iomerge"]
[docs] @staticmethod def wait_for_file(filename, age_limit=15): """Wait until a file is at least N seconds old. Args: filename (str): full path to file age_limit (int): minimum age of last modification """ now = time() st = os.stat(filename).st_mtime while now - st < age_limit: sleep(age_limit + now - st + 1) now = time() st = os.stat(filename).st_mtime
[docs] def wait_for_io(self, filetype, lt, fc_path="../Forecast", maxtries=2): """Wait for all io_server output to be stable. Args: filetype (str): kind of output file lt (timeDelta): lead time fc_path (str): path to forecast directory maxtries (int): maximum number of file search trials Returns: file_list (list): List of files expected Raises: RuntimeError: In case of erroneous number of files """ ftemplate = self.file_templates[filetype]["model"] age_limit = self.iomerge["age_limit"][filetype] files_expected = self.iomerge["files_expected"][filetype] validtime = self.basetime + lt filename = self.platform.substitute(ftemplate, validtime=validtime) ntries = 0 while True: ntries += 1 file_list = [] if filetype == "history": files_expected = files_expected if files_expected != 0 else -1 for io in range(self.nproc_io): iopath = f"io_serv.{io+1:06}.d" file_list += glob.glob(f"{fc_path}/{iopath}/{filename}.speca.*") file_list += glob.glob(f"{fc_path}/{iopath}/{filename}.gridall") elif filetype == "surfex": files_expected = ( files_expected if files_expected != 0 else (1 + self.nproc_io) ) file_list += [f"{fc_path}/{filename}"] for io in range(self.nproc_io): iopath = f"io_serv.{io+1:06}.d" file_list += glob.glob(f"{fc_path}/{iopath}/{filename}") elif filetype == "fullpos": files_expected = files_expected if files_expected != 0 else self.nproc_io # FIXME: what if we have fullpos output in FA format? for io in range(self.nproc_io): iopath = f"io_serv.{io+1:06}.d" file_list += glob.glob(f"{fc_path}/{iopath}/{filename}.hfp") else: files_expected = 0 logger.error("Bad file_type {}", filetype) for ff in file_list: self.wait_for_file(ff, age_limit) files_found = len(file_list) if files_found == files_expected or files_expected < 0: break if ntries == maxtries: logger.error( "Expected {} files found {} after {} scans", files_expected, files_found, ntries, ) raise RuntimeError(f"Expected {files_expected} files found {files_found}") return file_list
[docs] def merge_output(self, lt, fc_path="../Forecast"): """Merge distributed forecast model output. Args: lt (timeDelta): lead time fc_path (str): path to forecast directory """ logger.info("Merge_output called at lt = {}", lt) for filetype, oi in self.output_settings.items(): if filetype not in list(self.file_templates.keys()): continue # NOTE: various output types (history, surfex, fullpos) # may be created at other time intervals # so we have to check validtime for every filetype dt_list = oi2dt_list(oi, self.forecast_range) if lt not in dt_list: logger.info("{} not required at time {}", filetype, lt) continue ftemplate = self.file_templates[filetype]["model"] ftemplate_out = self.file_templates[filetype]["archive"] validtime = self.basetime + lt filename = self.platform.substitute(ftemplate, validtime=validtime) filename_out = self.platform.substitute(ftemplate_out, validtime=validtime) logger.info("Merging file {}", filename) file_list = self.wait_for_io(filetype, lt, fc_path=fc_path) files = " ".join(file_list) logger.info("{} input files:", len(file_list)) for x in file_list: logger.info(" {}", x) if filetype in ("history", "surfex"): lfitools = self.get_binary("lfitools") # NOTE: .sfx also has a part in the working directory, # so you *must* change the name cmd = f"{lfitools} facat all {files} {filename}" elif filetype == "fullpos": # FIXME: fullpos output /can/ be FA or GRIB2 # Fullpos (grib2) output has .hfp as extra file extension cmd = f"cat {files} > {filename}" else: logger.error("Unsupported filetype {}", filetype) continue BatchJob(os.environ, wrapper="").run(cmd) # write output to archive (or to the Forecast directory...) self.fmanager.output(filename, f"{self.archive}/{filename_out}")
[docs] def execute(self): """Execute IO merge.""" if self.nproc_io == 0: # Normally, this shouldn't happen, but maybe outside of ecFlow. logger.info("IOmerge called with NPROC_IO == 0.") logger.info("Nothing left to do.") return if self.n_io_merge == 0: # Normally, this shouldn't happen, but maybe outside of ecFlow. logger.info("IOmerge called with n_io_merge == 0") logger.info("Nothing left to do.") return if self.ionr < 0: # Normally, this shouldn't happen, but maybe outside of ecFlow. logger.info("IOmerge called with ionr < 0") logger.info("Nothing left to do.") return fc_path = "../Forecast" # NOTE: this is a link to the current Forecast directory. # you could manipulate via os.path.basename(os.path.realpath(fc_path)) # e.g. to get the real path name or even add "Finished_task_Forecast_". io_path = f"{fc_path}/io_serv.000001.d" echis = f"{io_path}/ECHIS" full_dt_list = [] oi_list = [] # build the list of lead times (all output types combined): # NOTE: like in the Forecast task, we assume that a filetype is # in output_settings AND in file_templates. # Alternatively, ["history", "fullpos", "surfex"] could be # hard coded. for filetype, oi in self.output_settings.items(): if filetype in list(self.file_templates.keys()): logger.debug("filetype: {}, oi: {}", filetype, oi) if oi not in oi_list: oi_list += oi full_dt_list += oi2dt_list(oi, self.forecast_range) # remove duplicates and sort full_dt_list = list(set(full_dt_list)) full_dt_list.sort() # now subset for this worker: dt_list = full_dt_list[self.ionr :: self.n_io_merge] logger.info("IONR = {}, N_IO_MERGE = {}", self.ionr, self.n_io_merge) logger.info("DT list {}", dt_list) # We must wait for the io_server output to appear # BUT: how long should we wait before aborting? logger.info("Waiting for forecast output.") while not os.path.exists(echis): sleep(5) logger.info("IO_SERVER {} detected!", echis) # Now we wait for the output times allotted to this IO worker. for dt in dt_list: logger.info("Waiting for {}", dt) while True: with open(f"{io_path}/ECHIS", "r") as ll: lt = ll.read() hh, mm, ss = [int(x) for x in lt.split(":")] cdt = datetime.timedelta(hours=hh, minutes=mm, seconds=ss) if dt > cdt: sleep(5) else: break # we have reached the next 'merge time' logger.info("lead time {} is available", dt) # NOTE: merge_output will first wait for files to be "complete" self.merge_output(dt, fc_path=fc_path) # signal completion of all merge tasks to the forecast task BatchJob(os.environ, wrapper="").run(f"touch {fc_path}/io_merge_{self.ionr:02}")