"""Utility for marsprep."""
import contextlib
import os
import shutil
from dataclasses import dataclass, field, replace
from pathlib import Path
from subprocess import run
from typing import Dict, List, Tuple
from .datetime_utils import as_datetime
from .geo_utils import Projection, Projstring
from .logs import logger
[docs]
def write_mars_req(req, name, method):
    """Write a MARS request file.

    The file starts with the upper-cased method followed by a comma. Every
    entry of ``req.request`` then follows on its own line as ``  KEY = value``;
    all entries but the last are terminated by a comma.

    Args:
        req (BaseRequest): request object whose ``request`` mapping is written
        name (string): request file name
        method (string): selected method, retrieve or read
    """
    indent = "{:<2}".format("")
    entries = list(req.request.items())
    with open(name, "w") as request_file:
        request_file.write(str(method.upper()) + ",\n")
        rows = [indent + key + " = " + str(value) for key, value in entries]
        if rows:
            # Joining with ",\n" leaves the final row without a trailing comma.
            request_file.write(",\n".join(rows) + "\n")
[docs]
def get_mars_keys(source, key_filter="-w shortName:s=z"):
    """Get the basic MARS settings from a static file, to correct a request.

    Runs ``grib_get`` once per key through the shell and collects the output.

    Args:
        source (str): path of the file handed to ``grib_get``; surrounding
            quote characters are ignored for the existence check only
        key_filter (str): extra ``grib_get`` options placed before ``-p``

    Returns:
        dict: MARS keyword mapped to the decoded ``grib_get`` output with
            leading/trailing newlines stripped
    """
    source_missing = source is None or not os.path.exists(source.strip("\"'"))
    if source_missing:
        # NOTE(review): the problem is only logged; execution continues and
        # the grib_get calls below are still attempted with this source.
        logger.error("Missing file: {}", source)
    logger.info("Reading MARS config settings from {}", source)

    # MARS keyword -> key name requested from grib_get
    grib_keys = {
        "CLASS": "marsClass",
        "STREAM": "marsStream",
        "TYPE": "marsType",
        "EXPVER": "experimentVersionNumber",
        "DATE": "dataDate",
        "TIME": "dataTime",
        "STEP": "stepRange",
        "LEVTYPE": "levelType",
        "LEVEL": "level",
    }
    command_prefix = f"grib_get {key_filter} "

    settings = {}
    for mars_key, grib_key in grib_keys.items():
        # TODO: could be faster with only 1 call for all keys
        # All output is then space-separated, so just use .split()
        completed = run(
            command_prefix + f"-p {grib_key}:s {source}",
            check=True,
            shell=True,  # noqa
            capture_output=True,
        )
        settings[mars_key] = completed.stdout.decode().strip("\n")
        logger.info("Mars config - {} = {}", mars_key, settings[mars_key])
    return settings
[docs]
def get_steps_and_members_to_retrieve(
steps: List[int], path: Path, file_name: str, members: List[int]
) -> Tuple[List[int], Dict[str, List[int | None]]]:
"""Check which mars file already exist and returns steps and members which missing.
Args:
steps (List[int]): list of steps
path (pathlib.Path): path to global files
file_name (string): name of file to check
members (List[int]): members to retrieve
Returns:
steps (List[int]): steps to retrieve
members_dict (Dict[str, List[int | None]]): members to retrieve
"""
step_list = []
member_list = []
for step in steps:
for member in members:
# Default to member 0 if member is None without adding member to
# the member_list. This covers the deterministic case with no
# boundary member nesting.
mars_file_check = path / f"{file_name}_{member or 0}+{step}"
if not os.path.exists(mars_file_check):
step_list.append(step)
logger.info("Missing file:{}", mars_file_check)
member_list.append(member)
steps = sorted(set(step_list))
# Get perturbed members only
perturbed_members = [member for member in sorted(set(member_list)) if member != 0]
# Construct dictionary with perturbed members and control depending on the
# provided bdmembers list
members_dict = {}
if 0 in members:
members_dict["control_member"] = [0]
# If no perturbed members requested, return
if not perturbed_members:
return steps, members_dict
# Add perturbed members for the cases where
# - control and perturbed members are requested
# - only perturbed members are requested
# - no perturbed members are requested (perturbed_members = [None])
# The latter case covers the "deterministic" case, where the ensemble only
# contains one member.
members_dict["perturbed_members"] = perturbed_members
return steps, members_dict
[docs]
def check_data_available(basetime, mars):
    """Check that data exists for basetime for the chosen expver.

    Args:
        basetime: basetime, compared directly against the ``as_datetime``
            results (presumably a datetime - TODO confirm against callers)
        mars (dict): mars config section; ``start_date`` is required,
            ``end_date`` is optional

    Raises:
        ValueError: No data for this date.
    """
    start_date = as_datetime(mars["start_date"])

    try:
        end_date = as_datetime(mars["end_date"])
        if basetime > end_date:
            raise ValueError(
                f"No data for {basetime}! "
                f"The data is available between {start_date} and {end_date}."
            )
    except KeyError:
        # No end_date configured: there is no upper bound to enforce.
        pass

    if basetime < start_date:
        raise ValueError(
            f"No data for {basetime}! The data is available after {start_date}"
        )
[docs]
def get_domain_data(config):
    """Read and return domain data.

    Args:
        config (deode.ParsedConfig): Configuration from which we get the domain data

    Returns:
        String "maxlat/minlon/minlat/maxlon" built from the domain
        properties, for use in a MARS request
    """
    # Domain spec name -> configuration key it is read from
    config_keys = {
        "nlon": "domain.nimax",
        "nlat": "domain.njmax",
        "latc": "domain.xlatcen",
        "lonc": "domain.xloncen",
        "lat0": "domain.xlat0",
        "lon0": "domain.xlon0",
        "gsize": "domain.xdx",
    }
    domain_spec = {name: config[key] for name, key in config_keys.items()}

    # Derive the domain properties from the projection of this domain
    projection = Projection(
        Projstring().get_projstring(
            lon0=domain_spec["lon0"], lat0=domain_spec["lat0"]
        )
    )
    properties = projection.get_domain_properties(domain_spec)

    corner_order = ("maxlat", "minlon", "minlat", "maxlon")
    return "/".join(str(properties[corner]) for corner in corner_order)
[docs]
def get_value_from_dict(dict_, key_orig):
    """Select a value according to key.

    - A string is returned as is.
    - If the key parses as a date, the items are walked in descending key
      order and the first value whose key date is not after it is returned.
    - If date parsing raises ValueError, the value stored under
      ``str(key_orig)`` is returned when present.

    Args:
        dict_ (str, BaseConfig object): Values to select
        key_orig (str): key for value checking

    Returns:
        value (str): Found value

    Raises:
        ValueError: No value matches the key.
    """
    if isinstance(dict_, str):
        return dict_

    try:
        ref_date = as_datetime(key_orig)
        descending_items = sorted(dict_.items(), reverse=True)
        for candidate_key, candidate_value in descending_items:
            if ref_date >= as_datetime(candidate_key):
                return candidate_value
    except ValueError:
        # Not a date (key or one of the dict keys): fall back to exact lookup.
        plain_key = str(key_orig)
        if plain_key in dict_:
            return dict_[plain_key]

    raise ValueError(f"Value not found for {key_orig} within {dict_}")
[docs]
def get_steplist(bd_offset, fc_range, bdint):
    """Get the list of steps for Mars request.

    All three durations are truncated to whole hours. The list starts at the
    boundary offset and advances by the boundary interval; it ends with the
    first step at or beyond ``offset + fc_range``.

    Args:
        bd_offset (timedelta): first boundary time
        fc_range (timedelta): forecast range
        bdint (timedelta): frequency of boundary files

    Returns:
        steps (List[int]): list of steps

    Raises:
        ValueError: If ``bdint`` truncates to zero whole hours.
    """
    step_int = int(bdint.total_seconds() // 3600)
    if step_int == 0:
        # range() would reject a zero step with a message that does not
        # mention bdint; fail with the actual cause instead.
        raise ValueError(
            f"bdint must span at least one whole hour to build steps, got {bdint}"
        )
    first_step = int(bd_offset.total_seconds() // 3600)
    fc_range_int = int(fc_range.total_seconds() // 3600)
    # Adding step_int to the stop value makes the end of the range inclusive.
    return list(range(first_step, first_step + fc_range_int + step_int, step_int))
[docs]
def get_and_remove_data(file_name: str) -> bytes:
    """Read a file as binary data, then delete the file.

    Args:
        file_name (str): The name of the file containing the data to be read.

    Returns:
        bytes: The binary data read from the file.
    """
    with open(file_name, mode="rb") as binary_file:
        payload = binary_file.read()
    # The file is removed only after it has been read and closed.
    os.unlink(file_name)
    return payload
[docs]
def add_additional_file_specific_data(
    additional_data: Dict[str, bytes],
):
    """Append file specific data to existing files.

    The additional_data dict is expected to have the following structure, where
    the "common_data" key is optional:
    {
        "common_data": b"Common data to add to all files",
        "file1": b"Data to add to file1",
        "file2": b"Data to add to file2",
        ...
    }
    If the "common_data" key is present, its data is written to each file
    ahead of that file's own data. Nothing is written when the combined data
    is empty.

    Args:
        additional_data (Dict[str, bytes]): Dictionary containing the data to
            add (value) to a given file (key).

    Raises:
        FileNotFoundError: If a file does not exist when trying to append
            data to it.
    """
    shared_prefix = additional_data.get("common_data", b"")
    for target_file, file_data in additional_data.items():
        # "common_data" is not a file name; it is prepended to the others.
        if target_file == "common_data":
            continue

        payload = shared_prefix + file_data
        if not os.path.exists(target_file):
            raise FileNotFoundError(
                f"File {target_file} not found. "
                "Trying to append data to non-existing file."
            )
        if not payload:
            continue

        logger.debug("Adding additional file specific data to file: {}", target_file)
        with open(target_file, "ab") as fp:
            fp.write(payload)
[docs]
def add_additional_data_to_all(
    tag: str,
    steps: List[str],
    members_dict: Dict[str, List[int]],
    additional_data: Dict[str, bytes],
):
    """Append common data to every file defined by tag, step and member.

    The additional_data dict is expected to have the following structure:
    {
        "data_key1": b"Data to add to all files",
        "data_key2": b"Data to add to all files",
        ...
    }
    Files are named ``{tag}_{member}+{step}`` relative to the current
    working directory; a member of ``None`` maps to member 0.

    Args:
        tag (str): Name of tag
        steps (List[int]): Steps used to construct filename.
        members_dict (Dict[str, List[int]]): Dictionary with members used to
            construct filename.
        additional_data (Dict[str, bytes]): Dictionary containing the data to add.

    Raises:
        FileNotFoundError: If a file does not exist when trying to append
            data to it.
    """
    for step in steps:
        # Walk the members of every group in members_dict, group by group.
        for member in (m for group in members_dict.values() for m in group):
            filename = f"{tag}_{member or 0}+{step}"
            if not os.path.exists(filename):
                raise FileNotFoundError(
                    f"File {filename} not found. "
                    "Trying to append data to non-existing file."
                )
            with open(filename, "ab") as fp:
                for data_key, data in additional_data.items():
                    logger.debug(
                        "Adding additional common data ({}) to file: {}",
                        data_key,
                        filename,
                    )
                    fp.write(data)
[docs]
def move_files(
    tag: str, steps: List[int], members_dict: Dict[str, List[int]], target_dir: Path
):
    """Move the files of all steps and members to their final location.

    Files are named ``{tag}_{member}+{step}`` relative to the current working
    directory and keep that name in the target directory. Files that do not
    exist are skipped.

    Args:
        tag (str): Name of tag
        steps (list): steps to process the files
        members_dict (Dict[str, List[int]]): dict with members to process the files
        target_dir (Path): target directory to move the files to
    """
    for step in steps:
        for member in (m for group in members_dict.values() for m in group):
            # A member of None is treated as member 0 in the file name.
            file_name = f"{tag}_{member or 0}+{step}"
            if not os.path.exists(file_name):
                continue
            shutil.move(file_name, target_dir / file_name)
[docs]
@dataclass(kw_only=True)
class BaseRequest:
"""Represents a structured data request with configurable parameters.
This class facilitates the creation of data requests by encapsulating
request attributes and providing methods for modifying and enhancing
the request structure. The request data is stored in a dictionary
accessible via the `request` attribute.
Parameters:
class_ (str): The class of the request, e.g., "D1".
data_type (str): The type of data requested (e.g., analysis, forecast).
expver (str): Experiment version identifier.
levtype (str): Level type (e.g., surface, pressure levels).
date (str): The date range for the request in YYYYMMDD format.
time (str): The time range for the request in HHMM format.
steps (str): Forecast steps or time intervals.
param (str): Parameter codes for the requested data.
target (str): Target file or identifier for the output.
request (dict): A dictionary representation of the request, initialized
automatically upon object creation.
"""
class_: str
data_type: str
expver: str
levtype: str
date: str
time: str
steps: str
param: str
target: str
request: dict = field(init=False)
def __post_init__(self):
self.request = {
"CLASS": self.class_,
"TYPE": self.data_type,
"EXPVER": self.expver,
"LEVTYPE": self.levtype,
"DATE": self.date,
"TIME": self.time,
"STEP": "/".join(map(str, self.steps)),
"PARAM": self.param,
"TARGET": self.target,
}
[docs]
def update_request(self, upd: dict):
"""Add or replace heys in a mars request.
Args:
upd (dict): a dict with keys and values
"""
self.request.update(upd)
[docs]
def add_eps_members(self, members: List[int], prefetch: bool):
"""Fix parameters in case of eps members.
Args:
members (list): members to retrieve
prefetch (string): type of retrieve
"""
if prefetch:
if "+[STEP]" in self.target:
self.target = self.target.replace("+[STEP]", "_[NUMBER]+[STEP]")
else:
self.target = f'"{self.target}_[NUMBER]"'
self.request.update(
{
"NUMBER": "/".join(map(str, members)),
"TARGET": self.target,
}
)
else:
self.request["TARGET"] = self.target + f"_{members[0]}"
[docs]
def add_levelist(self, levelist: str):
"""Set multilevel if needed.
Args:
levelist (string): list of levels
Returns:
None
"""
if self.levtype == "ML" and self.param == "129":
local_levelist = 1
elif self.levtype == "ML":
local_levelist = get_value_from_dict(levelist, self.date)
logger.info("levelist:{}", local_levelist)
elif self.levtype == "SOL":
local_levelist = "1/2/3/4/5"
logger.info("levelist:{}", local_levelist)
else:
return
# Add levelist to request
self.request["LEVELIST"] = local_levelist
[docs]
def add_database_options(self):
"""Add database options."""
if self.class_ == "D1":
self.request.update(
{
"DATASET": "extremes-dt",
}
)
if "latlon" not in self.target:
self.request.update(
{
"DATABASE": "fdb",
}
)
[docs]
def replace(self, **kwargs):
"""Return new instance with updated values."""
return replace(self, **kwargs)