Source code for deode.tasks.gribmodify

"""AddCalculatedFields."""

import math

from ..datetime_utils import as_datetime
from ..logs import logger
from ..toolbox import FileManager
from .base import Task

# For now (on ATOS), only tasks with prgenv/gnu can import eccodes in python
try:
    import eccodes
except (ImportError, OSError, RuntimeError):
    logger.warning("eccodes python API could not be imported. Usually OK.")


[docs] class AddCalculatedFields(Task): """Create grib files.""" def __init__(self, config): """Construct create grib object. Args: config (deode.ParsedConfig): Configuration """ Task.__init__(self, config, __class__.__name__) self.archive = self.platform.get_system_value("archive") self.basetime = as_datetime(self.config["general.times.basetime"]) self.forecast_range = self.config["general.times.forecast_range"] self.output_settings = self.config["general.output_settings"] self.file_templates = self.config["file_templates"]
[docs] def add_field_to_grib(self, fname, params, operation, short_name): """Add sum of two fields to the same grib. Args: fname (str): gribe file params (list): list of paramaterNumbers of input parameters operation: operarion to compute new field short_name (str) : short_name of added field Raises: NotImplementedError: Operation not implemented yet" """ gids = [None for i in range(len(params))] if any(params) is None: raise ValueError("Parameters are not define") with open(fname, "rb") as f_in, open(fname, "ab") as f_out: while None in gids: gid = eccodes.codes_grib_new_from_file(f_in) par_nam = eccodes.codes_get(gid, "shortName") if par_nam in params: gids[params.index(par_nam)] = gid else: eccodes.codes_release(gid) values_list = [eccodes.codes_get_values(gid) for gid in gids] if operation == "add": result_values = [sum(values) for values in zip(*values_list)] elif operation == "vectorLength": if len(params) != 2: raise ValueError("Vector must has 2 components!") result_values = [ math.sqrt(values[0] * values[0] + values[1] * values[1]) for values in zip(*values_list) ] elif operation == "vectorDirection": if len(params) != 2: raise ValueError("Vector must has 2 components!") result_values = [ (math.atan2(values[0], values[1]) * 180 / math.pi) % 360 for values in zip(*values_list) ] else: raise NotImplementedError("Operation {} not implemented yet.", operation) gid_res = eccodes.codes_clone(gids[0]) eccodes.codes_set_values(gid_res, result_values) eccodes.codes_set(gid_res, "shortNameECMF", short_name) eccodes.codes_set(gid_res, "shortName", short_name) eccodes.codes_write(gid_res, f_out) eccodes.codes_release(gid_res) for gid in gids: eccodes.codes_release(gid) f_in.close() f_out.close()
[docs] def find_par(self, short_name, fname): """Check if field with shortName already exists in grib. Args: short_name: shortName, fname: grib file Returns: bool: True if field exists """ found = False with open(fname, "rb") as f_in: while not found: gid = eccodes.codes_grib_new_from_file(f_in) if gid is None: break name = eccodes.codes_get(gid, "shortName") eccodes.codes_release(gid) found = name == short_name f_in.close() return found
[docs] def execute(self): """Execute add_total_precip.""" file_handle = FileManager.create_list( self, self.basetime, self.forecast_range, self.file_templates["fullpos"]["archive"], self.output_settings["fullpos"], ) del file_handle[self.basetime] modify_rules = self.config["gribmodify"] for fname in file_handle.values(): for name in modify_rules: logger.info(fname) if not self.find_par(modify_rules[name]["output"], fname): logger.info( "Adding field with shortName: {}", modify_rules[name]["output"] ) if self.find_par( modify_rules[name]["input"][0], fname ) and self.find_par(modify_rules[name]["input"][1], fname): self.add_field_to_grib( fname, modify_rules[name]["input"], modify_rules[name]["operator"], modify_rules[name]["output"], ) else: raise ValueError( "There are no input parameters {} for output parameter {}", modify_rules[name]["input"], modify_rules[name]["output"], )