Source code for deode.mars_utils

"""Utility for marsprep."""

import contextlib
import os
import shutil
from dataclasses import dataclass, field, replace
from pathlib import Path
from subprocess import run
from typing import Dict, List, Tuple

from .datetime_utils import as_datetime
from .geo_utils import Projection, Projstring
from .logs import logger


def write_mars_req(req, name, method):
    """Write a request for MARS.

    The file starts with the upper-cased method followed by a comma. Then
    each entry of ``req.request`` is written as an indented ``KEY = value``
    line. Every entry line except the last one ends with a comma.

    Args:
        req (BaseRequest): request object whose ``request`` dict is written
        name (string): request file name
        method (string): selected method, retrieve or read
    """
    entries = list(req.request.items())
    last_index = len(entries) - 1
    with open(name, "w") as f:
        f.write(str(method.upper()) + ",\n")
        for index, (key, value) in enumerate(entries):
            line = "  " + key + " = " + str(value)
            # MARS separates request keywords with commas; no trailing comma on the last
            if index != last_index:
                line += ","
            f.write(line + "\n")
def get_mars_keys(source, key_filter="-w shortName:s=z"):
    """Get the basic MARS settings from a static file, to correct a request.

    One ``grib_get`` call is run per MARS key, through the shell, with
    ``check=True``. A missing ``source`` is only logged as an error and does
    not stop the function. The following ``grib_get`` call is then presumably
    what fails (``run(..., check=True)`` raises ``CalledProcessError`` on a
    non-zero exit status).

    Args:
        source (str): GRIB file to inspect; may be wrapped in quotes
        key_filter (str): ``grib_get`` filter option(s) selecting the message

    Returns:
        dict: MARS keyword (e.g. "CLASS") -> value string reported by grib_get
    """
    source_missing = source is None or not os.path.exists(source.strip("\"'"))
    if source_missing:
        logger.error("Missing file: {}", source)
    logger.info("Reading MARS config settings from {}", source)

    base_command = f"grib_get {key_filter} "
    grib_keys = {
        "CLASS": "marsClass",
        "STREAM": "marsStream",
        "TYPE": "marsType",
        "EXPVER": "experimentVersionNumber",
        "DATE": "dataDate",
        "TIME": "dataTime",
        "STEP": "stepRange",
        "LEVTYPE": "levelType",
        "LEVEL": "level",
    }

    settings = {}
    # TODO: could be faster with only 1 call for all keys
    # All output is then space-separated, so just use .split()
    for mars_key, grib_key in grib_keys.items():
        completed = run(
            base_command + f"-p {grib_key}:s {source}",
            check=True,
            shell=True,  # noqa
            capture_output=True,
        )
        settings[mars_key] = completed.stdout.decode().strip("\n")
        logger.info("Mars config - {} = {}", mars_key, settings[mars_key])
    return settings
[docs] def get_steps_and_members_to_retrieve( steps: List[int], path: Path, file_name: str, members: List[int] ) -> Tuple[List[int], Dict[str, List[int | None]]]: """Check which mars file already exist and returns steps and members which missing. Args: steps (List[int]): list of steps path (pathlib.Path): path to global files file_name (string): name of file to check members (List[int]): members to retrieve Returns: steps (List[int]): steps to retrieve members_dict (Dict[str, List[int | None]]): members to retrieve """ step_list = [] member_list = [] for step in steps: for member in members: # Default to member 0 if member is None without adding member to # the member_list. This covers the deterministic case with no # boundary member nesting. mars_file_check = path / f"{file_name}_{member or 0}+{step}" if not os.path.exists(mars_file_check): step_list.append(step) logger.info("Missing file:{}", mars_file_check) member_list.append(member) steps = sorted(set(step_list)) # Get perturbed members only perturbed_members = [member for member in sorted(set(member_list)) if member != 0] # Construct dictionary with perturbed members and control depending on the # provided bdmembers list members_dict = {} if 0 in members: members_dict["control_member"] = [0] # If no perturbed members requested, return if not perturbed_members: return steps, members_dict # Add perturbed members for the cases where # - control and perturbed members are requested # - only perturbed members are requested # - no perturbed members are requested (perturbed_members = [None]) # The latter case covers the "deterministic" case, where the ensemble only # contains one member. members_dict["perturbed_members"] = perturbed_members return steps, members_dict
def check_data_available(basetime, mars):
    """Check that data exists at ``basetime`` for the chosen expver.

    Args:
        basetime: time to check. It is compared directly against the values
            ``as_datetime`` returns for the configured dates, so it must be of
            a comparable type (presumably ``datetime.datetime``).
        mars (dict): mars config section. It must hold "start_date" and may
            hold "end_date"; without an end date only the lower bound is
            checked.

    Raises:
        ValueError: No data for this date.
    """
    start_date = as_datetime(mars["start_date"])

    try:
        end_date = as_datetime(mars["end_date"])
        if basetime > end_date:
            raise ValueError(
                f"No data for {basetime}! "
                f"The data is available between {start_date} and {end_date}."
            )
    except KeyError:
        # "end_date" is optional: the data set is open-ended.
        pass

    if basetime < start_date:
        raise ValueError(
            f"No data for {basetime}! The data is available after {start_date}"
        )
def get_domain_data(config):
    """Read and return domain data.

    Args:
        config (deode.ParsedConfig): configuration providing the ``domain.*``
            settings

    Returns:
        String "maxlat/minlon/minlat/maxlon" built from the domain properties
        reported by the projection, used as the MARS area.
    """
    # (key expected by get_domain_properties, config entry under "domain.")
    config_keys = (
        ("nlon", "nimax"),
        ("nlat", "njmax"),
        ("latc", "xlatcen"),
        ("lonc", "xloncen"),
        ("lat0", "xlat0"),
        ("lon0", "xlon0"),
        ("gsize", "xdx"),
    )
    domain_spec = {
        spec_key: config[f"domain.{cfg_key}"] for spec_key, cfg_key in config_keys
    }

    proj = Projection(
        Projstring().get_projstring(
            lon0=domain_spec["lon0"], lat0=domain_spec["lat0"]
        )
    )
    bounds = proj.get_domain_properties(domain_spec)
    return "/".join(
        str(bounds[edge]) for edge in ("maxlat", "minlon", "minlat", "maxlon")
    )
def get_value_from_dict(dict_, key_orig):
    """Select a value from ``dict_`` according to ``key_orig``.

    - If ``dict_`` is a string, it is returned as is.
    - If ``key_orig`` parses as a date, the keys are scanned from the largest
      to the smallest. The value of the first key whose date is not after
      ``key_orig`` is returned.
    - If parsing fails (``key_orig`` or a scanned key is not a date), the
      value stored under ``str(key_orig)`` is returned.

    NOTE(review): keys are ordered with a plain ``sorted`` on the items, i.e.
    lexicographically if the keys are strings. This matches chronological
    order only when all date keys share one zero-padded format — confirm for
    the configs in use.

    Args:
        dict_ (str, BaseConfig object): Values to select
        key_orig (str): key for value checking

    Returns:
        value (str): Found value

    Raises:
        ValueError: No suitable value was found.
    """
    if isinstance(dict_, str):
        return dict_

    try:
        ref_date = as_datetime(key_orig)
        candidates = sorted(dict_.items(), reverse=True)
        for date_key, value in candidates:
            if ref_date >= as_datetime(date_key):
                return value
    except ValueError:
        plain_key = str(key_orig)
        if plain_key in dict_:
            return dict_[plain_key]

    raise ValueError(f"Value not found for {key_orig} within {dict_}")
def get_steplist(bd_offset, fc_range, bdint):
    """Get the list of steps for a MARS request.

    All durations are truncated to whole hours. The list starts at
    ``bd_offset``, advances by ``bdint`` and includes one step beyond
    ``bd_offset + fc_range`` when ``fc_range`` is not a multiple of ``bdint``.

    Args:
        bd_offset (timedelta): first boundary time
        fc_range (timedelta): forecast range
        bdint (timedelta): frequency of boundary files

    Returns:
        steps (List[int]): list of steps in hours
    """

    def _whole_hours(delta):
        return int(delta.total_seconds() // 3600)

    interval = _whole_hours(bdint)
    start = _whole_hours(bd_offset)
    stop = start + _whole_hours(fc_range) + interval
    return list(range(start, stop, interval))
def get_and_remove_data(file_name: str) -> bytes:
    """Return the binary content of a file and delete the file.

    Args:
        file_name (str): The name of the file containing the data to be read.

    Returns:
        bytes: The binary data read from the file.
    """
    source = Path(file_name)
    payload = source.read_bytes()
    source.unlink()
    return payload
def add_additional_file_specific_data(
    additional_data: Dict[str, bytes],
):
    """Append additional data to specific files.

    The additional_data dict is expected to have the following structure,
    where the "common_data" key is optional:
    {
        "common_data": b"Common data to add to all files",
        "file1": b"Data to add to file1",
        "file2": b"Data to add to file2",
        ...
    }
    If "common_data" is present, it is written to each file before that file's
    own data. Files whose combined data is empty are left untouched.

    All target files are checked before anything is written. If one is
    missing, no file is modified.

    Args:
        additional_data (Dict[str, bytes]): Dictionary containing the data to
            add (value) to a given file (key).

    Raises:
        FileNotFoundError: If a target file does not exist.
    """
    common_data = additional_data.get("common_data", b"")
    targets = {
        key: data for key, data in additional_data.items() if key != "common_data"
    }

    # Validate first so a missing file does not leave earlier files
    # half-updated when the error is raised.
    for key in targets:
        if not os.path.exists(key):
            raise FileNotFoundError(
                f"File {key} not found. Trying to append data to non-existing file."
            )

    for key, data in targets.items():
        combined_data = common_data + data
        if not combined_data:
            continue
        logger.debug("Adding additional file specific data to file: {}", key)
        with open(key, "ab") as fp:
            fp.write(combined_data)
def add_additional_data_to_all(
    tag: str,
    steps: List[int],
    members_dict: Dict[str, List[int]],
    additional_data: Dict[str, bytes],
):
    """Append common data to every file defined by tag, step and member.

    The additional_data dict is expected to have the following structure:
    {
        "data_key1": b"Data to add to all files",
        "data_key2": b"Data to add to all files",
        ...
    }
    Each file is named ``f"{tag}_{member}+{step}"`` (a ``None`` member maps to
    0). Every value in ``additional_data`` is appended to it in dict order.

    All target files are checked before anything is written. If one is
    missing, no file is modified.

    Args:
        tag (str): Name of tag
        steps (List[int]): Steps used to construct filename.
        members_dict (Dict[str, List[int]]): Dictionary with members used to
            construct filename.
        additional_data (Dict[str, bytes]): Dictionary containing the data to add.

    Raises:
        FileNotFoundError: If a target file does not exist.
    """
    filenames = [
        f"{tag}_{member or 0}+{step}"
        for step in steps
        for members in members_dict.values()
        for member in members
    ]

    # Validate first so a missing file does not leave earlier files
    # half-updated when the error is raised.
    for filename in filenames:
        if not os.path.exists(filename):
            raise FileNotFoundError(
                f"File {filename} not found. "
                "Trying to append data to non-existing file."
            )

    for filename in filenames:
        with open(filename, "ab") as fp:
            for key, data in additional_data.items():
                logger.debug(
                    "Adding additional common data ({}) to file: {}",
                    key,
                    filename,
                )
                fp.write(data)
def move_files(
    tag: str, steps: List[int], members_dict: Dict[str, List[int]], target_dir: Path
):
    """Move existing ``{tag}_{member}+{step}`` files into ``target_dir``.

    File names are relative to the current working directory, and a ``None``
    member maps to 0. Files that do not exist are skipped silently.

    Args:
        tag (str): Name of tag
        steps (list): steps to process the files
        members_dict (Dict[str, List[int]]): dict with members to process the files
        target_dir (Path): target directory to move the files to
    """
    candidates = (
        f"{tag}_{member or 0}+{step}"
        for step in steps
        for members in members_dict.values()
        for member in members
    )
    for file_name in candidates:
        if not os.path.exists(file_name):
            continue
        shutil.move(file_name, target_dir / file_name)
[docs] @dataclass(kw_only=True) class BaseRequest: """Represents a structured data request with configurable parameters. This class facilitates the creation of data requests by encapsulating request attributes and providing methods for modifying and enhancing the request structure. The request data is stored in a dictionary accessible via the `request` attribute. Parameters: class_ (str): The class of the request, e.g., "D1". data_type (str): The type of data requested (e.g., analysis, forecast). expver (str): Experiment version identifier. levtype (str): Level type (e.g., surface, pressure levels). date (str): The date range for the request in YYYYMMDD format. time (str): The time range for the request in HHMM format. steps (str): Forecast steps or time intervals. param (str): Parameter codes for the requested data. target (str): Target file or identifier for the output. request (dict): A dictionary representation of the request, initialized automatically upon object creation. """ class_: str data_type: str expver: str levtype: str date: str time: str steps: str param: str target: str request: dict = field(init=False) def __post_init__(self): self.request = { "CLASS": self.class_, "TYPE": self.data_type, "EXPVER": self.expver, "LEVTYPE": self.levtype, "DATE": self.date, "TIME": self.time, "STEP": "/".join(map(str, self.steps)), "PARAM": self.param, "TARGET": self.target, }
[docs] def update_request(self, upd: dict): """Add or replace heys in a mars request. Args: upd (dict): a dict with keys and values """ self.request.update(upd)
[docs] def add_eps_members(self, members: List[int], prefetch: bool): """Fix parameters in case of eps members. Args: members (list): members to retrieve prefetch (string): type of retrieve """ if prefetch: if "+[STEP]" in self.target: self.target = self.target.replace("+[STEP]", "_[NUMBER]+[STEP]") else: self.target = f'"{self.target}_[NUMBER]"' self.request.update( { "NUMBER": "/".join(map(str, members)), "TARGET": self.target, } ) else: self.request["TARGET"] = self.target + f"_{members[0]}"
[docs] def add_levelist(self, levelist: str): """Set multilevel if needed. Args: levelist (string): list of levels Returns: None """ if self.levtype == "ML" and self.param == "129": local_levelist = 1 elif self.levtype == "ML": local_levelist = get_value_from_dict(levelist, self.date) logger.info("levelist:{}", local_levelist) elif self.levtype == "SOL": local_levelist = "1/2/3/4/5" logger.info("levelist:{}", local_levelist) else: return # Add levelist to request self.request["LEVELIST"] = local_levelist
[docs] def add_database_options(self): """Add database options.""" if self.class_ == "D1": self.request.update( { "DATASET": "extremes-dt", } ) if "latlon" not in self.target: self.request.update( { "DATABASE": "fdb", } )
[docs] def replace(self, **kwargs): """Return new instance with updated values.""" return replace(self, **kwargs)