Source code for deode.tasks.sfx

"""Surfex tasks."""

import json
import os

from ..config_parser import ConfigPaths
from ..datetime_utils import as_datetime, as_timedelta, cycle_offset, get_decade
from ..logs import logger
from ..namelist import NamelistGenerator
from ..os_utils import deodemakedirs
from .base import Task
from .batch import BatchJob
from .marsprep import Marsprep


class InputDataFromNamelist:
    """Binary input data for offline executables."""

    def __init__(
        self,
        nml,
        input_data,
        program,
        platform,
        basetime=None,
        validtime=None,
        one_decade=False,
    ):
        """Construct the binary input set up from namelist.

        Args:
            nml (f90nml): Namelist
            input_data (dict): Input data definitions
            program (str): Program mode to set up for (pgd/prep etc)
            platform (Platform): Platform dependent settings
            basetime (as_datetime, optional): Basetime. Defaults to None.
            validtime (as_datetime, optional): Validtime. Defaults to None.
            one_decade (bool, optional): One decade or not. Defaults to False.

        Raises:
            RuntimeError: Could not find program

        """
        self.nml = nml
        self.platform = platform
        self.basetime = basetime
        self.validtime = validtime
        self.one_decade = one_decade
        try:
            self.data = input_data[program]
        except KeyError as err:
            # Chain from the caught instance so the traceback names the key.
            raise RuntimeError(f"Could not find program {program}") from err
        # The raw definitions are replaced by the processed mapping.
        self.data = self.process_data()

    def get(self):
        """Return data."""
        return self.data

    @staticmethod
    def get_nml_value(nml, block, key, indices=None):
        """Get namelist value.

        Args:
            nml (nmlf90.Namelist): Namelist
            block (str): Namelist block
            key (str): Namelist key
            indices (list, optional): Indices to read. Defaults to None.

        Returns:
            list or None: A list of ``{"value": ..., "indices": ...}``
                dictionaries, or None if the block or key is missing or if
                the indexed value is itself a list.

        """
        logger.debug("Checking block={} key={}", block, key)
        if block not in nml:
            return None
        if key not in nml[block]:
            return None

        vals = []
        val_dict = {}
        val = nml[block][key]
        logger.debug("namelist type={}", type(val))
        if indices is not None:
            logger.debug("indices={}", indices)
            if len(indices) == 2:
                # The second index addresses the outer list.
                val = nml[block][key][indices[1]][indices[0]]
            else:
                val = nml[block][key][indices[0]]
                logger.debug("Found 1D value {}", val)

            if isinstance(val, list):
                return None

            val_dict.update({"value": val, "indices": None})
            vals.append(val_dict)

        elif isinstance(val, list):
            logger.debug("dim_size={}", len(val))
            # Walk down the first element of each nesting level and record the
            # length of every level. A list whose first element is an int is
            # not expanded; it is returned below as one value (dims stays
            # shorter). An empty list is treated the same way.
            dims = []
            tval = val
            while isinstance(tval, list) and tval and not isinstance(tval[0], int):
                logger.debug("len(tval)={} type(tval)={}", len(tval), type(tval))
                dims.append(len(tval))
                tval = tval[0]
                logger.debug("New tval={} dim_size={}", tval, dims[-1])

            logger.debug("dims={}", dims)
            logger.debug("type(val)={}", type(val))
            if len(dims) == 2:
                for i in range(dims[0]):
                    for j in range(dims[1]):
                        # Indices are stored inner index first.
                        val_dict = {"value": val[i][j], "indices": [j, i]}
                        logger.debug(
                            "value={} indices={}",
                            val_dict["value"],
                            val_dict["indices"],
                        )
                        vals.append(val_dict)
            elif len(dims) == 1:
                for i in range(dims[0]):
                    logger.debug("i={}, val[i]={}", i, val[i])
                    val_dict = {"value": val[i], "indices": [i]}
                    logger.debug(
                        "value={} indices={}", val_dict["value"], val_dict["indices"]
                    )
                    vals.append(val_dict)
            elif len(dims) == 0:
                logger.debug("val={}", val)
                val_dict = {"value": val, "indices": None}
                vals.append(val_dict)
        else:
            if isinstance(val, bool):
                val = str(val)
            val_dict = {"value": val, "indices": None}
            vals.append(val_dict)

        logger.debug("Found: value={}", val_dict["value"])
        return vals

    @staticmethod
    def get_nml_value_from_string(nml, string, sep="#", indices=None):
        """Get namelist value from a string.

        Args:
            nml (nmlf90.Namelist): Namelist
            string (str): Namelist identifier of the form
                ``<block><sep><key>``
            sep (str, optional): Separator between block and key.
                Defaults to "#".
            indices (list, optional): Indices to read. Defaults to None.

        Returns:
            list or None: Result of :meth:`get_nml_value` for the block/key.

        """
        parts = string.split(sep)
        nam_section = parts[0]
        nam_key = parts[1]
        return InputDataFromNamelist.get_nml_value(
            nml, nam_section, nam_key, indices=indices
        )

    def substitute(self, key, val, macros=None, micro="@"):
        """Substitute patterns.

        Explicit macros are replaced first, then both strings are passed
        through ``self.platform.substitute`` with the stored base and valid
        time.

        Args:
            key (str): Key to substitute in
            val (str): Value to substitute in
            macros (dict, optional): Macros. Defaults to None.
            micro (str, optional): Micro character. Defaults to "@".

        Returns:
            tuple: Substituted (key, value)

        """
        logger.debug(
            "Substitute key={} and val={} {} {}", key, val, self.basetime, self.validtime
        )
        pkey = key
        pval = val
        if macros is not None:
            for macro_key, macro_val in macros.items():
                logger.debug("macro_key={} macro_val={}", macro_key, macro_val)
                pattern = f"{micro}{macro_key}{micro}"
                pkey = pkey.replace(pattern, macro_val)
                pval = pval.replace(pattern, macro_val)

        pkey = self.platform.substitute(
            pkey, basetime=self.basetime, validtime=self.validtime
        )
        pval = self.platform.substitute(
            pval, basetime=self.basetime, validtime=self.validtime
        )
        logger.debug("SUBSTITUTED: pkey={} pval={}", pkey, pval)
        return pkey, pval

    def read_macro_setting(self, macro_defs, key, default=None, sep="#"):
        """Read a macro setting.

        A string setting containing ``sep`` (not at position 0) is treated as
        a namelist identifier and resolved against ``self.nml``.

        Args:
            macro_defs (dict): Macro definition
            key (str): Macro setting to get.
            default (str, optional): Default value. Defaults to None.
            sep (str, optional): Namelist key separator. Defaults to "#".

        Returns:
            setting (any): The setting, or ``default`` on a KeyError.

        """
        try:
            setting = macro_defs[key]
            if isinstance(setting, str) and setting.find(sep) > 0:
                logger.debug("Read macro setting from namelist {}", setting)
                # Forward sep so a non-default separator is honoured.
                setting = self.get_nml_value_from_string(self.nml, setting, sep=sep)
                if isinstance(setting, list):
                    setting = setting[0]["value"]
            return setting
        except KeyError:
            return default

    def extend_macro(self, key, val, macros, sep="#"):
        """Extend entries from macro.

        Args:
            key (str): Key containing ``@<macro>@`` patterns
            val (str): Value containing ``@<macro>@`` patterns
            macros (dict): Macros, as ``{macro: {macro_type: macro_defs}}``
            sep (str, optional): Namelist key separator. Defaults to "#".

        Raises:
            NotImplementedError: Macro type is not ekfpert, dict or iterator

        Returns:
            dict: Key, value dictionary

        """
        logger.debug("extenders={}", macros)
        if macros is None:
            return {key: val}

        processed_data = {}
        for macro, macro_types in macros.items():
            # Maps the replacement used in the key to the one used in the value.
            loop = {}
            for macro_type, macro_defs in macro_types.items():
                logger.debug("macro_defs={}", macro_defs)
                if macro_type == "ekfpert":
                    nncvs = self.read_macro_setting(macro_defs, "list", sep=sep)
                    logger.debug("nncvs={}", nncvs)
                    nncvs = nncvs.copy()
                    duplicate = self.read_macro_setting(
                        macro_defs, "duplicate", sep=sep
                    )
                    if duplicate:
                        nncvs += nncvs
                    loop.update({"0": "0"})
                    icounter1 = 1
                    icounter2 = 1
                    for nncv in nncvs:
                        # icounter1 counts only the entries equal to 1,
                        # icounter2 counts every entry.
                        if nncv == 1:
                            loop.update({str(icounter1): str(icounter2)})
                            icounter1 += 1
                        icounter2 += 1

                elif macro_type == "dict":
                    values = self.get_nml_value_from_string(
                        self.nml, macro_defs, sep=sep
                    )
                    # Separate names: the arguments key/val must not be
                    # clobbered, they are needed below.
                    for dkey, dval in values[0].items():
                        loop.update({str(dkey): str(dval)})

                elif macro_type == "iterator":
                    start = self.read_macro_setting(macro_defs, "start", sep=sep)
                    end = self.read_macro_setting(macro_defs, "end", sep=sep)
                    fmt = self.read_macro_setting(
                        macro_defs, "fmt", sep=sep, default=None
                    )
                    if fmt is None:
                        fmt = "{:d}"
                    for number in range(start, end):
                        formatted = fmt.format(number)
                        loop.update({str(formatted): str(formatted)})
                else:
                    raise NotImplementedError

            # Loop normal macros not being nml arrays.
            # Iterate over a snapshot: processed_data is updated inside the
            # loop and a dict must not change size while being iterated.
            unprocessed_data = processed_data.copy() if processed_data else {key: val}
            for ukey, uval in unprocessed_data.items():
                lkey = ukey
                for vmacro1, vmacro2 in loop.items():
                    logger.debug(
                        "key={} val={} macro={} vmacro1={} vmacro2={}",
                        lkey,
                        uval,
                        macro,
                        vmacro1,
                        vmacro2,
                    )
                    if lkey.find("#") > 0:
                        # NOTE(review): the lookup returns a list (or None),
                        # so the .replace below will fail on this path.
                        # Intended value unclear - confirm before changing.
                        lkey = self.get_nml_value_from_string(
                            self.nml, lkey, sep=sep
                        )
                    pkey = lkey.replace(f"@{macro}@", vmacro1)
                    pval = uval.replace(f"@{macro}@", vmacro2)
                    processed_data.update({pkey: pval})

        logger.debug("Processed data={}", processed_data)
        return processed_data

    def process_macro(self, key, val, macros, sep="#", indices=None):
        """Process macro.

        Args:
            key (str): Key
            val (str): Value
            macros (dict): Macros
            sep (str, optional): Namelist key separator. Defaults to "#".
            indices (list, optional): Process macro from namelist indices.

        Raises:
            NotImplementedError: Only 2 dimensions are implemented

        Returns:
            tuple: Processed (key, value). Returned unchanged when
                ``macros`` or ``indices`` is None.

        """
        logger.debug("macros={}", macros)
        if macros is None:
            return key, val

        logger.debug("indices={}", indices)
        if indices is None:
            return key, val

        pkey = key
        pval = val
        for macro in macros:
            lindex = None
            if len(indices) == 2:
                # DECADE uses the second index, any other macro the first.
                lindex = indices[1] if macro == "DECADE" else indices[0]
            elif len(indices) == 1:
                lindex = indices[0]
            elif len(indices) > 2:
                raise NotImplementedError("Only 2 dimensions are implemented")

            vmacro = None
            if lindex is not None:
                try:
                    macro_defs = macros[macro]
                    logger.debug("macro_defs={}", macro_defs)
                except KeyError:
                    logger.warning(
                        "Macro {} not defined. Use index value {}", macro, lindex
                    )
                    vmacro = str(lindex + 1)

                if macro == "VTYPE":
                    vmacro = str(lindex + 1)
                elif "DECADE" in macros:
                    ntime = self.read_macro_setting(macro_defs, "ntime", sep=sep)
                    # A 360-day year is split into ntime periods. Each period
                    # is labelled MMDD from its mid-point, with 30-day months.
                    dec_days = int(360 / float(ntime))
                    dec_start = int(dec_days / 2)
                    dec_end = 360 + dec_start
                    for dec, day in enumerate(range(dec_start, dec_end, dec_days)):
                        logger.debug("day={}, dec={} lindex={}", day, dec, lindex)
                        month = int(day / 30) + 1
                        mday = int(day % 30)
                        if dec == lindex:
                            vmacro = f"{month:02d}{mday:02d}"
                    if self.one_decade:
                        basetime = as_datetime(self.basetime)
                        vmacro = get_decade(basetime)

            if vmacro is None:
                # str.replace does not accept None; skip this macro instead
                # of failing with an opaque TypeError.
                logger.warning(
                    "No value found for macro {}. Pattern is left unsubstituted",
                    macro,
                )
                continue

            logger.debug(
                "Substitute @{}@ with {} pkey={} pval={}", macro, vmacro, pkey, pval
            )
            if isinstance(pkey, str):
                pkey = pkey.replace(f"@{macro}@", vmacro)
            if isinstance(pval, str):
                pval = pval.replace(f"@{macro}@", vmacro)
            logger.debug(
                "Substitute @{}@ with {} pkey={} pval={}", macro, vmacro, pkey, pval
            )
        return pkey, pval

    def matching_value(self, data, val, sep="#", indices=None):
        """Match the value. Possibly also read namelist value.

        Args:
            data (dict): Data to check keys for
            val (str): Key to find
            sep (str, optional): Namelist separator. Defaults to "#".
            indices (list, optional): Indices in namelist

        Raises:
            RuntimeError: "Malformed input data"

        Returns:
            dict: Matching entry in data, or None if nothing matches.

        """
        if val in ["macro", "extenders"]:
            return None
        logger.debug("type(data)={}", type(data))
        logger.debug("type(val)={}", type(val))
        logger.debug("indices={}", indices)
        mdata = data.keys() if isinstance(data, dict) else [data]
        val = str(val)
        logger.debug("Check if val={} matches mdata={}", val, mdata)
        sval = None
        for mval in mdata:
            if val.find(sep) > 0:
                logger.debug("val={} is a namelist variable", val)
                sval = self.get_nml_value_from_string(
                    self.nml, val, sep=sep, indices=indices
                )
                logger.debug("Got sval={}", sval)
                if sval is None:
                    return None
                indices = sval[0]["indices"]
                sval = sval[0]["value"]
            if mval == val:
                logger.debug("Found matching data. val={} data={}", val, data)
                try:
                    rval = data[val]
                except TypeError as err:
                    raise RuntimeError("Malformed input data") from err
                if sval is not None:
                    # Key the result on the value read from the namelist.
                    rval = {sval: rval}
                logger.debug("Return data rval={}", rval)
                return rval
        logger.warning("Value={} not found in data", val)
        return None

    def _map_dir_hdr_pair(self, mapped_data, key, val):
        """Map a ``.dir`` target together with its ``.hdr`` counterpart."""
        hdr_key, hdr_val = self.substitute(key + ".hdr", val.replace(".dir", ".hdr"))
        dir_key, dir_val = self.substitute(key + ".dir", val)
        mapped_data.update({hdr_key: hdr_val})
        mapped_data.update({dir_key: dir_val})

    def _map_last_level(self, mapped_data, setting, sep, indices, macros, extenders):
        """Map the entries of a setting that has no nested dictionaries."""
        for raw_key, value2 in setting.items():
            key2 = raw_key
            logger.debug(
                "Setting1 key={} value={} indices={}", key2, value2, indices
            )
            if key2.find(sep) > 0:
                # The local name is given by a namelist variable.
                keys = self.get_nml_value_from_string(
                    self.nml, key2, sep=sep, indices=indices
                )
                key2 = keys[0]["value"]

            processed = False
            logger.debug(
                "Setting2 key={} value={} indices={}", key2, value2, indices
            )
            if macros is not None:
                processed = True
                key3, value3 = self.process_macro(
                    key2, value2, macros, sep=sep, indices=indices
                )
                if value3.endswith(".dir"):
                    self._map_dir_hdr_pair(mapped_data, key3, value3)
                elif value3.endswith(".nc"):
                    my_key, my_val = self.substitute(key3, value3)
                    logger.debug("my_key={}, my_val={}", my_key, my_val)
                    if not my_key.endswith(".nc"):
                        my_key = my_key + ".nc"
                    mapped_data.update({my_key: my_val})
                elif value3.endswith(".txt"):
                    # NOTE(review): the original comment here said "need to
                    # remove the .txt" but the code left the key unchanged.
                    # That behaviour is kept.
                    my_key, my_val = self.substitute(key3, value3)
                    logger.debug("my_key={}, my_val={}", my_key, my_val)
                    mapped_data.update({my_key: my_val})
                else:
                    my_key, my_val = self.substitute(key3, value3)
                    mapped_data.update({my_key: my_val})

            if extenders is not None:
                processed = True
                processed_values = self.extend_macro(
                    key2, value2, extenders, sep=sep
                )
                for pkey3, pval3 in processed_values.items():
                    logger.debug("pkey3={} pval3={}", pkey3, pval3)
                    new_pkey3, new_pval3 = self.substitute(pkey3, pval3)
                    logger.debug("pkey3={} pval3={}", new_pkey3, new_pval3)
                    mapped_data.update({new_pkey3: new_pval3})

            if not processed:
                pkey3 = key2
                pval3 = value2
                logger.debug("pkey3={} pval3={}", pkey3, pval3)
                if pval3.endswith(".nc") and not pkey3.endswith(".nc"):
                    pkey3 = pkey3 + ".nc"
                pkey3, pval3 = self.substitute(pkey3, pval3)
                if pval3.endswith(".dir"):
                    self._map_dir_hdr_pair(mapped_data, pkey3, pval3)
                else:
                    mapped_data.update({pkey3: pval3})

    def process_data(self, sep="#"):
        """Process input definitions on files to map.

        Args:
            sep (str, optional): Namelist separator. Defaults to "#".

        Returns:
            mapped_data (dict): A dict with mapped local names and target files.

        """
        logger.debug("Process data: {}", self.data)

        def _process_data(mapped_data, data, indices=None, macros=None, extenders=None):
            for key, value in data.items():
                logger.debug(".................. key={}", key)
                # Required namelist variable
                if key.find(sep) > 0:
                    vals = self.get_nml_value_from_string(
                        self.nml, key, sep=sep, indices=indices
                    )
                else:
                    vals = [{"value": value, "indices": None}]

                if isinstance(vals, list):
                    for val_dict in vals:
                        logger.debug("=========== val_dict={}", val_dict)
                        val = val_dict["value"]
                        indices = val_dict["indices"]
                        setting = self.matching_value(
                            value, val, sep=sep, indices=indices
                        )
                        logger.debug("Setting={}", setting)
                        if setting is None:
                            if key not in ["macros", "extenders"]:
                                logger.warning(
                                    "Could not match key={} value={}", key, val
                                )
                            continue

                        # Macro/extender definitions found here stay in effect
                        # for the rest of this call and for nested calls.
                        if "macros" in setting:
                            macros = setting.copy()["macros"]
                        if "extenders" in setting:
                            extenders = setting.copy()["extenders"]

                        # Recurse while any value is itself a dictionary.
                        has_nested = isinstance(setting, dict) and any(
                            isinstance(tval, dict) for tval in setting.values()
                        )
                        if has_nested:
                            logger.debug("------ Call next loop. setting={}", setting)
                            _process_data(
                                mapped_data,
                                setting,
                                indices=indices,
                                macros=macros,
                                extenders=extenders,
                            )
                        else:
                            self._map_last_level(
                                mapped_data, setting, sep, indices, macros, extenders
                            )
                        indices = None
                else:
                    logger.warning("Could not find namelist key={}", key)
                # Never carry indices over to the next key.
                indices = None

        mapped_data = {}
        _process_data(mapped_data, self.data)
        logger.debug("Mapped data={}", mapped_data)
        return mapped_data
class Pgd(Task):
    """Task running the PGD binary."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration

        """
        Task.__init__(self, config, __class__.__name__)
        self.program = "pgd"
        self.nlgen = NamelistGenerator(self.config, "surfex")
        self.climdir = self.platform.get_system_value("climdir")
        self.one_decade = self.config["pgd.one_decade"]
        self.basetime = config["task.args.basetime"]
        self.pgd_prel = self.platform.substitute(
            self.config["file_templates.pgd_prel.archive"], basetime=self.basetime
        )
        # TODO get from args
        self.force = True

    @staticmethod
    def _replace_in_strings(data, match, repl):
        """Replace ``match`` with ``repl`` in all string values of nested dicts.

        Only dictionaries are descended into. ``data`` is modified in place
        and also returned.
        """
        if isinstance(data, dict):
            for k, v in data.items():
                if isinstance(v, str):
                    data[k] = v.replace(match, repl)
                else:
                    Pgd._replace_in_strings(v, match, repl)
        return data

    def execute(self):
        """Execute.

        Writes ``OPTIONS.nam``, fetches the input files derived from the
        namelist, runs the PGD binary and stores the result and the logs in
        ``self.climdir``. Nothing is run when the output already exists and
        ``self.force`` is false.
        """
        output = f"{self.climdir}/{self.pgd_prel}"
        if os.path.exists(output) and not self.force:
            # The message needs a placeholder for the path to be shown.
            logger.warning("Output already exists: {}", output)
            return

        binary = self.get_binary("PGD")
        batch = BatchJob(os.environ, wrapper=self.wrapper)

        self.nlgen.load(self.program)
        settings = self.nlgen.assemble_namelist(self.program)
        self.nlgen.write_namelist(settings, "OPTIONS.nam")

        filetype = settings["nam_io_offline"]["csurf_filetype"].lower()
        pgdfile = settings["nam_io_offline"]["cpgdfile"]
        pgdfile = f"{pgdfile}.{filetype}"

        # Input data
        input_definition = ConfigPaths.path_from_subpath(
            self.platform.get_system_value("sfx_input_definition")
        )
        with open(input_definition, "r", encoding="utf-8") as f:
            input_data = json.load(f)

        if self.one_decade:
            # Resolve @DECADE@ up front from the basetime.
            input_data = self._replace_in_strings(
                input_data, "@DECADE@", get_decade(as_datetime(self.basetime))
            )

        # Could potentially manipulate input_data depending on settings
        # or send input_data as input from an external file
        binput_data = InputDataFromNamelist(
            settings,
            input_data,
            self.program,
            self.platform,
            basetime=self.basetime,
            one_decade=self.one_decade,
        ).get()

        for dest, target in binput_data.items():
            logger.debug("target={}, dest={}", target, dest)
            self.fmanager.input(target, dest)

        # Run PGD
        batch.run(binary)
        self.fmanager.output(pgdfile, output)
        self.archive_logs(["OPTIONS.nam", "LISTING_PGD.txt"], target=self.climdir)
class Prep(Task):
    """Task running the PREP binary."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration

        """
        Task.__init__(self, config, __class__.__name__)
        self.nlgen = NamelistGenerator(self.config, "surfex")
        self.archive = self.platform.get_system_value("archive")
        # TODO get from args
        self.force = True

    def execute(self):
        """Execute.

        Writes ``OPTIONS.nam`` with the date taken from the basetime, links
        the PGD file(s) and the boundary file as input, runs the PREP binary
        and stores the result in ``self.archive``. Nothing is run when the
        output already exists and ``self.force`` is false.
        """
        cnmexp = self.config["general.cnmexp"]
        output = f"{self.archive}/ICMSH{cnmexp}INIT.sfx"
        if os.path.exists(output) and not self.force:
            logger.info("Output already exists: {}", output)
            return

        binary = self.get_binary("PREP")
        deodemakedirs(self.archive)
        batch = BatchJob(os.environ, wrapper=self.wrapper)

        bd_has_surfex = self.config["boundaries.bd_has_surfex"]
        basetime = as_datetime(self.config["general.times.basetime"])

        namelist_task = "prep"
        self.nlgen.load(namelist_task)
        settings = self.nlgen.assemble_namelist(namelist_task)
        prep_surf_atm = settings["nam_prep_surf_atm"]
        prep_surf_atm["nyear"] = int(basetime.strftime("%Y"))
        prep_surf_atm["nmonth"] = int(basetime.strftime("%m"))
        prep_surf_atm["nday"] = int(basetime.strftime("%d"))
        # Seconds since midnight
        prep_surf_atm["xtime"] = (
            basetime.hour * 3600 + basetime.minute * 60 + basetime.second
        )
        self.nlgen.write_namelist(settings, "OPTIONS.nam")

        # Input data
        input_definition = ConfigPaths.path_from_subpath(
            self.platform.get_system_value("sfx_input_definition")
        )
        with open(input_definition, "r", encoding="utf-8") as f:
            input_data = json.load(f)

        # Determine PGD type and name
        filetype = settings["nam_io_offline"]["csurf_filetype"].lower()
        pgd = settings["nam_io_offline"]["cpgdfile"]
        pgdfile = f"{pgd}.{filetype}"

        # PGD file input update
        const_clim = self.config["file_templates.pgd.archive"]
        pgdfile_source = self.platform.substitute(f"@CLIMDIR@/{const_clim}")
        input_data["prep"]["NAM_IO_OFFLINE#CPGDFILE"] = {
            pgd: {pgdfile: pgdfile_source}
        }

        if bd_has_surfex:
            # Host model PGD type and name. Kept in its own variable so the
            # host file type does not leak into the PREP output name below.
            host_filetype = settings["nam_prep_surf_atm"]["cfilepgdtype"].lower()
            pgd_host = settings["nam_prep_surf_atm"]["cfilepgd"]
            const_clim_host = self.config["file_templates.pgd_host.archive"]
            pgd_host_source = self.platform.substitute(
                f"@BDCLIMDIR@/{const_clim_host}"
            )
            input_data["prep"]["NAM_PREP_SURF_ATM#CFILEPGD"] = {
                pgd_host: {f"{pgd_host}.{host_filetype}": pgd_host_source}
            }

        # Determine prep output name
        prep_output_file = settings["nam_io_offline"]["cprepfile"]
        prep_output_file = f"{prep_output_file}.{filetype}"

        # Select the correct input file
        bddir_sfx = self.config["system.bddir_sfx"]
        bdfile_sfx_template = self.config["system.bdfile_sfx_template"]
        if self.config["boundaries.bdmodel"] != "IFS":
            bdcycle = as_timedelta(self.config["boundaries.bdcycle"])
            bdcycle_start = as_timedelta(self.config["boundaries.bdcycle_start"])
        else:
            mars = Marsprep.mars_selection(self)
            bdcycle = as_timedelta(mars["ifs_cycle_length"])
            bdcycle_start = as_timedelta(mars["ifs_cycle_start"])

        bdshift = as_timedelta(self.config["boundaries.bdshift"])
        bd_basetime = basetime - cycle_offset(
            basetime, bdcycle, bdcycle_start=bdcycle_start, bdshift=-bdshift
        )
        prep_input_file = self.platform.substitute(
            f"{bddir_sfx}/{bdfile_sfx_template}",
            basetime=bd_basetime,
            validtime=basetime,
        )

        # Update input file linking
        filetype_ext = {"FA": ".fa", "GRIB": ""}
        cfiletype = settings["nam_prep_surf_atm"]["cfiletype"].upper()
        cfile = settings["nam_prep_surf_atm"]["cfile"]
        cfileext = filetype_ext[cfiletype]
        input_data["prep"]["NAM_PREP_SURF_ATM#CFILETYPE"] = {
            cfiletype: {f"{cfile}{cfileext}": prep_input_file}
        }

        # Fetch the input
        binput_data = InputDataFromNamelist(
            settings, input_data, "prep", self.platform
        ).get()
        for dest, target in binput_data.items():
            logger.debug("target={}, dest={}", target, dest)
            self.fmanager.input(target, dest)

        # Run PREP and archive output
        batch.run(binary)
        self.fmanager.output(prep_output_file, output)
        self.archive_logs(["OPTIONS.nam", "LISTING_PREP0.txt"])