#!/usr/bin/env python3
"""Derive runtime variables."""
import shutil
from math import atan, floor, sin
from .datetime_utils import as_datetime, as_timedelta
from .fullpos import Fullpos, flatten_list
from .logs import logger
from .os_utils import Search
from .toolbox import Platform
[docs]
def set_times(config):
"""Set basetime/validtime if not present.
Args:
config (.config_parser.ParsedConfig): Parsed config file contents.
Returns:
update (dict): Dict of corrected basetime/validtime
"""
times = config["general.times"].dict()
if "basetime" not in times:
times.update({"basetime": times["start"]})
logger.debug("Set basetime to {}", times["basetime"])
if "validtime" not in times:
try:
times.update({"validtime": times["basetime"]})
except KeyError:
times.update({"validtime": times["start"]})
logger.debug("Set validtime to {}", times["validtime"])
if "start" not in times:
times.update({"start": times["basetime"]})
logger.debug("Set start to {}", times["start"])
if "end" not in times:
times.update({"end": times["basetime"]})
logger.debug("Set end to {}", times["end"])
update = {"general": {"times": times}}
return update
[docs]
def check_fullpos_namelist(config, nlgen):
"""Find existing fullpos select files or generate them.
Args:
config (deode.ParsedConfig): Configuration
nlgen (dict): master forecast namelist
Returns:
nlgen (dict) : Possibly updated forecast namelist
"""
platform = Platform(config)
accept_static_namelists = config["general.accept_static_namelists"]
generate_namelist = True
if accept_static_namelists:
namelists = platform.get_system_value("namelists")
fullpos_select_files = Search.find_files(
namelists, prefix="xxt", recursive=False, fullpath=True
)
if len(fullpos_select_files) > 0:
for filename in fullpos_select_files:
shutil.copy(filename, ".")
logger.info("Copy fullpos select file {}", filename)
generate_namelist = False
if generate_namelist:
_fpdir = config["fullpos.config_path"]
fpdir = platform.substitute(_fpdir)
selection = config.get("fullpos.selection", {})
fplist = [v if isinstance(v, str) else list(v) for v in selection.values()]
fplist.append(list(config["fullpos.main"]))
fpfiles = [platform.substitute(x) for x in flatten_list(fplist)]
_domain = config["fullpos.domain_name"]
domain = platform.substitute(_domain)
nrfp3s = list(range(1, int(config["vertical_levels.nlev"]) + 1))
rules = {
"${vertical_levels.nlev}": config["vertical_levels.nlev"],
"${namelist.nrfp3s}": nrfp3s,
}
fullpos = Fullpos(domain, fpdir=fpdir, fpfiles=fpfiles, rules=rules)
namfpc, selections = fullpos.construct()
logger.info("Create fullpos selection for {}", list(selections.keys()))
for head, body in selections.items():
nlgen.write_namelist(body, head)
nlgen.update(namfpc, "fullpos")
return nlgen
[docs]
def derived_variables(config, processor_layout=None):
"""Derive some variables required in the namelists.
Args:
config (deode.ParsedConfig): Configuration
processor_layout (ProcessorLayout, optional): Processor layout object
Returns:
update (dict) : Derived config update
Raises:
NotImplementedError: For configurations checking
"""
# Geometry
nbzonl = int(config["domain.nbzonl"])
if nbzonl == -1:
xdx = int(config["domain.xdx"])
nbzonl = next(x for x in range(2000) if x * xdx >= 20000)
nbzonl = int(nbzonl) if ((int(nbzonl) % 2) == 0) else int(nbzonl) + 1
if int(config["domain.nimax"]) < 250:
nbzonl = 8
nbzong = int(config["domain.nbzong"])
if nbzong == -1:
xdy = int(config["domain.xdy"])
nbzong = next(y for y in range(2000) if y * xdy >= 20000)
nbzong = int(nbzong) if ((int(nbzong) % 2) == 0) else int(nbzong) + 1
if int(config["domain.njmax"]) < 250:
nbzong = 8
ndguxg = int(config["domain.nimax"]) + int(config["domain.ilone"])
ndglg = int(config["domain.njmax"]) + int(config["domain.ilate"])
# Calculate spectral truncation
truncation_map = {"linear": 2, "quadratic": 3, "cubic": 4, "custom": None}
gridtype = config["domain.gridtype"]
if gridtype == "custom":
truncation_map[gridtype] = config["domain.custom_truncation"]
truncation = truncation_map[gridtype]
nsmax = floor((ndglg - 2) / truncation)
nmsmax = floor((ndguxg - 2) / truncation)
orographic_smoothing_method = config["domain.orographic_smoothing_method"]
if orographic_smoothing_method == "spectral":
gridtype_oro = gridtype
lspsmoro_map = config["domain.spectral_smoothing_by_gridtype"]
lspsmoro = lspsmoro_map[gridtype]
logger.info("lspsmoro:{}", lspsmoro)
nsmax_oro = nsmax
nmsmax_oro = nmsmax
elif orographic_smoothing_method == "truncation":
lspsmoro = False
gridtype_oro = config["domain.gridtype_oro"]
if gridtype_oro == "":
gridtype_oro_map = config["domain.truncation_by_gridtype"]
gridtype_oro = gridtype_oro_map[gridtype]
logger.info("gridtype_oro set to {}", gridtype_oro)
if gridtype_oro == "custom":
truncation_map[gridtype] = config["domain.custom_truncation_oro"]
nsmax_oro = floor((ndglg - 2) / truncation_map[gridtype_oro])
nmsmax_oro = floor((ndguxg - 2) / truncation_map[gridtype_oro])
else:
msg = (
"Orographic smoothing method: "
f"{orographic_smoothing_method} is not implemented"
)
raise NotImplementedError(msg)
xlat0 = config.get("domain.xlat0", "")
xlon0 = config.get("domain.xlon0", "")
if xlat0 == "":
xlat0 = config.get("domain.xlatcen")
if xlon0 == "":
xlon0 = config.get("domain.xloncen")
pi = 4.0 * atan(1.0)
xrpk = sin(float(xlat0) * pi / 180.0)
# Vertical levels
nrfp3s = list(range(1, int(config["vertical_levels.nlev"]) + 1))
# Current time
basetime = as_datetime(config["general.times.basetime"])
year = basetime.year
month = basetime.month
day = basetime.day
time = basetime.strftime("%H%M")
# Time ranges
bdint = as_timedelta(config["boundaries.bdint"])
forecast_range = as_timedelta(config["general.times.forecast_range"])
cstop = int((forecast_range.days * 24 * 3600 + forecast_range.seconds) / 60)
if cstop % 60 == 0:
cstop = int(cstop / 60)
cstop = f"h{cstop}"
else:
cstop = f"m{cstop}"
# Wind farm parameterization
if config["general.windfarm"]:
selection = list(config["fullpos.selection"])
selection.append("windfarm")
# Turn boolean to strings and macros
gen_macros = list(config["macros.gen_macros"])
decades = "one_decade" if config["pgd.one_decade"] else "all_decade"
gen_macros.append("namelist.decades")
sg_input = "osm" if config["pgd.use_osm"] else ""
gen_macros.append("namelist.sg_input")
# Update config and namelist settings
update = {
"domain": {
"gridtype_oro": gridtype_oro,
"nbzong": nbzong,
"nbzonl": nbzonl,
"ndguxg": ndguxg,
"ndglg": ndglg,
"xrpk": xrpk,
"xlat0": xlat0,
"xlon0": xlon0,
"xtrunc": truncation,
"nsmax": nsmax,
"nmsmax": nmsmax,
"nsmax_oro": nsmax_oro,
"nmsmax_oro": nmsmax_oro,
"lspsmoro": lspsmoro,
},
"macros": {
"gen_macros": gen_macros,
},
"namelist": {
"cstop": cstop,
"tefrcl": bdint.seconds,
"nrfp3s": nrfp3s,
"year": year,
"month": month,
"day": day,
"time": int(time),
"sg_input": sg_input,
"decades": decades,
},
}
# Wind farm parameterization
if config["general.windfarm"]:
update["fullpos"] = {"selection": {"windfarm": ["windfarm"]}}
if processor_layout is not None:
procs = processor_layout.get_proc_dict()
# Update namelist dicts
if procs:
update["namelist"].update(procs)
update.update(
{"submission": {"task": {"wrapper": processor_layout.get_wrapper()}}}
)
return update