"""Surfex tasks."""
import json
import os
from pysurfex.cli import pgd, prep
from ..config_parser import ConfigPaths
from ..datetime_utils import as_datetime, as_timedelta, cycle_offset
from ..logs import logger
from ..namelist import NamelistGenerator
from ..os_utils import deodemakedirs
from .base import Task
from .batch import BatchJob
from .marsprep import Marsprep
[docs]
class PySurfexBaseTask(Task):
"""Base task class for pysurfex tasks."""
def __init__(self, config, name):
"""Construct pysurfex-experiment base class.
Args:
-------------------------------------------
config (ParsedConfig): Configuration.
name (str): Task name.
"""
Task.__init__(self, config, name)
climdir = self.platform.get_system_value("climdir")
deodemakedirs(climdir)
self.pysurfex_domain_file = f"{climdir}/domain.json"
self.pysurfex_system_file = f"{climdir}/system.json"
if not os.path.exists(self.pysurfex_domain_file):
# Domain/geo
conf_proj_dict = {
"nam_pgd_grid": {"cgrid": "CONF PROJ"},
"nam_conf_proj_grid": {
"nimax": self.config["domain.nimax"],
"njmax": self.config["domain.njmax"],
"xloncen": self.config["domain.xloncen"],
"xlatcen": self.config["domain.xlatcen"],
"xdx": self.config["domain.xdx"],
"xdy": self.config["domain.xdy"],
"ilone": self.config["domain.ilone"],
"ilate": self.config["domain.ilate"],
},
"nam_conf_proj": {
"xlon0": self.config["domain.xlon0"],
"xlat0": self.config["domain.xlat0"],
},
}
with open(self.pysurfex_domain_file, mode="w", encoding="utf8") as fhandler:
json.dump(conf_proj_dict, fhandler)
# Binary input data
self.pysurfex_input_definition = ConfigPaths.path_from_subpath(
self.platform.get_system_value("sfx_input_definition")
)
self.pysurfex_input_definition = self.pysurfex_input_definition.as_posix()
# Create PySurfex system paths
system_paths = self.config["system"].dict()
platform_paths = self.config["platform"].dict()
exp_file_paths = {}
for key, val in system_paths.items():
lkey = self.platform.substitute(key)
lval = self.platform.substitute(val)
exp_file_paths.update({lkey: lval})
for key, val in platform_paths.items():
lkey = self.platform.substitute(key)
lval = self.platform.substitute(val)
exp_file_paths.update({lkey: lval})
if not os.path.exists(self.pysurfex_system_file):
with open(self.pysurfex_system_file, mode="w", encoding="utf8") as fhandler:
json.dump(exp_file_paths, fhandler)
[docs]
def execute(self):
"""Execute."""
raise NotImplementedError("Base class is not supposed to be executed")
[docs]
class Pgd(PySurfexBaseTask):
"""Task."""
def __init__(self, config):
"""Construct object.
Args:
config (deode.ParsedConfig): Configuration
"""
PySurfexBaseTask.__init__(self, config, __class__.__name__)
self.program = "pgd"
self.nlgen = NamelistGenerator(self.config, "surfex")
self.climdir = self.platform.get_system_value("climdir")
self.one_decade = self.config["pgd.one_decade"]
self.basetime = config["task.args.basetime"]
self.pgd_prel = self.platform.substitute(
self.config["file_templates.pgd_prel.archive"], basetime=self.basetime
)
self.force = True
[docs]
def execute(self):
"""Execute."""
output = f"{self.climdir}/{self.pgd_prel}"
if not os.path.exists(output) or self.force:
binary = self.get_binary("PGD")
self.nlgen.load(self.program)
settings = self.nlgen.assemble_namelist(self.program)
self.nlgen.write_namelist(settings, "OPTIONS.nam_input")
argv = [
"--domain",
self.pysurfex_domain_file,
"--system-file-paths",
self.pysurfex_system_file,
"--input-binary-data",
self.pysurfex_input_definition,
"--namelist-path",
"OPTIONS.nam_input",
"--print-namelist",
"--binary",
binary,
"--output",
output,
"--wrapper",
self.wrapper,
]
if self.force:
argv.append("--force")
if self.one_decade:
basetime = as_datetime(self.basetime).strftime("%Y%m%d%H")
argv += ["--basetime", basetime, "--one-decade"]
pgd(argv=argv)
self.archive_logs(["OPTIONS.nam", "LISTING_PGD.txt"], target=self.climdir)
else:
logger.warning("Output already exists: ", output)
[docs]
class Prep(PySurfexBaseTask):
"""Prep."""
def __init__(self, config):
"""Construct object.
Args:
config (deode.ParsedConfig): Configuration
"""
PySurfexBaseTask.__init__(self, config, __class__.__name__)
self.nlgen = NamelistGenerator(self.config, "surfex")
self.archive = self.platform.get_system_value("archive")
self.intp_bddir_sfx = self.platform.get_system_value("intp_bddir_sfx")
self.force = True
[docs]
def execute(self):
"""Execute."""
cnmexp = self.config["general.cnmexp"]
output = f"{self.intp_bddir_sfx}/ICMSH{cnmexp}INIT.sfx"
bdmodel = self.config["boundaries.bdmodel"]
if not os.path.exists(output) or self.force:
binary = self.get_binary("PREP")
deodemakedirs(self.archive)
bd_has_surfex = self.config["boundaries.bd_has_surfex"]
basetime = as_datetime(self.config["general.times.basetime"])
namelist_task = "prep"
self.nlgen.load(namelist_task)
settings = self.nlgen.assemble_namelist(namelist_task)
self.nlgen.write_namelist(settings, "OPTIONS.nam_input")
# TODO this should be externalized
# Select the correct input file
basetime = as_datetime(self.config["general.times.basetime"])
bddir_sfx = self.config["system.bddir_sfx"]
if bdmodel == "ifs":
mars = Marsprep.mars_selection(
selection=self.platform.substitute(
self.config["boundaries.ifs.selection"]
),
config=self.config,
)
bdcycle = as_timedelta(mars["ifs_cycle_length"])
bdcycle_start = as_timedelta(mars["ifs_cycle_start"])
else:
bdcycle = as_timedelta(self.config["boundaries.lam.bdcycle"])
bdcycle_start = as_timedelta(self.config["boundaries.lam.bdcycle_start"])
bdshift = as_timedelta(self.config["boundaries.bdshift"])
bd_basetime = basetime - cycle_offset(
basetime, bdcycle, bdcycle_start=bdcycle_start, bdshift=bdshift
)
bdfile_sfx_template: str = self.config["file_templates.bdfile_sfx.archive"]
if not self.config.get(f"boundaries.{bdmodel}.bdmember"):
bdfile_sfx_template = bdfile_sfx_template.replace("@BDMEMBER@", "0")
prep_input_file = self.platform.substitute(
f"{bddir_sfx}/{bdfile_sfx_template}",
basetime=bd_basetime,
validtime=basetime,
)
# PGD file input update
const_clim = self.config["file_templates.pgd.archive"]
pgdfile_source = self.platform.substitute(f"@CLIMDIR@/{const_clim}")
argv = [
"--basetime",
basetime.strftime("%Y%m%d%H"),
"--pgd",
pgdfile_source,
"--prep-file",
prep_input_file,
"--prep-filetype",
"grib",
"--system-file-paths",
self.pysurfex_system_file,
"--input-binary-data",
self.pysurfex_input_definition,
"--namelist-path",
"OPTIONS.nam_input",
"--print-namelist",
"--binary",
binary,
"--output",
output,
"--wrapper",
self.wrapper,
]
if self.force:
argv.append("--force")
if bd_has_surfex:
const_clim_host = self.config["file_templates.pgd_host.archive"]
pgd_host_source = self.platform.substitute(
f"@BDCLIMDIR@/{const_clim_host}"
)
argv += ["--prep-pgdfile", pgd_host_source, "--prep-pgdfiletype", "FA"]
prep(argv=argv)
self.archive_logs(["OPTIONS.nam", "LISTING_PREP0.txt"])
else:
logger.info("Output already exists: {}", output)
[docs]
class PgdFilterTownFrac(Task):
"""PgdFilterTownFrac.
Reduce Town frac in Pgd file made with eccoclimapII.
"""
def __init__(self, config):
"""Construct object.
Args:
config (deode.ParsedConfig): Configuration
"""
Task.__init__(self, config, __class__.__name__)
self.climdir = self.platform.get_system_value("climdir")
self.pgd_frac = self.get_binary("pgd_frac")
self.basetime = config["task.args.basetime"]
self.inoutfile = self.platform.substitute(
self.config["file_templates.pgd.archive"], basetime=self.basetime
)
[docs]
def execute(self):
"""Run task."""
self.fmanager.input(
f"{self.climdir}/{self.inoutfile}", self.inoutfile, provider_id="copy"
)
# Run pgd_frac
batch = BatchJob(os.environ, wrapper=self.wrapper)
batch.run(f"{self.pgd_frac} {self.inoutfile}")
self.fmanager.output(self.inoutfile, f"{self.climdir}/{self.inoutfile}")