"""Source code for deode.tasks.forecast."""

import glob
import json
import os
from time import sleep

from ..config_parser import ConfigPaths
from ..datetime_utils import as_datetime, as_timedelta, oi2dt_list
from ..derived_variables import check_fullpos_namelist
from ..initial_conditions import InitialConditions
from ..logs import logger
from ..namelist import NamelistGenerator
from ..os_utils import deodemakedirs
from .base import Task
from .batch import BatchJob
from .sfx import InputDataFromNamelist

class Forecast(Task):
    """Forecast task.

    Stages namelists, static input data, initial and boundary files in the
    working directory, runs the MASTERODB binary and archives the output.
    """

    def __init__(self, config):
        """Construct forecast object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, __class__.__name__)

        self.cycle = self.config["general.cycle"]
        self.cnmexp = self.config["general.cnmexp"]
        # NOTE(review): the config key was empty in the reviewed source;
        # "domain.name" is assumed - confirm against the config schema.
        self.domain = self.config["domain.name"]
        self.windfarm = self.config.get("general.windfarm", False)

        self.basetime = as_datetime(self.config["general.times.basetime"])
        self.cycle_length = as_timedelta(self.config["general.times.cycle_length"])
        self.bdint = as_timedelta(self.config["boundaries.bdint"])
        self.intp_bddir = self.config["system.intp_bddir"]
        self.forecast_range = self.config["general.times.forecast_range"]

        self.archive = self.platform.get_system_value("archive")
        self.deode_home = self.config["platform.deode_home"]
        self.output_settings = self.config["general.output_settings"]
        self.surfex = self.config["general.surfex"]
        self.accelerator_device = self.config.get("accelerator_device", None)

        # Update namelist settings
        self.nlgen_master = NamelistGenerator(self.config, "master")
        self.nlgen_surfex = NamelistGenerator(self.config, "surfex")

        self.master = self.get_binary("MASTERODB")
        self.file_templates = self.config["file_templates"]
        self.unix_group = self.platform.get_platform_value("unix_group")
        self.n_io_merge = self.config["suite_control.n_io_merge"]

    def archive_output(self, filetype, periods):
        """Archive forecast model output.

        Args:
            filetype (dict): File name templates; the entries "model" (name in
                the working directory) and "archive" (name in the archive)
                are used
            periods (str): Output list
        """
        dt_list = oi2dt_list(periods, self.forecast_range)
        logger.info("Input template: {}", filetype["model"])
        logger.info("Output template: {}", filetype["archive"])

        for dt in dt_list:
            validtime = self.basetime + dt
            filename_in = self.platform.substitute(
                filetype["model"], validtime=validtime
            )
            filename_out = self.platform.substitute(
                filetype["archive"], validtime=validtime
            )
            self.fmanager.output(filename_in, f"{self.archive}/{filename_out}")

    def wfp_input(self):
        """Add wind turbine files to forecast directory."""
        self.wfp_dir = self.platform.get_platform_value("windfarm_path")
        yy = self.basetime.strftime("%Y")

        # NOTE(review): the destination name was empty in the reviewed source;
        # "wind_turbine_coordinates.tab" is assumed - confirm against the
        # file name the model expects.
        self.fmanager.input(
            f"{self.wfp_dir}/wind_turbine_coordinates_{self.domain}_{yy}.tab",
            "wind_turbine_coordinates.tab",
        )

        # The per-turbine tables keep their own base name in the work directory
        turbine_list = glob.glob(f"{self.wfp_dir}/wind_turbine_[0-9][0-9][0-9].tab")
        for ifile in turbine_list:
            self.fmanager.input(ifile, os.path.basename(ifile))

    def accelerator_device_input(self):
        """Copy the input files for GPU execution.

        - parallel_method: input file with parallelisation technique
          for each algorithm
        - sync_host: input file defining optional device-to-host
          memory transfers
        - select_gpu: wrapper file for sbatch, binding GPU to MPI rank
        """
        handled_keys = ("parallel_method", "sync_host", "select_gpu")
        for key, file_definition in self.accelerator_device.items():
            if key not in handled_keys:
                continue
            input_file = file_definition[0]
            output_file = file_definition[1]
            # Entries with an empty source or destination are skipped
            if input_file and output_file:
                self.fmanager.input(input_file, output_file)

    def merge_output(self, filetype, periods):
        """Merge distributed forecast model output.

        Args:
            filetype (str): File type (history, surfex, fullpos)
            periods (str): Output list

        Final result is the expected output file name in the working
        directory (as if there was no IO server).

        NOTE: This function has been replaced by io_merge tasks.
              It is only called if n_io_merge=0.
        """
        dt_list = oi2dt_list(periods, self.forecast_range)
        ftemplate = self.file_templates[filetype]["model"]

        for dt in dt_list:
            filename = self.platform.substitute(ftemplate, validtime=self.basetime + dt)
            logger.debug("Merging file {}", filename)

            if filetype == "history":
                lfitools = self.get_binary("lfitools")
                cmd = f"{lfitools} facat all io_serv*.d/{filename}.gridall "
                cmd += f"io_serv*.d/{filename}.speca* {filename}"
            elif filetype == "surfex":
                # NOTE: .sfx also has a part in the working directory,
                #        so you *must* change the name
                lfitools = self.get_binary("lfitools")
                os.rename(filename, filename + ".part")
                cmd = f"{lfitools} facat all {filename}.part "
                cmd += f"io_serv*.d/{filename} {filename}"
            else:
                # Fullpos (grib2) output has .hfp as extra file extension
                cmd = f"cat io_serv*.d/{filename}*.hfp > {filename}"

            logger.debug(cmd)
            BatchJob(os.environ, wrapper="").run(cmd)

    def execute(self):
        """Execute forecast."""
        # Fetch forecast model static input data
        input_definition = ConfigPaths.path_from_subpath(
            self.platform.get_system_value("forecast_input_definition")
        )
        logger.info("Read static data spec from: {}", input_definition)
        with open(input_definition, "r", encoding="utf-8") as f:
            input_data = json.load(f)
        self.fmanager.input_data_iterator(input_data)

        # wind farm input data
        if self.windfarm:
            self.wfp_input()

        # Construct master namelist and include fullpos config
        forecast_namelist = "forecast"
        self.nlgen_master.load(forecast_namelist)
        self.nlgen_master = check_fullpos_namelist(self.config, self.nlgen_master)
        nlres = self.nlgen_master.assemble_namelist(forecast_namelist)
        self.nlgen_master.write_namelist(nlres, "fort.4")

        # SURFEX: Namelists and input data
        self.nlgen_surfex.load("forecast")
        settings = self.nlgen_surfex.assemble_namelist("forecast")
        self.nlgen_surfex.write_namelist(settings, "EXSEG1.nam")

        input_definition = ConfigPaths.path_from_subpath(
            self.platform.get_system_value("sfx_input_definition")
        )
        with open(input_definition, "r", encoding="utf-8") as f:
            input_data = json.load(f)
        binput_data = InputDataFromNamelist(
            settings, input_data, "forecast", self.platform
        ).get()
        for dest, target in binput_data.items():
            logger.debug("target={}, dest={}", target, dest)
            self.fmanager.input(target, dest)

        # Initial files
        initfile, initfile_sfx = InitialConditions(self.config).find_initial_files()
        self.fmanager.input(initfile, f"ICMSH{self.cnmexp}INIT")
        # The surface initial file is only used when SURFEX is switched on
        if self.surfex and initfile_sfx is not None:
            self.fmanager.input(initfile_sfx, f"ICMSH{self.cnmexp}INIT.sfx")

        # Use explicitly defined boundary dir if defined
        intp_bddir = self.config.get("system.intp_bddir", self.wrk)

        # Link the boundary files, use initial file as first boundary file
        self.fmanager.input(initfile, f"ELSCF{self.cnmexp}ALBC000")

        cdtg = self.basetime + self.bdint
        dtgend = self.basetime + as_timedelta(self.forecast_range)
        i = 1
        while cdtg <= dtgend:
            source = f"ELSCF{self.cnmexp}ALBC{i:03d}"
            self.fmanager.input(f"{intp_bddir}/{source}", source)
            cdtg += self.bdint
            i += 1

        if self.accelerator_device:
            logger.info("Processing accelerator_device section")
            self.accelerator_device_input()
        else:
            logger.info("No accelerator_device section found")

        # Create a link to working directory for IO_merge tasks
        if os.path.islink("../Forecast"):
            logger.info("Removing old link.")
            os.unlink("../Forecast")
        os.symlink(os.getcwd(), "../Forecast")

        # Store the output
        # Must happen before the forecast starts, so the io_merge tasks
        # can write to it.
        deodemakedirs(self.archive, unixgroup=self.unix_group)

        # Run MASTERODB
        batch = BatchJob(os.environ, wrapper=self.platform.substitute(self.wrapper))
        batch.run(self.master)

        io_server = os.path.exists("io_serv.000001.d")
        if io_server and self.n_io_merge > 0:
            logger.info("Waiting for iomerge output.")
            logger.debug("IO_SERVER detected!")
            # Wait for all io_merge tasks to finish.
            # This is signaled by creating (empty) files.
            for ionr in range(self.n_io_merge):
                io_name = f"io_merge_{ionr:02}"
                logger.debug("Waiting for {}", io_name)
                while not os.path.exists(io_name):
                    sleep(5)
        else:
            for filetype, oi in self.output_settings.items():
                if filetype in self.file_templates:
                    if io_server:
                        # No io_merge: should we even still consider this option?
                        self.merge_output(filetype, oi)
                    self.archive_output(self.file_templates[filetype], oi)

        self.archive_logs(["fort.4", "EXSEG1.nam", "NODE.001_01"])

        # Remove link to working directory for IO_merge tasks
        if os.path.islink("../Forecast"):
            os.unlink("../Forecast")
class PrepareCycle(Task):
    """Task that creates the archive directory when it is constructed."""

    def __init__(self, config):
        """Construct object.

        Looks up the "archive" system value and creates that directory,
        passing on the platform's "unix_group" value.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, self.__class__.__name__)

        self.archive = self.platform.get_system_value("archive")
        unix_group = self.platform.get_platform_value("unix_group")
        deodemakedirs(self.archive, unixgroup=unix_group)
class FirstGuess(Task):
    """FirstGuess."""

    def __init__(self, config):
        """Construct FirstGuess object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, __class__.__name__)

    def execute(self):
        """Find initial file.

        Asks InitialConditions for the initial files; the result is not
        used further within this method.
        """
        initial_conditions = InitialConditions(self.config)
        initfile, initfile_sfx = initial_conditions.find_initial_files()