Source code for deode.fullpos

#!/usr/bin/env python3
"""Fullpos namelist generation."""


import yaml

from .config_parser import ConfigPaths
from .general_utils import merge_dicts
from .logs import logger


def flatten_list(li):
    """Flatten arbitrarily nested lists into a single flat list.

    Only ``list`` instances are descended into; any other element (including
    tuples and strings) is kept as a single item. The walk is iterative, so
    neither the length nor the nesting depth of the input is bounded by the
    interpreter recursion limit.

    Args:
        li (list): A list whose elements may themselves be lists.

    Returns:
        list: The elements of ``li`` in depth-first, left-to-right order.
            An empty input is returned unchanged (the same object).
    """
    if li == []:
        return li

    flat = []
    # Stack of iterators: the top one is the (sub)list currently being walked.
    pending = [iter(li)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                # Descend; the outer iterator keeps its position for later.
                pending.append(iter(item))
                break
            flat.append(item)
        else:
            # Current (sub)list is exhausted, resume its parent.
            pending.pop()
    return flat
class InvalidSelectionCombinationError(ValueError):
    """Signal an unsupported combination of keys in a selection section.

    Behaves exactly like ``ValueError``; it only provides a dedicated type
    that callers can catch.
    """
class Fullpos:
    """Fullpos namelist generator based on (yaml) dicts.

    Attributes set by the constructor:
        domain: Domain name written into the generated namelists.
        fpdir: Directory holding the fullpos yaml files.
        nldict: Merged fullpos settings (from files and/or a dict).
        rules: Substitution rules applied to level lists.
    """

    def __init__(self, domain, fpdir=".", fpfiles=None, fpdict=None, rules=None):
        """Construct the fullpos generator.

        Args:
            domain (str): Domain name
            fpdir (str): Path to fullpos config files
            fpfiles (list): List of fullpos config files to read
                            (without the yml suffix)
            fpdict (dict): A dictionary of fullpos settings
            rules (dict): A dictionary of substitution rules

        """
        self.domain = domain
        self.fpdir = fpdir
        self.nldict = {}
        self.rules = rules if rules is not None else {}
        if fpfiles is not None:
            self.nldict.update(self.load(fpdir, fpfiles))
        # Settings given as a dict override what was read from files
        # (top-level keys are replaced, not merged).
        if fpdict is not None:
            self.nldict.update(fpdict)

    def expand(self, v, levtype, levels, domain, i):
        """Expand fullpos namelists to levels and domains.

        Args:
            v (dict): Selection holding the parameter list under "CL3DF"
                      and the requested levels under `levtype`
            levtype (str): type of vertical level in fullpos syntax
            levels (list): list of all levels; requested levels are written
                           as their 1-based position in this list
            domain (str): domain name
            i (int): parameter counter, incremented once per parameter

        Returns:
            d (dict): expanded names
            i (int): parameter counter

        """
        d = {}
        for p in v["CL3DF"]:
            i += 1
            d[f"CL3DF({i})"] = p
            for j, level in enumerate(v[levtype], start=1):
                d[f"IL3DF({j},{i})"] = levels.index(level) + 1
                d[f"CLD3DF({j},{i})"] = domain
        return d, i

    def load(self, fpdir, fpfiles):
        """Load fullpos yaml file.

        Arguments:
            fpdir (str): Path to fullpos settings
            fpfiles (list): List of yml files to read

        Returns:
            nldict (dict): fullpos settings

        """
        s = "selection"
        nldict = {s: {}}
        for fpfile in fpfiles:
            f = ConfigPaths.path_from_subpath(f"{fpdir}/{fpfile}.yml")
            logger.info("Read {}", f)
            with open(f, mode="rt", encoding="utf-8") as file:
                # An empty yaml document loads as None; treat it as no settings.
                n = yaml.safe_load(file) or {}
            # NOTE(review): when a file has a "selection" key only that part
            # is merged; any other top-level keys in the same file are ignored.
            if s in n:
                nldict[s] = merge_dicts(nldict[s], n[s])
            else:
                nldict.update(n)
        return nldict

    def update_selection(self, additions_list=None, additions_dict=None):
        """Add choices to the selection section.

        Args:
            additions_list (list): Additional selection to be read from files
            additions_dict (dict): Additional selection as a dictionary

        """
        if additions_list is not None:
            # Read the update
            for addition in additions_list:
                fpfile = ConfigPaths.path_from_subpath(f"{self.fpdir}/{addition}.yml")
                with open(fpfile, mode="rt", encoding="utf-8") as file:
                    # An empty yaml document loads as None; merge nothing then.
                    nldict = yaml.safe_load(file) or {}
                self.nldict["selection"] = merge_dicts(self.nldict["selection"], nldict)

        if additions_dict is not None:
            self.nldict["selection"] = merge_dicts(
                self.nldict["selection"], additions_dict
            )

    def replace_rules(self, vin):
        """Replace patterns from the rules dict.

        Each list element that is a string equal to a rule key is replaced by
        the rule value. The input list itself is not modified.

        Args:
            vin (str|list): Input values

        Raises:
            RuntimeError: Rules are defined and the input is not a list

        Returns:
            v (str|list): possibly replaced strings

        """
        v = vin if isinstance(vin, str) else vin.copy()
        for rp, rv in self.rules.items():
            if not isinstance(v, list):
                raise RuntimeError("Invalid type:", type(v), v)
            for idx, y in enumerate(v):
                if isinstance(y, str) and y == rp:
                    v[idx] = rv
        return v

    def construct(self):
        """Construct the fullpos namelists.

        Note that only a shallow copy of the selection is taken, so the level
        lists inside ``self.nldict["selection"]`` are updated in place (rule
        replacement and removal of duplicates).

        Returns:
            namfpc_out (dict): namfpc part
            selection (dict): xxtddddhhmm part

        Raises:
            InvalidSelectionCombinationError: Invalid combination in selection

        """
        namfpc_out = {"NAMFPC": self.nldict["NAMFPC"].copy()}
        selection = self.nldict["selection"].copy()
        level_map = self.nldict["LEVEL_MAP"]
        param_map = self.nldict["PARAM_MAP"]
        apply_rules = len(self.rules) > 0

        # One (initially empty) NAMFPC entry per level type and parameter list
        namfpc = {v: [] for v in level_map.values()}
        for v in param_map.values():
            for vv in v.values():
                namfpc[vv] = []

        # Set empty namelists
        empty_namelists = {k: [] for k in level_map}
        for k in param_map:
            empty_namelists[k] = []

        # Map all fields and levels to the correct
        # entries in NAMFPC
        for namelists in selection.values():
            for namelist, namelist_key in namelists.items():
                if namelist in ["NAMFPPHY", "NAMPPC", "NAMFPDY2"]:
                    for key, fields in namelist_key.items():
                        x = param_map[namelist][key]
                        namfpc[x].append(fields)
                elif "CL3DF" in namelist_key:
                    # Handle selection sections without label
                    if len(namelist_key.keys()) > 2:
                        raise InvalidSelectionCombinationError(namelist_key.keys())
                    lmap = level_map[namelist]
                    if apply_rules and lmap == "NRFP3S":
                        namelist_key[lmap] = self.replace_rules(namelist_key[lmap])
                    namfpc[lmap].append(namelist_key[lmap])
                    pmap = param_map[namelist]["CL3DF"]
                    namfpc[pmap].append(namelist_key["CL3DF"])
                else:
                    # Handle selection sections with label
                    lmap = level_map[namelist]
                    for y in namelist_key.values():
                        if apply_rules and lmap == "NRFP3S":
                            y[lmap] = self.replace_rules(y[lmap])
                        namfpc[lmap].append(y[lmap])
                        pmap = param_map[namelist]["CL3DF"]
                        namfpc[pmap].append(y["CL3DF"])

        # Unique, sorted values; only non-empty entries go into NAMFPC
        namfpc = {k: list(set(flatten_list(v))) for k, v in namfpc.items()}
        for key in namfpc:
            if len(namfpc[key]) > 0:
                namfpc[key].sort()
                namfpc_out["NAMFPC"][key] = namfpc[key]

        # Add domain and level mapping
        for time_section, namelists in selection.items():
            # Initialize with empty namelists
            tmp = {namelist: {} for namelist in empty_namelists}

            for namelist, keys in namelists.items():
                if namelist in ["NAMFPPHY", "NAMPPC", "NAMFPDY2"]:
                    section = {}
                    for key, val in keys.items():
                        # Domain key: "D" inserted after the first two letters
                        param = f"{key[0:2]}D{key[2:]}"
                        section[key] = val
                        section[param] = [self.domain] * len(val)
                    tmp[namelist] = section
                elif "CL3DF" in keys:
                    # Handle selection sections without label
                    level_key = level_map[namelist]
                    keys[level_key] = list(set(flatten_list(keys[level_key])))
                    tmp[namelist], _ = self.expand(
                        keys, level_key, namfpc[level_key], self.domain, 0
                    )
                else:
                    # Handle selection sections with label
                    level_key = level_map[namelist]
                    i = 0
                    params = {}
                    # Set levels per requested field. The levels are stored
                    # under the level key of this namelist (not a fixed
                    # "NRFP3S") so that every level type can be expanded.
                    for sub_selection in keys.values():
                        for param in sub_selection["CL3DF"]:
                            if param not in params:
                                params[param] = {"CL3DF": [param], level_key: []}
                            params[param][level_key].append(sub_selection[level_key])

                    for sub_selection in params.values():
                        sub_selection[level_key] = list(
                            set(flatten_list(sub_selection[level_key]))
                        )
                        d, i = self.expand(
                            sub_selection,
                            level_key,
                            namfpc[level_key],
                            self.domain,
                            i,
                        )
                        tmp[namelist].update(d)

            # Update the selection
            selection[time_section] = tmp

        if "xxt00000000" in selection:
            self.check_non_instant_fields(selection, "xxt00000000")

        return namfpc_out, selection

    def check_non_instant_fields(self, selection, time_selection):
        """Search for non instant fields.

        Args:
            selection (dict): Dict with fullpos settings to be examined
            time_selection (str): Which selection time rules to check

        Raises:
            RuntimeError: Non instant fields found

        """
        field_list = []
        for parlist in selection[time_selection].values():
            for var in parlist.values():
                if isinstance(var, list):
                    fields = [x for x in var if x in self.nldict["NON_INSTANT_FIELDS"]]
                    field_list.append(fields)

        field_list = flatten_list(field_list)
        if len(field_list) > 0:
            logger.error("Non instant fields found for {}", time_selection)
            logger.error(field_list)
            logger.info(
                "Change selection or empty `NON_INSTANT_FIELDS` in rules.yml to override"
            )
            raise RuntimeError("Non instant fields found")