"""Marsprep."""
import ast
import contextlib
import os
from functools import cached_property
from pathlib import Path
from typing import Dict, List, Optional
from deode.eps.eps_setup import get_member_config, infer_members
from ..config_parser import ParsedConfig
from ..datetime_utils import as_datetime, as_timedelta, cycle_offset
from ..logs import logger
from ..mars_utils import (
BaseRequest,
add_additional_data_to_all,
add_additional_file_specific_data,
check_data_available,
compile_target,
get_and_remove_data,
get_domain_data,
get_mars_keys,
get_steplist,
get_steps_and_members_to_retrieve,
get_value_from_dict,
mars_write_method,
move_files,
write_compute_mars_req,
write_retrieve_mars_req,
)
from ..os_utils import deodemakedirs, join_files, list_files_join
from ..tasks.batch import BatchJob
from .base import Task
[docs]
class Marsprep(Task):
"""Marsprep task."""
def __init__(self, config):
"""Construct forecast object.
Args:
config (deode.ParsedConfig): Configuration
Raises:
ValueError: No data for this date.
"""
Task.__init__(self, config, __class__.__name__)
# Get bdmember(s) from member specific eps setting if member specific
# mars prep is enabled
if self.config["suite_control.member_specific_mars_prep"]:
try:
member_ = int(os.environ.get("MEMBER"))
except ValueError as exc:
raise ValueError(
"MEMBER environment variable must be an integer if "
"suite_control.member_specific_mars_prep is True"
) from exc
else:
member_config = get_member_config(self.config, member_)
bdmember_config_value = member_config["boundaries.ifs.bdmember"]
# Get bdmember(s) from eps members settings (attempted first) or
# boundaries.ifs.bdmember.
else:
try:
bdmember_config_value = self.config[
"eps.member_settings.boundaries.ifs.bdmember"
]
except KeyError:
bdmember_config_value = self.config["boundaries.ifs.bdmember"]
self.bdmember = infer_members(self.platform.substitute(bdmember_config_value))
# Default to [None] if self.bdmember is empty to cover the deterministic
# case with no boundary member nesting.
if not self.bdmember:
self.bdmember = [None]
# path to sfcdata on disk
resolution = self.mars["resolution"]
self.platform.store_macro("RESOLUTION", resolution)
self.sfcdir = Path(
self.platform.substitute(
str(self.platform.get_platform_value("global_sfcdir"))
)
)
logger.info(f"sfc dir: {self.sfcdir}")
# Get the times from config.toml
self.basetime = as_datetime(self.config["general.times.basetime"])
forecast_range = as_timedelta(self.config["general.times.forecast_range"])
# Check if there are data for specific date in mars
check_data_available(self.basetime, self.mars)
# Get boundary informations
bdint = as_timedelta(self.config["boundaries.bdint"])
bdcycle = as_timedelta(self.mars["ifs_cycle_length"])
bdcycle_start = as_timedelta(self.mars["ifs_cycle_start"])
bdshift = as_timedelta(self.config["boundaries.bdshift"])
if bdshift.total_seconds() % bdcycle.total_seconds() != 0:
raise ValueError("bdshift needs to be a multiple of bdcycle!")
# Get date/time/steps info
bd_offset = cycle_offset(
self.basetime,
bdcycle,
bdcycle_start=bdcycle_start,
bdshift=bdshift,
)
self.bd_basetime = self.basetime - bd_offset
self.steps = get_steplist(bd_offset, forecast_range, bdint)
logger.info("bd_basetime: {}", self.bd_basetime)
self.init_date_str = self.bd_basetime.strftime("%Y%m%d")
self.init_hour_str = self.bd_basetime.strftime("%H")
exist_snow = False
with contextlib.suppress(KeyError):
self.param_snow = get_value_from_dict(
self.mars["GG_snow"], self.init_date_str
)
exist_snow = True
start_snow_date = as_datetime(self.mars["start_date"])
with contextlib.suppress(KeyError):
start_snow_date = as_datetime(self.mars["start_snow_date"])
self.exist_snow = exist_snow and self.bd_basetime >= start_snow_date
# Split mars by bdint
self.split_mars = self.config["suite_control.split_mars"]
if self.split_mars:
self.prep_step = ast.literal_eval(self.config["task.args.prep_step"])
self.prepdir = Path(
self.platform.substitute(
self.config["system.bddir"],
basetime=self.bd_basetime,
validtime=self.basetime,
)
)
logger.info("MARS data expected in:{}", self.prepdir)
self.mars_version = int(
self.config.get("submission.task_exceptions.Marsprep.mars_version", "6")
)
self.mars_bin = self.get_binary("mars")
# Use fixed static orography files (spherical harmonics and/or gaussian grid)
# TODO: check existence of static orography files & read settings
if "use_static_sh" in self.mars:
self.use_static_sh_oro = bool(self.mars["use_static_sh"])
else:
self.use_static_sh_oro = False
if self.use_static_sh_oro:
self.sh_oro_source = self.mars["sh_oro_source"]
logger.info("Using static SH orography file {}", self.sh_oro_source)
if "use_static_gg" in self.mars:
self.use_static_gg_oro = bool(self.mars["use_static_gg"])
else:
self.use_static_gg_oro = False
if self.use_static_gg_oro:
self.gg_oro_source = self.mars["gg_oro_source"]
logger.info("Using static GG orography file {}", self.gg_oro_source)
[docs]
@staticmethod
def mars_selection(selection: str, config: ParsedConfig) -> dict:
"""Copy default settings if requested.
Args:
selection (str): The selection to use.
config (deode.ParsedConfig): Configuration object
Returns:
mars (dict): mars config section
"""
mars = config[f"mars.{selection}"].dict()
if "expver" not in mars:
mars["expver"] = selection
# Copy default settings if requested
if "default" in mars:
default = config[f"mars.{mars['default']}"]
for k in default:
if k not in mars:
mars[k] = default[k]
return mars
@cached_property
def mars(self) -> dict:
"""Get mars selection."""
selection = self.platform.substitute(self.config["boundaries.ifs.selection"])
return self.mars_selection(selection, self.config)
[docs]
def update_data_request(
self,
request: BaseRequest,
prefetch: bool,
specify_domain: bool,
bdmember: Optional[List[int]] = None,
grid: Optional[str] = None,
source: Optional[str] = None,
fieldset: Optional[str] = None,
):
"""Create ECMWF MARS system request.
Args:
request: BaseRequest object to update
prefetch: Retrieve or stage
specify_domain: Use lat/lon and rotation or use global (default)
bdmember: Boundary members to retrieve in case of eps.
grid: Specific grid for some request. Default None.
source: Sorce for retrieve data from disk. Defaults None.
fieldset: Name of fieldset. Defaults None.
"""
if grid is not None and self.mars_version == 6:
request.update_request({"GRID": grid})
# Additional request parameters if EPS
# Only if all members are True like. This makes it possible to distinguish
# between a deterministic run with no boundary member nesting (members = [None])
# and an ensemble run with boundary member nesting (e.g. members = [0]
# for a one member ensemble, members = [0, 1, 2] for a three member ensemble etc.)
if bdmember and all(bdmember):
request.update_request({"NUMBER": "/".join(map(str, bdmember))})
if self.mars_version == 6:
request.update_request({"PROCESS": "LOCAL"})
request.add_levelist(self.mars["levelist"])
# Set stream
stream = get_value_from_dict(self.mars["stream"], request.time)
request.update_request({"STREAM": stream})
# Retrieve from already fetched data
if not prefetch:
request.update_request({"SOURCE": source})
# NOTE: the following could be moved to get_geopotential_latlon()
if "mars_latlonZ" in request.target and (
self.use_static_gg_oro or self.use_static_sh_oro
):
z_keys = get_mars_keys(source)
request.update_request(z_keys)
request.add_database_options()
if specify_domain:
request.update_request(
{
"GRID": get_value_from_dict(self.mars["grid"], request.date),
"AREA": get_domain_data(self.config),
}
)
if fieldset:
request.update_request({"FIELDSET": fieldset})
[docs]
def execute(self):
"""Run task.
Define run sequence.
Raises:
RuntimeError: If there is an issue with the work folder.
"""
try:
if not os.path.exists(self.prepdir):
deodemakedirs(
self.prepdir, unixgroup=self.platform.get_platform_value("unix_group")
)
except OSError as e:
raise RuntimeError(f"Error while preparing the mars folder: {e}") from e
if self.split_mars and self.prep_step:
logger.debug("*** Need only latlon data")
else:
if self.split_mars:
self.steps = [self.steps[int(self.config["task.args.bd_index"])]]
logger.info("Need steps:{}", self.steps)
self.get_grid_point_surface_data()
self.get_spectral_harmonic_data()
self.get_grid_point_upper_air_data()
if self.config["suite_control.do_interpolsstsic"]:
self.get_sst_data()
self.get_sfx_data()
[docs]
def get_grid_point_surface_data(self):
"""Get grid point surface data."""
tag = "ICMGG"
steps, members_dict, _ = get_steps_and_members_to_retrieve(
self.steps, self.prepdir, tag, self.bdmember
)
if steps:
self.get_gg_data(tag, steps, members_dict)
exist_soil = False
with contextlib.suppress(KeyError):
gg_soil_param = get_value_from_dict(
self.mars["GG_soil"], self.init_date_str
)
gg1_param = get_value_from_dict(self.mars["GG1"], self.init_date_str)
exist_soil = True
additional_data = {}
if exist_soil:
additional_data["common_data"] = self.get_gg_soil_data(
tag, params={"GG_soil": gg_soil_param, "GG1": gg1_param}
)
else:
self.fmanager.input(f"{self.sfcdir}/sfcfile", "sfcdata")
additional_data["common_data"] = get_and_remove_data("sfcdata")
if self.exist_snow:
additional_data |= self.get_gg_snow_data(
tag, steps, members_dict, self.param_snow
)
add_additional_file_specific_data(additional_data=additional_data)
else:
add_additional_data_to_all(tag, steps, members_dict, additional_data)
move_files(tag, steps, members_dict, self.prepdir)
[docs]
def get_spectral_harmonic_data(self):
"""Get spectral harmonic data."""
tag = "ICMSH"
steps, members_dict, _ = get_steps_and_members_to_retrieve(
self.steps, self.prepdir, tag, self.bdmember
)
if steps:
self.get_sh_data(
tag,
steps,
members_dict,
)
additional_data = {"z": self.get_shz_data(tag)}
add_additional_data_to_all(tag, steps, members_dict, additional_data)
move_files(tag, steps, members_dict, self.prepdir)
[docs]
def get_grid_point_upper_air_data(self):
"""Get gridpoint upper air data."""
tag = "ICMUA"
steps, members_dict, _ = get_steps_and_members_to_retrieve(
self.steps, self.prepdir, tag, self.bdmember
)
if steps:
self.get_ua_data(tag, steps, members_dict)
logger.info(
"steps {}, members_dict {}",
steps,
members_dict,
)
move_files(tag, steps, members_dict, self.prepdir)
[docs]
def get_bdmember_fetch_list(self, bddir: str, bdfile: str):
"""Builds the list of missing files to fetch from MARS."""
mars_file_check_list = {}
prep_dir = Path(
self.platform.substitute(
bddir,
basetime=self.bd_basetime,
validtime=self.basetime,
)
)
for member in self.bdmember:
prep_filename = self.platform.substitute(
bdfile.replace("@BDMEMBER@", str(member or 0)),
basetime=self.bd_basetime,
validtime=self.basetime,
)
mars_file_check_list[member] = prep_dir / prep_filename
bdmember_fetch_list = [
bdmember
for bdmember, mars_file_check in mars_file_check_list.items()
if not os.path.exists(mars_file_check)
]
if not bdmember_fetch_list:
logger.info("Prep files already exists as:")
for bdmember, filename in mars_file_check_list.items():
logger.info(f" {bdmember}: {filename}")
elif self.split_mars and not self.prep_step:
logger.debug("No need Prep file")
bdmember_fetch_list = []
else:
for member in self.bdmember:
if member not in bdmember_fetch_list:
logger.info(
"Prep file member={} already exists as {}",
member,
mars_file_check_list[member],
)
else:
logger.info(
"Create prep file member={} as {}",
member,
mars_file_check_list[member],
)
return mars_file_check_list, bdmember_fetch_list
[docs]
def get_sfx_data(self):
"""Get SFX data."""
bddir = self.config["system.bddir_sfx"]
bdfile = self.config["file_templates.bdfile_sfx.archive"]
mars_file_check_list, bdmember_fetch_list = self.get_bdmember_fetch_list(
bddir, bdfile
)
# Split the lat/lon part and perform it here
self.get_lat_lon_data(bdmember_fetch_list)
self.get_geopotential_latlon(self.bdmember[0], bdmember_fetch_list)
# Get the file list to join
for bdmember in bdmember_fetch_list:
prep_pattern = f"mars_latlon*_{bdmember or 0}"
filenames = list_files_join(self.wdir, prep_pattern)
prep_filepath = mars_file_check_list[bdmember]
join_files(filenames, prep_filepath)
[docs]
def get_sst_data(self):
"""Get SST data."""
prep_dir = Path(
self.platform.substitute(
self.config["system.bddir_sst"],
basetime=self.bd_basetime,
validtime=self.basetime,
)
)
bdfile = self.config["file_templates.bdfile_sst.archive"]
_, members_dict, missing_steps_per_member = get_steps_and_members_to_retrieve(
self.steps,
prep_dir,
bdfile,
self.bdmember,
platform=self.platform,
basetime=self.bd_basetime,
validtime=self.basetime,
)
if not missing_steps_per_member:
logger.info("All SST files already exists")
else:
# Get lat lon data
self.get_lat_lon_sst_data(missing_steps_per_member, members_dict)
# Get the file list to join
for member in missing_steps_per_member:
steps = missing_steps_per_member[member]
for step in steps:
merge_pattern = f"mars_latlonGG*_{member or 0}+{step}"
filenames = list_files_join(self.wdir, merge_pattern)
sst_filename = self.platform.substitute(
bdfile.replace("@BDMEMBER@", str(member or 0)),
basetime=self.bd_basetime,
validtime=self.basetime,
bd_index=step,
)
prep_filepath = prep_dir / sst_filename
join_files(filenames, prep_filepath)
[docs]
def get_lat_lon_data(self, bdmember_list: List[int]):
"""Get Lat/Lon data.
Args:
bdmember_list (List[int]): Members to fetch data for
"""
prefetch = False
param = (
get_value_from_dict(self.mars["GG"], self.init_date_str)
+ "/"
+ get_value_from_dict(self.mars["GG_sea"], self.init_date_str)
)
for member in bdmember_list:
data_type = self.mars["type_AN"] if member == 0 else self.mars["type_FC"]
self._build_and_run_retrieve_request(
req_file_name="latlonGG.req",
data_type=data_type,
levtype="SFC",
param=param,
steps=[self.steps[0]],
members=[member],
target=f"mars_latlonGG_{member or 0}",
prefetch=prefetch,
specify_domain=True,
# Default to ICMGG_0+* if member is None
source=f'"{self.prepdir}/ICMGG_{member or 0}+{self.steps[0]}"',
write_method=mars_write_method(self.mars_version),
)
[docs]
def get_lat_lon_sst_data(
self,
missing_steps_per_member: Dict[int, List[int]],
members_dict: Dict[str, List[int]],
):
"""Get lat/lon data for sst."""
tag_mask = "ICMGG_maskland"
tag_sea = "sea_latlon"
tag_aux = "aux_latlon"
prefetch = False
param_sea = get_value_from_dict(self.mars["GG_sea"], self.init_date_str)
param_lsm = get_value_from_dict(self.mars["GG_lsm"], self.init_date_str)
param_skt = get_value_from_dict(self.mars["GG_skt"], self.init_date_str)
target_aux = "mars_latlonGG"
target_maskland = "mars_latlonGG_maskland"
fieldset_dict = dict.fromkeys(["sst_sic", "lsm"])
fieldset_dict["sst_sic"] = {"param": param_sea, "target": target_aux}
fieldset_dict["lsm"] = {"param": param_lsm, "target": target_aux}
for member_type, members in members_dict.items():
data_type = (
self.mars["type_AN"]
if member_type == "control_member"
else self.mars["type_FC"]
)
members_to_process = list(set(members) & set(missing_steps_per_member.keys()))
for member in members_to_process:
# SST/SIC for updating during run (on lat/lon), must be done in loop as
# COMPUTE doesn't seem to support multiple steps
for step in missing_steps_per_member[member]:
source = compile_target(
f"{self.prepdir}/ICMGG", member_type, member, step
)
target = compile_target(tag_mask, member_type, member, step)
# Execute MARS request to mask land
self._build_and_run_compute_request(
req_file_name=f"{tag_mask}.req",
data_type=data_type,
levtype="SFC",
param=None, # Params are per fieldset defined in fieldset_dict
steps=[step],
target=target,
fieldset_dict=fieldset_dict,
formula='"bitmap(sst_sic, bitmap(lsm > 0.5, 1))"',
members=[member],
source=source,
prefetch=prefetch,
specify_domain=False,
write_method=mars_write_method(self.mars_version),
)
# Interpolate masked ocean fields
self._build_and_run_retrieve_request(
req_file_name=f"{tag_sea}.req",
data_type=data_type,
levtype="SFC",
param=param_sea,
steps=[step],
target=compile_target(target_maskland, member_type, member),
members=[member],
source=target,
prefetch=prefetch,
specify_domain=True,
write_method=mars_write_method(self.mars_version),
)
# Interpolate lsm & skt
self._build_and_run_retrieve_request(
req_file_name=f"{tag_aux}.req",
data_type=data_type,
levtype="SFC",
param=param_lsm + "/" + param_skt,
steps=[step],
target=compile_target(target_aux, member_type, member),
members=[member],
source=source,
prefetch=prefetch,
specify_domain=True,
write_method=mars_write_method(self.mars_version),
)
[docs]
def get_geopotential_latlon(self, first_member, bdmember_list):
"""Get geopotential in lat/lon.
Args:
first_member (int): First bdmember in ensemble
bdmember_list (List[int]): Members to fetch data for
"""
prefetch = False
# Retrieve for Surface Geopotential in lat/lon
param = get_value_from_dict(self.mars["SHZ"], self.init_date_str)
data_type = get_value_from_dict(self.mars["GGZ_type"], self.init_date_str)
lev_type = get_value_from_dict(self.mars["Zlev_type"], self.init_date_str)
if self.use_static_gg_oro:
self.fmanager.input(self.gg_oro_source, "gg_oro.Z")
z_source = "gg_oro.Z"
else:
# Default to ICMSH_0+* if member is None
z_source = f'"{self.prepdir}/ICMSH_{first_member or 0}+{self.steps[0]}"'
# NOTE: for z, the step is always 0, while steps[0] may be shifted!
target = f"mars_latlonZ_{first_member or 0}"
self._build_and_run_retrieve_request(
req_file_name="latlonz.req",
data_type=data_type,
levtype=lev_type,
param=param,
steps=[0],
members=[first_member],
target=target,
prefetch=prefetch,
specify_domain=True,
source=z_source,
write_method=mars_write_method(self.mars_version),
)
for member in bdmember_list:
if member == first_member:
continue
self.fmanager.input(target, f"mars_latlonZ_{member}")
[docs]
def get_sh_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]):
"""Get SH data."""
logger.info("Retrieving {} data for steps: {}", tag, steps)
param = get_value_from_dict(self.mars["SH"], self.init_date_str)
for member_type, members in members_dict.items():
data_type = (
self.mars["type_AN"]
if member_type == "control_member"
else self.mars["type_FC"]
)
self._build_and_run_retrieve_request(
req_file_name=f"{member_type}_{tag}.req",
data_type=data_type,
levtype="ML",
param=param,
steps=steps,
members=members,
target=compile_target(tag, member_type, members),
grid=self.mars["grid_ML"],
)
[docs]
def get_shz_data(self, tag: str):
"""Get geopotential in spherical harmonics."""
if self.use_static_sh_oro:
self.fmanager.input(self.sh_oro_source, f"{tag}.Z")
else:
param = get_value_from_dict(self.mars["SHZ"], self.init_date_str)
self._build_and_run_retrieve_request(
req_file_name=f"{tag}Z.req",
data_type=self.mars["SHZ_type"],
levtype="ML",
param=param,
steps=[0],
target=f'"{tag}.Z"',
grid=self.mars["grid_ML"],
)
return get_and_remove_data(f"{tag}.Z")
[docs]
def get_ua_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]):
"""Get upper air data."""
logger.info("Retrieving {} data for steps: {}", tag, steps)
param = get_value_from_dict(self.mars["UA"], self.init_date_str)
for member_type, members in members_dict.items():
data_type = (
self.mars["type_AN"]
if member_type == "control_member"
else self.mars["type_FC"]
)
self._build_and_run_retrieve_request(
req_file_name=f"{member_type}_{tag}.req",
data_type=data_type,
levtype="ML",
param=param,
steps=steps,
members=members,
target=compile_target(tag, member_type, members),
grid=self.mars["grid_ML"],
)
[docs]
def get_gg_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]):
"""Get gridpoint surface data."""
logger.info("Retrieving {} data for steps: {}", tag, steps)
param = (
get_value_from_dict(self.mars["GG"], self.init_date_str)
+ "/"
+ get_value_from_dict(self.mars["GG_sea"], self.init_date_str)
)
for member_type, members in members_dict.items():
data_type = (
self.mars["type_AN"]
if member_type == "control_member"
else self.mars["type_FC"]
)
self._build_and_run_retrieve_request(
req_file_name=f"{member_type}_{tag}.req",
data_type=data_type,
levtype="SFC",
param=param,
steps=steps,
members=members,
target=compile_target(tag, member_type, members),
)
[docs]
def get_gg_soil_data(self, tag: str, params: dict):
"""Get soil gridpoint data."""
for param_name, param in params.items():
target = f"{tag}.{param_name}"
self._build_and_run_retrieve_request(
req_file_name=f"{target}.req",
data_type=self.mars["type_AN"],
levtype="SFC",
param=param,
steps=[0],
target=target,
)
# Collect and return the data from target files
# (Read the single-step MARS files first, hence the reversed order)
data = b""
for param_name in reversed(params.keys()):
target = f"{tag}.{param_name}"
data += get_and_remove_data(target)
return data
[docs]
def get_gg_snow_data(
self, tag: str, steps: List[int], members_dict: Dict[str, List[int]], param
):
"""Get soil gridpoint data."""
additional_data: Dict[str, bytes] = {}
for member_type, members in members_dict.items():
data_type = (
self.mars["type_AN"]
if member_type == "control_member"
else self.mars["type_FC"]
)
self._build_and_run_retrieve_request(
req_file_name=f"{member_type}_{tag}.snow.req",
data_type=data_type,
levtype="SOL",
param=param,
steps=steps,
members=members,
target=compile_target(f"{tag}.snow", member_type, members),
)
for step in steps:
for member in members:
# Default to "*_0+{step}" if member is None
key = f"{tag}_{member or 0}+{step}"
file_snow = f"{tag}.snow_{member or 0}+{step}"
additional_data[key] = get_and_remove_data(file_snow)
return additional_data
def _build_retrieve_request(
self,
data_type: str,
levtype: str,
param: str,
steps: List[int],
target: str,
members: Optional[List[int]] = None,
source: Optional[str] = None,
prefetch: bool = True,
specify_domain: bool = False,
grid: Optional[str] = None,
fieldset: Optional[str] = None,
) -> BaseRequest:
"""Build request to retrieve data from mars.
Args:
data_type: Data type.
levtype: Level type.
param: Parameter(s) to retrieve.
steps: Steps to retrieve.
target: Name of the target file.
members: Members to retrieve data from.
source: Source file to retrieve data from.
prefetch: Prefetch.
specify_domain: Whether to specify domain or not.
grid: The grid to use.
fieldset: Name of fieldset. Defaults None.
Returns:
request: BaseRequest to retrieve data
"""
request = BaseRequest(
class_=self.mars["class"],
data_type=data_type,
expver=self.mars["expver"],
levtype=levtype,
date=self.init_date_str,
time=self.init_hour_str,
steps=steps,
param=param,
target=target,
)
self.update_data_request(
request,
prefetch=prefetch,
specify_domain=specify_domain,
bdmember=members,
grid=grid,
source=source,
fieldset=fieldset,
)
return request
def _run_request_file(
self,
req_file_name: str,
) -> None:
"""Run request to retrieve data from mars.
Args:
req_file_name: Name of the request file.
"""
BatchJob(os.environ, wrapper=self.wrapper).run(
f"{self.get_binary('mars')} {req_file_name}"
)
def _build_and_run_retrieve_request(
self,
req_file_name: str,
data_type: str,
levtype: str,
param: str,
steps: List[int],
target: str,
members: Optional[List[int]] = None,
source: Optional[str] = None,
prefetch: bool = True,
specify_domain: bool = False,
write_method: str = "retrieve",
grid: Optional[str] = None,
) -> None:
"""Build and run request to retrieve data from mars.
Args:
req_file_name: Name of the request file.
data_type: Data type.
levtype: Level type.
param: Parameter(s) to retrieve.
steps: Steps to retrieve.
target: Name of the target file.
members: Members to retrieve data from.
source: Source file to retrieve data from.
prefetch: Prefetch.
specify_domain: Whether to specify domain or not.
write_method: The write method to use when writing the request to file.
grid: The grid to use.
"""
request = self._build_retrieve_request(
data_type,
levtype,
param,
steps,
target,
members,
source,
prefetch,
specify_domain,
grid,
)
# Write the request to file
write_retrieve_mars_req(request, req_file_name, write_method)
# Execute written request
self._run_request_file(req_file_name)
def _build_and_run_compute_request(
self,
req_file_name: str,
data_type: str,
levtype: str,
param: str,
steps: List[int],
target: str,
fieldset_dict: dict,
formula: str,
members: Optional[List[int]] = None,
source: Optional[str] = None,
prefetch: bool = True,
specify_domain: bool = False,
write_method: str = "retrieve",
grid: Optional[str] = None,
) -> None:
"""Build and run request to retrieve and process data from mars.
Args:
req_file_name: Name of the request file.
data_type: Data type.
levtype: Level type.
param: Parameter(s) to retrieve.
steps: Steps to retrieve.
target: Name of the target file.
fieldset_dict: Dict with fieldset and fieldset-specific data
formula: Formula for compute.
members: Members to retrieve data from.
source: Source file to retrieve data from.
prefetch: Prefetch.
specify_domain: Whether to specify domain or not.
write_method: The write method to use when writing the request to file.
grid: The grid to use.
"""
# First request will write to a new file
omode = "w"
for fieldset, fieldset_data in fieldset_dict.items():
fieldset_param = param
if "param" in fieldset_data:
fieldset_param = fieldset_data["param"]
fieldset_target = target
if "target" in fieldset_data:
fieldset_target = fieldset_data["target"]
request = self._build_retrieve_request(
data_type,
levtype,
fieldset_param,
steps,
fieldset_target,
members,
source,
prefetch,
specify_domain,
grid,
fieldset,
)
# Write/append request to file
write_retrieve_mars_req(request, req_file_name, write_method, omode)
# All upcomming requests will be appened to the same file
omode = "a"
# Write/append compute request to file
write_compute_mars_req(req_file_name, formula, target, omode)
# Execute written request
self._run_request_file(req_file_name)