Source code for deode.tasks.marsprep

"""Marsprep."""

import ast
import contextlib
import os
import shutil
from functools import cached_property
from pathlib import Path
from typing import Dict, List, Optional

from ..config_parser import ParsedConfig
from ..datetime_utils import as_datetime, as_timedelta, cycle_offset
from ..logs import logger
from ..mars_utils import (
    BaseRequest,
    add_additional_data_to_all,
    add_additional_file_specific_data,
    check_data_available,
    get_and_remove_data,
    get_domain_data,
    get_mars_keys,
    get_steplist,
    get_steps_and_members_to_retrieve,
    get_value_from_dict,
    move_files,
    write_mars_req,
)
from ..os_utils import deodemakedirs, list_files_join
from ..tasks.batch import BatchJob
from .base import Task


[docs] class Marsprep(Task): """Marsprep task.""" def __init__(self, config): """Construct forecast object. Args: config (deode.ParsedConfig): Configuration Raises: ValueError: No data for this date. """ Task.__init__(self, config, __class__.__name__) # Default to [None] to cover the deterministic case with no # boundary member nesting. self.bdmembers = ( [ int(self.platform.substitute(bdmember)) for bdmember in self.config["boundaries.ifs.bdmembers"] ] if self.config["boundaries.ifs.bdmembers"] else [None] ) # path to sfcdata on disk self.sfcdir = Path(self.platform.get_platform_value("global_sfcdir")) logger.info(f"sfc dir: {self.sfcdir}") # Get the times from config.toml self.basetime = as_datetime(self.config["general.times.basetime"]) forecast_range = as_timedelta(self.config["general.times.forecast_range"]) # Check if there are data for specific date in mars check_data_available(self.basetime, self.mars) # Get boundary informations bdint = as_timedelta(self.config["boundaries.bdint"]) bdcycle = as_timedelta(self.mars["ifs_cycle_length"]) bdcycle_start = as_timedelta(self.mars["ifs_cycle_start"]) bdshift = as_timedelta(self.config["boundaries.bdshift"]) if bdshift.total_seconds() % bdcycle.total_seconds() != 0: raise ValueError("bdshift needs to be a multiple of bdcycle!") # Get date/time/steps info bd_offset = cycle_offset( self.basetime, bdcycle, bdcycle_start=bdcycle_start, bdshift=bdshift, ) self.bd_basetime = self.basetime - bd_offset self.steps = get_steplist(bd_offset, forecast_range, bdint) logger.info("bd_basetime: {}", self.bd_basetime) self.init_date_str = self.bd_basetime.strftime("%Y%m%d") self.init_hour_str = self.bd_basetime.strftime("%H") exist_snow = False with contextlib.suppress(KeyError): self.param_snow = get_value_from_dict( self.mars["GG_snow"], self.init_date_str ) exist_snow = True start_snow_date = as_datetime(self.mars["start_date"]) with contextlib.suppress(KeyError): start_snow_date = as_datetime(self.mars["start_snow_date"]) self.exist_snow = exist_snow and self.bd_basetime >= start_snow_date # Split mars by bdint self.split_mars = self.config["suite_control.split_mars"] if self.split_mars: self.prep_step = ast.literal_eval(self.config["task.args.prep_step"]) self.prepdir = Path( self.platform.substitute( self.config["system.marsdir"], basetime=self.bd_basetime, validtime=self.basetime, ) ) self.mars_version = int( self.config.get("submission.task_exceptions.Marsprep.mars_version", "6") ) logger.info("MARS data expected in:{}", self.prepdir) # Use fixed static orography files (spherical harmonics and/or gaussian grid) # TODO: check existence of static orography files & read settings if "use_static_sh" in self.mars: self.use_static_sh_oro = bool(self.mars["use_static_sh"]) else: self.use_static_sh_oro = False if self.use_static_sh_oro: self.sh_oro_source = self.mars["sh_oro_source"] logger.info("Using static SH orography file {}", self.sh_oro_source) if "use_static_gg" in self.mars: self.use_static_gg_oro = bool(self.mars["use_static_gg"]) else: self.use_static_gg_oro = False if self.use_static_gg_oro: self.gg_oro_source = self.mars["gg_oro_source"] logger.info("Using static GG orography file {}", self.gg_oro_source)
[docs] @staticmethod def mars_selection(selection: str, config: ParsedConfig) -> dict: """Copy default settings if requested. Args: selection (str): The selection to use. config (deode.ParsedConfig): Configuration object Returns: mars (dict): mars config section """ mars = config[f"mars.{selection}"].dict() if "expver" not in mars: mars["expver"] = selection # Copy default settings if requested if "default" in mars: default = config[f"mars.{mars['default']}"] for k in default: if k not in mars: mars[k] = default[k] return mars
@cached_property def mars(self) -> dict: """Get mars selection.""" selection = self.platform.substitute(self.config["boundaries.ifs.selection"]) return self.mars_selection(selection, self.config)
[docs] def update_data_request( self, request: BaseRequest, prefetch: bool, specify_domain: bool, bdmembers: Optional[List[int]] = None, grid: Optional[str] = None, source: Optional[str] = None, ): """Create ECMWF MARS system request. Args: request: BaseRequest object to update prefetch: Retrieve or stage specify_domain: Use lat/lon and rotation or use global (default) bdmembers: Boundary members to retrieve in case of eps. grid: Specific grid for some request. Default None. source: Sorce for retrieve data from disk. Defaults None. """ if grid is not None and self.mars_version == 6: request.update_request({"GRID": grid}) # Additional request parameters if EPS # Only if all members are True like. This makes it possible to distinguish # between a deterministic run with no boundary member nesting (members = [None]) # and an ensemble run with boundary member nesting (e.g. members = [0] # for a one member ensemble, members = [0, 1, 2] for a three member ensemble etc.) if bdmembers and all(bdmembers): request.add_eps_members(bdmembers, prefetch=prefetch) if self.mars_version == 6: request.update_request({"PROCESS": "LOCAL"}) request.add_levelist(self.mars["levelist"]) # Set stream stream = get_value_from_dict(self.mars["stream"], request.time) request.update_request({"STREAM": stream}) # Retrieve from already fetched data if not prefetch: request.update_request({"SOURCE": source}) # NOTE: the following could be moved to get_geopotential_latlon() if "mars_latlonZ" in request.target and ( self.use_static_gg_oro or self.use_static_sh_oro ): z_keys = get_mars_keys(source) request.update_request(z_keys) request.add_database_options() if specify_domain: request.update_request( { "GRID": get_value_from_dict(self.mars["grid"], request.date), "AREA": get_domain_data(self.config), } )
[docs] def execute(self): """Run task. Define run sequence. Raises: RuntimeError: If there is an issue with the work folder. """ try: if not os.path.exists(self.prepdir): deodemakedirs( self.prepdir, unixgroup=self.platform.get_platform_value("unix_group") ) except OSError as e: raise RuntimeError(f"Error while preparing the mars folder: {e}") from e if self.split_mars and self.prep_step: logger.debug("*** Need only latlon data") else: if self.split_mars: self.steps = [self.steps[int(self.config["task.args.bd_index"])]] logger.info("Need steps:{}", self.steps) self.get_grid_point_surface_data() self.get_spectral_harmonic_data() self.get_grid_point_upper_air_data() self.get_sfx_data()
[docs] def get_grid_point_surface_data(self): """Get grid point surface data.""" tag = "ICMGG" steps, members_dict = get_steps_and_members_to_retrieve( self.steps, self.prepdir, tag, self.bdmembers ) if steps: self.get_gg_data(tag, steps, members_dict) exist_soil = False with contextlib.suppress(KeyError): gg_soil_param = get_value_from_dict( self.mars["GG_soil"], self.init_date_str ) gg1_param = get_value_from_dict(self.mars["GG1"], self.init_date_str) exist_soil = True additional_data = {} if exist_soil: additional_data["common_data"] = self.get_gg_soil_data( tag, params={"GG_soil": gg_soil_param, "GG1": gg1_param} ) else: self.fmanager.input(f"{self.sfcdir}/sfcfile", "sfcdata") additional_data["common_data"] = get_and_remove_data("sfcdata") if self.exist_snow: additional_data |= self.get_gg_snow_data( tag, steps, members_dict, self.param_snow ) add_additional_file_specific_data(additional_data=additional_data) move_files(tag, steps, members_dict, self.prepdir) else: add_additional_data_to_all(tag, steps, members_dict, additional_data) move_files(tag, steps, members_dict, self.prepdir)
[docs] def get_spectral_harmonic_data(self): """Get spectral harmonic data.""" tag = "ICMSH" steps, members_dict = get_steps_and_members_to_retrieve( self.steps, self.prepdir, tag, self.bdmembers ) if steps: self.get_sh_data( tag, steps, members_dict, ) additional_data = {"z": self.get_shz_data(tag)} add_additional_data_to_all(tag, steps, members_dict, additional_data) move_files(tag, steps, members_dict, self.prepdir)
[docs] def get_grid_point_upper_air_data(self): """Get gridpoint upper air data.""" tag = "ICMUA" steps, members_dict = get_steps_and_members_to_retrieve( self.steps, self.prepdir, tag, self.bdmembers ) if steps: self.get_ua_data(tag, steps, members_dict) logger.info( "steps {}, members_dict {}", steps, members_dict, ) move_files(tag, steps, members_dict, self.prepdir)
[docs] def get_sfx_data(self): """Get SFX data.""" # Split the lat/lon part and perform it here mars_file_check_list = {} for member in self.bdmembers: bddir_sfx = Path( self.platform.substitute( self.config["system.bddir_sfx"], basetime=self.bd_basetime, validtime=self.basetime, ) ) bdfile_sfx = self.config["system.bdfile_sfx_template"].replace( "@BDMEMBER@", str(member or 0) ) prep_filename = self.platform.substitute( bdfile_sfx, basetime=self.bd_basetime, validtime=self.basetime, ) mars_file_check_list[member] = bddir_sfx / prep_filename if all( os.path.exists(mars_file_check) for mars_file_check in mars_file_check_list.values() ): logger.info("Prep files already exists as {}", mars_file_check_list) elif self.split_mars and not self.prep_step: logger.debug("No need Prep file") else: self.get_lat_lon_data() self.get_geopotential_latlon() # Get the file list to join for member in self.bdmembers: prep_pattern = f"mars_latlon*_{member or 0}" logger.info("prep_pattern: {}", prep_pattern) filenames = list_files_join(self.wdir, prep_pattern) logger.info(filenames) prep_filename = mars_file_check_list[member] _prep_filename = os.path.basename(prep_filename) with open(_prep_filename, "wb") as output_file: for filename in filenames: with open(filename, "rb") as input_file: output_file.write(input_file.read()) shutil.move(_prep_filename, prep_filename) logger.info("Created {}", prep_filename)
[docs] def get_lat_lon_data(self): """Get Lat/Lon data.""" prefetch = False param = ( get_value_from_dict(self.mars["GG"], self.init_date_str) + "/" + get_value_from_dict(self.mars["GG_sea"], self.init_date_str) ) for member in self.bdmembers: data_type = self.mars["type_AN"] if member == 0 else self.mars["type_FC"] self._build_and_run_request( req_file_name="latlonGG.req", data_type=data_type, levtype="SFC", param=param, steps=[self.steps[0]], members=[member], target=f"mars_latlonGG_{member or 0}", prefetch=prefetch, specify_domain=True, # Default to ICMGG_0+* if member is None source=f'"{self.prepdir}/ICMGG_{member or 0}+{self.steps[0]}"', write_method="retrieve" if self.mars_version == 6 else "read", )
[docs] def get_geopotential_latlon(self): """Get geopotential in lat/lon.""" prefetch = False # Retrieve for Surface Geopotential in lat/lon param = get_value_from_dict(self.mars["SHZ"], self.init_date_str) data_type = get_value_from_dict(self.mars["GGZ_type"], self.init_date_str) lev_type = get_value_from_dict(self.mars["Zlev_type"], self.init_date_str) first_member = self.bdmembers[0] if self.use_static_gg_oro: self.fmanager.input(self.gg_oro_source, "gg_oro.Z") z_source = "gg_oro.Z" else: # Default to ICMSH_0+* if member is None z_source = f'"{self.prepdir}/ICMSH_{first_member or 0}+{self.steps[0]}"' # NOTE: for z, the step is always 0, while steps[0] may be shifted! self._build_and_run_request( req_file_name="latlonz.req", data_type=data_type, levtype=lev_type, param=param, steps=[0], members=[first_member], target=f"mars_latlonZ_{first_member or 0}", prefetch=prefetch, specify_domain=True, source=z_source, write_method="retrieve" if self.mars_version == 6 else "read", ) for member in self.bdmembers[1:]: self.fmanager.input( f"mars_latlonZ_{first_member or 0}", f"mars_latlonZ_{member}" )
[docs] def get_sh_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]): """Get SH data.""" logger.info("Retrieving {} data for steps: {}", tag, steps) param = get_value_from_dict(self.mars["SH"], self.init_date_str) for member_type, members in members_dict.items(): data_type = ( self.mars["type_AN"] if member_type == "control_member" else self.mars["type_FC"] ) self._build_and_run_request( req_file_name=f"{member_type}_{tag}.req", data_type=data_type, levtype="ML", param=param, steps=steps, members=members, target=( f'"{tag}_0+[STEP]"' if member_type == "control_member" or not all(members) # Default to target="*_0+*" if member is None else f'"{tag}+[STEP]"' ), grid=self.mars["grid_ML"], )
[docs] def get_shz_data(self, tag: str): """Get geopotential in spherical harmonics.""" if self.use_static_sh_oro: self.fmanager.input(self.sh_oro_source, f"{tag}.Z") else: param = get_value_from_dict(self.mars["SHZ"], self.init_date_str) self._build_and_run_request( req_file_name=f"{tag}Z.req", data_type=self.mars["SHZ_type"], levtype="ML", param=param, steps=[0], target=f'"{tag}.Z"', grid=self.mars["grid_ML"], ) return get_and_remove_data(f"{tag}.Z")
[docs] def get_ua_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]): """Get upper air data.""" logger.info("Retrieving {} data for steps: {}", tag, steps) param = get_value_from_dict(self.mars["UA"], self.init_date_str) for member_type, members in members_dict.items(): data_type = ( self.mars["type_AN"] if member_type == "control_member" else self.mars["type_FC"] ) self._build_and_run_request( req_file_name=f"{member_type}_{tag}.req", data_type=data_type, levtype="ML", param=param, steps=steps, members=members, target=( f'"{tag}_0+[STEP]"' if member_type == "control_member" or not all(members) # Default to target="*_0+*" if member is None else f'"{tag}+[STEP]"' ), grid=self.mars["grid_ML"], )
[docs] def get_gg_data(self, tag: str, steps: List[int], members_dict: Dict[str, List[int]]): """Get gridpoint surface data.""" logger.info("Retrieving {} data for steps: {}", tag, steps) param = ( get_value_from_dict(self.mars["GG"], self.init_date_str) + "/" + get_value_from_dict(self.mars["GG_sea"], self.init_date_str) ) for member_type, members in members_dict.items(): data_type = ( self.mars["type_AN"] if member_type == "control_member" else self.mars["type_FC"] ) self._build_and_run_request( req_file_name=f"{member_type}_{tag}.req", data_type=data_type, levtype="SFC", param=param, steps=steps, members=members, target=( f'"{tag}_0+[STEP]"' if member_type == "control_member" or not all(members) # Default to target="*_0+*" if member is None else f'"{tag}+[STEP]"' ), )
[docs] def get_gg_soil_data(self, tag: str, params: dict): """Get soil gridpoint data.""" for param_name, param in params.items(): target = f"{tag}.{param_name}" self._build_and_run_request( req_file_name=f"{target}.req", data_type=self.mars["type_AN"], levtype="SFC", param=param, steps=[0], target=target, ) # Collect and return the data from target files # (Read the single-step MARS files first, hence the reversed order) data = b"" for param_name in reversed(params.keys()): target = f"{tag}.{param_name}" data += get_and_remove_data(target) return data
[docs] def get_gg_snow_data( self, tag: str, steps: List[int], members_dict: Dict[str, List[int]], param ): """Get soil gridpoint data.""" additional_data: Dict[str, bytes] = {} for member_type, members in members_dict.items(): data_type = ( self.mars["type_AN"] if member_type == "control_member" else self.mars["type_FC"] ) self._build_and_run_request( req_file_name=f"{member_type}_{tag}.snow.req", data_type=data_type, levtype="SOL", param=param, steps=steps, members=members, target=( f'"{tag}.snow_0+[STEP]"' if member_type == "control_member" or not all(members) # Default to target="*_0+*" if member is None else f'"{tag}.snow+[STEP]"' ), ) for step in steps: for member in members: # Default to "*_0+{step}" if member is None key = f"{tag}_{member or 0}+{step}" file_snow = f"{tag}.snow_{member or 0}+{step}" additional_data[key] = get_and_remove_data(file_snow) return additional_data
def _build_and_run_request( self, req_file_name: str, data_type: str, levtype: str, param: str, steps: List[int], target: str, members: Optional[List[int]] = None, source: Optional[str] = None, prefetch: bool = True, specify_domain: bool = False, write_method: str = "retrieve", grid: Optional[str] = None, ) -> None: """Build and run request to retrieve data from mars. Args: req_file_name: Name of the request file. data_type: Data type. levtype: Level type. param: Parameter(s) to retrieve. steps: Steps to retrieve. target: Name of the target file. members: Members to retrieve data from. source: Source file to retrieve data from. prefetch: Prefetch. specify_domain: Whether to specify domain or not. write_method: The write method to use when writing the request to file. grid: The grid to use. """ request = BaseRequest( class_=self.mars["class"], data_type=data_type, expver=self.mars["expver"], levtype=levtype, date=self.init_date_str, time=self.init_hour_str, steps=steps, param=param, target=target, ) self.update_data_request( request, prefetch=prefetch, specify_domain=specify_domain, bdmembers=members, source=source, grid=grid, ) write_mars_req(request, req_file_name, write_method) BatchJob(os.environ, wrapper=self.wrapper).run( f"{self.get_binary('mars')} {req_file_name}" )