Source code for deode.toolbox
"""Toolbox handling e.g. input/output."""
import ast
import contextlib
import inspect
import os
import re
import sys
from typing import Any, Union
import geohash_hilbert
from troika.connections.ssh import SSHConnection
from .datetime_utils import as_datetime, get_decade, oi2dt_list
from .logs import logger
from .os_utils import deodemakedirs
[docs]
class Provider:
    """Base class for objects that transfer data to or from a location.

    Subclasses implement ``create_resource``.
    """

    def __init__(self, config, identifier, fetch=True):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
            identifier (str): Identifier string (e.g. a file path)
            fetch (bool, optional): Fetch data. Defaults to True.
        """
        self.config = config
        self.identifier = identifier
        self.fetch = fetch
        # Unix group handed to deodemakedirs when directories are created.
        self.unix_group = config.get("platform.unix_group")
        logger.debug(
            "Constructed Base Provider object. {} {} ", self.identifier, self.fetch
        )

    def create_missing_dir(self, target):
        """Create the parent directory of ``target`` if it does not exist.

        Args:
            target (str): Name of target
        """
        parent = os.path.dirname(target)
        # An empty dirname means target is relative to the current directory.
        if parent and not os.path.isdir(parent):
            deodemakedirs(parent, unixgroup=self.unix_group)

    def create_resource(self, resource):
        """Create the resource.

        Args:
            resource (Resource): The resource to be created

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError
[docs]
class Platform:
    """Platform: config access with macro substitution and provider lookup."""

    def __init__(self, config):
        """Construct object and collect all configured macros.

        Args:
            config (deode.ParsedConfig): Config.
        """
        self.config = config
        self.fill_macros()

    def get_system_value(self, role):
        """Get the system value.

        Args:
            role (str): Type of variable to substitute (lower-cased before lookup)

        Returns:
            str: Substituted value from system.[role], or None if not set.
        """
        role = role.lower()
        try:
            val = self.config[f"system.{role}"]
            return self.substitute(val)
        except KeyError:
            return None

    def get_value(self, setting, alt=None):
        """Get the config value with substitution.

        Args:
            setting (str): Config key to look up
            alt (str): Alternative return value

        Returns:
            str: Value from config with substituted variables, or ``alt`` if the
            setting is missing.
        """
        try:
            val = self.config[setting]
            return self.substitute(val)
        except KeyError:
            return alt

    def get_platform_value(self, role, alt=None):
        """Get a platform value with substitution.

        Args:
            role (str): Type of variable to substitute (lower-cased before lookup)
            alt (str): Alternative return value

        Returns:
            str: Value from platform.[role], or ``alt`` if not set.
        """
        role = role.lower()
        try:
            val = self.config[f"platform.{role}"]
            return self.substitute(val)
        except KeyError:
            return alt

    def get_platform(self):
        """Get the platform.

        Returns:
            The ``general.platform`` config value.
        """
        return self.config["general.platform"]

    def fill_each_macro(self, macro_config):
        """Fill the macros defined in one macro config section.

        Args:
            macro_config (str): Config section that may hold ``group_macros``,
                ``os_macros`` and ``gen_macros`` entries.
        """
        group_macros = f"{macro_config}.group_macros"
        for source in self.config.get(group_macros, []):
            # Every key of each listed section becomes an upper-case macro.
            for macro, val in self.config[source].dict().items():
                self.store_macro(macro.upper(), val)
        os_macros = f"{macro_config}.os_macros"
        for macro in list(self.config.get(os_macros, [])):
            # Environment variables that are not set are skipped.
            with contextlib.suppress(KeyError):
                self.store_macro(macro, os.environ[macro])
        gen_macros = f"{macro_config}.gen_macros"
        for key, val in self.expand_macros(self.config.get(gen_macros, {})).items():
            self.store_macro(key, val)

    def fill_macros(self):
        """Fill the macros from ``macros`` and ``macros.user_macros``."""
        self.macros = {}
        self.fill_each_macro("macros")
        self.fill_each_macro("macros.user_macros")

    def get_system_macros(self):
        """Get the names of the system macros.

        Returns:
            list: Keys of the ``system`` config section.
        """
        return list(self.config["system"].dict().keys())

    def get_os_macros(self):
        """Get the environment macros.

        Returns:
            The ``macros.os_macros`` config value.
        """
        return self.config["macros.os_macros"]

    def get_gen_macros(self):
        """Get the generated (config-derived) macros.

        Returns:
            The ``macros.gen_macros`` config value.
        """
        return self.config["macros.gen_macros"]

    def store_macro(self, key, val):
        """Check and store a macro.

        Args:
            key (str) : Macro key
            val (str) : Macro value

        Raises:
            RuntimeError: If macro already exists
        """
        if key in self.macros:
            logger.error("Duplicated macro: {} {}", key, val)
            logger.error("Existing macro value: {}", self.macros[key])
            raise RuntimeError(f"Duplicated macro: {key}")
        self.macros[key] = val

    def expand_macros(self, macros):
        """Check and expand macros.

        Each entry is either a config key (the macro name is its last
        dot-separated component, upper-cased) or a single-item dict mapping a
        macro name to a config key.

        Args:
            macros (list): Input macros

        Returns:
            out_macros (dict): Macros whose config value is not None
        """
        out_macros = {}
        for macro in macros:
            if isinstance(macro, dict):
                key = next(iter(macro))
                val = self.config.get(macro[key].lower(), None)
                key = key.upper()
            else:
                val = self.config.get(macro.lower(), None)
                key = macro.split(".")[-1].upper()
            if val is None:
                logger.warning("Macro {} is not defined", macro)
            else:
                out_macros[key] = val
        return out_macros

    def resolve_macros(self, config_dict, keyval=None):
        """Resolve all macros in a nested dict (both keys and values!)."""
        if isinstance(config_dict, list):
            result = [self.substitute(x, keyval=keyval) for x in config_dict]
        elif isinstance(config_dict, dict):
            result = {
                self.substitute(x): self.resolve_macros(y, keyval=x)
                for x, y in config_dict.items()
            }
        else:
            result = self.substitute(config_dict, keyval=keyval)
        return result

    def get_provider(self, provider_id, target, fetch=True):
        """Get the needed provider.

        Args:
            provider_id (str): The intent of the provider.
            target (Resource): The target.
            fetch (boolean): Fetch the file or store it. Default to True.

        Returns:
            Provider: Provider

        Raises:
            NotImplementedError: If provider not defined.
        """
        # TODO handle platform differently archive etc
        if provider_id == "symlink":
            return LocalFileSystemSymlink(self.config, target, fetch=fetch)
        if provider_id == "copy":
            return LocalFileSystemCopy(self.config, target, fetch=fetch)
        if provider_id == "move":
            return LocalFileSystemMove(self.config, target, fetch=fetch)
        if provider_id == "ecfs":
            return ECFS(self.config, target, fetch=fetch)
        if provider_id == "fdb":
            return FDB(self.config, target, fetch=fetch)
        if provider_id == "scp":
            return SCP(self.config, target, fetch=fetch)
        raise NotImplementedError(f"Provider for {provider_id} not implemented")

    def sub_value(self, pattern, key, value, micro="@", ci=True):
        """Replace ``{micro}{key}{micro}`` in pattern with value.

        Args:
            pattern (str): Input string
            key (str): Key to replace
            value (str): Value to replace; non-strings are converted with str()
            micro (str, optional): Micro character. Defaults to "@".
            ci (bool, optional): Case insensitive. Defaults to True.

        Returns:
            str: Replaced string
        """
        logger.debug("Pattern: {}", pattern)
        logger.debug("key={} value={}", key, value)
        if not isinstance(value, str):
            # Log before converting so the reported type is the original one.
            logger.debug(
                "Input value {} for key={} is not a string, but {}!",
                value,
                key,
                type(value),
            )
            value = str(value)
        flags = re.IGNORECASE if ci else 0
        compiled = re.compile(re.escape(f"{micro}{key}{micro}"), flags)
        # A function replacement inserts value literally; a string replacement
        # would interpret backslashes (e.g. "\d", "\1") as escapes/group refs.
        res = compiled.sub(lambda _match: value, pattern)
        logger.debug("Substituted string: {}", res)
        return res

    def sub_str_dict(self, input_dict, basetime=None, validtime=None):
        """Substitute strings in dictionary.

        Args:
            input_dict (dict): Dict to be parsed
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.

        Returns:
            d (dict): Updated copy; nested dicts are processed recursively.
        """
        d = input_dict.copy()
        for k, v in input_dict.items():
            if isinstance(v, dict):
                d[k] = self.sub_str_dict(v, basetime, validtime)
            elif isinstance(v, str):
                d[k] = self.substitute(v, basetime, validtime)
            else:
                d[k] = v
        return d

    def substitute_datetime(self, pattern, datetime, suffix=""):
        """Substitute datetime related properties.

        Args:
            pattern (str): Pattern to substitute in
            datetime(DateTime object): datetime to treat
            suffix (str): Add on to key

        Returns:
            str: Substituted string.
        """
        datetime_substitution_map = {
            "YMD": "%Y%m%d",
            "BASETIME": "%Y-%m-%dT%H:%M:%SZ",
            "YYYY": "%Y",
            "YY": "%y",
            "MM": "%m",
            "DD": "%d",
            "HH": "%H",
            "mm": "%M",
            "ss": "%S",
        }
        for key, val in datetime_substitution_map.items():
            # Month (MM) and minute (mm) differ only by case; seconds follow suit.
            ci = key not in ["MM", "mm", "ss"]
            _key = key + suffix
            pattern = self.sub_value(pattern, _key, datetime.strftime(val), ci=ci)
        return pattern

    def substitute(self, pattern, basetime=None, validtime=None, keyval=None):
        """Substitute macros, LBC number and date/time placeholders in pattern.

        Args:
            pattern (str): Pattern with @MACRO@ placeholders. Non-strings are
                returned unchanged, as are patterns with an odd number of '@'.
            basetime (datetime.datetime, optional): Base time. Defaults to None,
                in which case general.times.basetime is used if set.
            validtime (datetime.datetime, optional): Valid time. Defaults to None,
                in which case general.times.validtime is used if set.
            keyval (str): Config key associated with pattern (used in messages)

        Returns:
            str: Substituted string.

        Raises:
            IndexError: If the pattern cannot be separated by '@'.
        """
        if not isinstance(pattern, str):
            return pattern
        if pattern.count("@") % 2 != 0:
            if keyval is None:
                message = f"Erroneous macro somewhere in pattern={pattern}"
            else:
                message = f"Erroneous macro for config key={keyval}"
                message += f" somewhere in pattern={pattern}"
            logger.debug(message)
            return pattern
        i = [m.start() for m in re.finditer(r"@", pattern)]
        try:
            sub_patterns = [pattern[i[j] + 1 : i[j + 1]] for j in range(0, len(i), 2)]
        except IndexError as error:
            raise IndexError(f"Could not separate pattern:{pattern} by '@'") from error
        for sub_pattern in sub_patterns:
            # Unknown macros are left untouched.
            with contextlib.suppress(KeyError):
                val = self.macros[sub_pattern.upper()]
                try:
                    # Macro values may themselves contain macros.
                    if val.count("@") > 0:
                        val = self.substitute(val, basetime, validtime, keyval)
                except AttributeError:
                    pass
                logger.debug("before replace macro={} pattern={}", sub_pattern, pattern)
                pattern = self.sub_value(pattern, sub_pattern, val)
                logger.debug("after replace macro={} pattern={}", sub_pattern, pattern)
        # LBC number handling
        try:
            bd_nr = int(self.config["task.args.bd_nr"])
            pattern = self.sub_value(pattern, "NNN", f"{bd_nr:03d}")
        except KeyError:
            pass
        # Time handling
        if basetime is None:
            basetime = self.config.get("general.times.basetime", None)
        if validtime is None:
            validtime = self.config.get("general.times.validtime", None)
        if isinstance(basetime, str):
            basetime = as_datetime(basetime)
        if isinstance(validtime, str):
            validtime = as_datetime(validtime)
        if basetime is not None:
            pattern = self.substitute_datetime(pattern, basetime)
            one_decade_pattern = (
                get_decade(basetime) if self.config["pgd.one_decade"] else ""
            )
            pattern = self.sub_value(pattern, "ONE_DECADE", one_decade_pattern)
        if basetime is not None and validtime is not None:
            logger.debug(
                "Substituted date/time info: basetime={} validtime={}",
                basetime.strftime("%Y%m%d%H%M"),
                validtime.strftime("%Y%m%d%H%M"),
            )
            pattern = self.substitute_datetime(pattern, validtime, "_LL")
            lead_time = validtime - basetime
            lead_seconds = int(lead_time.total_seconds())
            lh = int(lead_seconds / 3600)
            lm = int((lead_seconds % 3600 - lead_seconds % 60) / 60)
            ls = int(lead_seconds % 60)
            pattern = self.sub_value(pattern, "LH", f"{lh:02d}")
            pattern = self.sub_value(pattern, "LL", f"{lh:02d}")
            pattern = self.sub_value(pattern, "LLH", f"{lh:03d}")
            pattern = self.sub_value(pattern, "LLL", f"{lh:03d}")
            pattern = self.sub_value(pattern, "LLLH", f"{lh:04d}")
            pattern = self.sub_value(pattern, "LLLL", f"{lh:04d}")
            pattern = self.sub_value(pattern, "LM", f"{lm:02d}")
            pattern = self.sub_value(pattern, "LS", f"{ls:02d}")
            tstep = self.config["domain.tstep"]
            if tstep is not None:
                lead_step = lead_seconds // tstep
                pattern = self.sub_value(pattern, "TTT", f"{lead_step:03d}")
                pattern = self.sub_value(pattern, "TTTT", f"{lead_step:04d}")
        start = self.config.get("general.times.start", None)
        if start is not None:
            pattern = self.substitute_datetime(pattern, as_datetime(start), "_START")
        end = self.config.get("general.times.end", None)
        if end is not None:
            pattern = self.substitute_datetime(pattern, as_datetime(end), "_END")
        logger.debug("Return pattern={}", pattern)
        return pattern

    def evaluate(self, command_string: str, object_: Union[str, object]) -> Any:
        """Evaluate command string, by applying corresponding command of object.

        Arguments inside the parentheses must be Python literals. They are
        always parsed as a tuple, so ``f(1)`` calls ``f(1)``, ``f("abc")`` calls
        ``f("abc")`` and ``f()`` calls ``f()``.

        Args:
            command_string (str): Command string to evaluate
            object_ (Union[str, object]): Object to apply command from (if command
                is function of object). If str, the object is assumed to be a
                module. If a class, the command is assumed to be a method of
                the class.

        Raises:
            ModuleNotFoundError: If module {object_} not found
            AttributeError: If module/class {object_} has no attribute named {func}
            TypeError: If object is not a class or a string
            TypeError: If the command to evaluate is not a function

        Returns:
            any: Return original command string if it is not a function call,
                otherwise return the result of the function call.
        """
        match = re.match(r"(\w+)\((.*)\)", command_string)
        if not match:
            # Not a function call: return the string unchanged.
            return command_string
        func = match.group(1)
        arg_string = match.group(2).strip()
        if arg_string:
            # Wrap in a tuple literal so a single argument is not unpacked
            # (e.g. "abc" into three characters).
            trailing = "" if arg_string.endswith(",") else ","
            args = ast.literal_eval(f"({arg_string}{trailing})")
        else:
            args = ()
        if isinstance(object_, str):
            # object_ names an already-imported module
            module = sys.modules.get(object_)
            if not module:
                raise ModuleNotFoundError(f"Module {object_} not found")
            function = getattr(module, func)
        elif inspect.isclass(object_):
            function = getattr(object_, func)
        else:
            raise TypeError(f"Object '{object_}' is not a class or a string")
        if inspect.isfunction(function):
            return function(*args)
        raise TypeError(f"Object '{function}' is not a function")
[docs]
class FileManager:
    """FileManager class.

    Default DEODE provider.
    Platform specific.
    """

    def __init__(self, config):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        self.config = config
        self.platform = Platform(config)
        logger.debug("Constructed FileManager object.")

    def get_input(
        self,
        target,
        destination,
        basetime=None,
        validtime=None,
        check_archive=False,
        provider_id="symlink",
    ):
        """Set input data to deode.

        Args:
            target (str): Input file pattern
            destination (str): Destination file pattern
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.
            check_archive (bool, optional): Also check archive. Defaults to False.
            provider_id (str, optional): Provider ID. Defaults to "symlink".

        Raises:
            ProviderError: "No provider found for {target}"
            NotImplementedError: "Checking archive not implemented yet"

        Returns:
            tuple: provider, resource. provider is None if the destination
            already exists.
        """
        destination = LocalFileOnDisk(
            self.config, destination, basetime=basetime, validtime=validtime
        )
        dest_file = destination.identifier
        logger.debug("Set input for target={} to destination={}", target, dest_file)
        if os.path.exists(dest_file):
            logger.debug("Destination file already exists.")
            return None, destination
        logger.debug("Checking provider_id {}", provider_id)
        sub_target = self.platform.substitute(
            target, basetime=basetime, validtime=validtime
        )
        provider = self.platform.get_provider(provider_id, sub_target)
        if provider.create_resource(destination):
            logger.debug("Using provider_id {}", provider_id)
            return provider, destination
        # TODO check archive for file
        if check_archive:
            raise NotImplementedError("Checking archive not implemented yet")
        # Placeholder for an archive lookup: for now this retries the same
        # provider with the same substituted target.
        sub_target = self.platform.substitute(
            target, basetime=basetime, validtime=validtime
        )
        logger.debug("Checking archiving provider_id {}", provider_id)
        provider = self.platform.get_provider(provider_id, sub_target)
        if provider.create_resource(destination):
            logger.debug("Using provider_id {}", provider_id)
            return provider, destination
        logger.info("Could not find in archive {}", destination.identifier)
        # NOTE(review): ProviderError is not among this module's imports;
        # confirm it is defined in this module.
        raise ProviderError(
            f"No provider found for {sub_target} and provider_id {provider_id}"
        )

    def input_data_iterator(self, input_data_definition, provider_id="symlink"):
        """Handle input data spec dict.

        Loop through the defined data types and fetch them.

        Args:
            input_data_definition (dict): Input data spec; each entry has a
                "path" and "files" (a list of names, or a {outfile: infile} dict)
            provider_id (str): Provider id. Defaults to "symlink"

        Raises:
            RuntimeError: "Invalid data handle type"
        """
        for data_type, data in input_data_definition.items():
            logger.info("Link data type: {}", data_type)
            path = self.platform.substitute(data["path"])
            files = data["files"]
            if isinstance(files, list):
                for filenames in files:
                    self.input(f"{path}/{filenames}", filenames, provider_id=provider_id)
            elif isinstance(files, dict):
                for _outfile, _infile in files.items():
                    infile = self.platform.substitute(_infile)
                    outfile = self.platform.substitute(_outfile)
                    self.input(f"{path}/{infile}", outfile, provider_id=provider_id)
            else:
                raise RuntimeError(f"Unknown data type in {input_data_definition}")

    def input(
        self,
        target,
        destination,
        basetime=None,
        validtime=None,
        check_archive=False,
        provider_id="symlink",
    ):
        """Set input data to deode.

        Args:
            target (str): Input file pattern
            destination (str): Destination file pattern
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.
            check_archive (bool, optional): Also check archive. Defaults to False.
            provider_id (str, optional): Provider ID. Defaults to "symlink".
        """
        __, __ = self.get_input(
            target,
            destination,
            basetime=basetime,
            validtime=validtime,
            check_archive=check_archive,
            provider_id=provider_id,
        )

    def get_output(
        self,
        target,
        destination,
        basetime=None,
        validtime=None,
        archive=False,
        provider_id="move",
    ):
        """Set output data from deode.

        Args:
            target (str): Input file pattern
            destination (str): Destination file pattern
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.
            archive (bool, optional): Also archive data. Defaults to False.
            provider_id (str, optional): Provider ID. Defaults to "move".

        Returns:
            tuple: provider, aprovider, resource

        Raises:
            ArchiveError: Could not archive data
            NotImplementedError: Archive = True is not implemented
        """
        sub_target = self.platform.substitute(
            target, basetime=basetime, validtime=validtime
        )
        sub_destination = self.platform.substitute(
            destination, basetime=basetime, validtime=validtime
        )
        logger.debug(
            "Set output for target={} to destination={}", sub_target, sub_destination
        )
        target_resource = LocalFileOnDisk(
            self.config, sub_target, basetime=basetime, validtime=validtime
        )
        logger.debug(
            "Checking provider_id={} for destination={} ", provider_id, sub_destination
        )
        provider = self.platform.get_provider(provider_id, sub_destination, fetch=False)
        if provider.create_resource(target_resource):
            target = destination
            logger.debug("Using provider_id {}", provider_id)
        aprovider = None
        if archive:
            # TODO check for archive and modify macros
            raise NotImplementedError("Archive = True is not implemented")
            # The code below is unreachable until archiving is implemented.
            sub_target = self.platform.substitute(
                target, basetime=basetime, validtime=validtime
            )
            sub_destination = self.platform.substitute(
                destination, basetime=basetime, validtime=validtime
            )
            target_resource = LocalFileOnDisk(
                self.config, sub_target, basetime=basetime, validtime=validtime
            )
            logger.debug(
                "Set output for target={} to destination={}", sub_target, sub_destination
            )
            logger.info("Checking archive provider_id {}", provider_id)
            aprovider = self.platform.get_provider(
                provider_id, sub_destination, fetch=False
            )
            if aprovider.create_resource(target_resource):
                logger.debug("Using provider_id {}", provider_id)
            else:
                # NOTE(review): ArchiveError is not among this module's imports;
                # confirm it is defined in this module.
                raise ArchiveError("Could not archive data")
        return provider, aprovider, target_resource

    def output(
        self,
        target,
        destination,
        basetime=None,
        validtime=None,
        archive=False,
        provider_id="move",
    ):
        """Set output data from deode.

        Args:
            target (str): Input file pattern
            destination (str): Destination file pattern
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.
            archive (bool, optional): Also archive data. Defaults to False.
            provider_id (str, optional): Provider ID. Defaults to "move".
        """
        __, __, __ = self.get_output(
            target,
            destination,
            basetime=basetime,
            validtime=validtime,
            archive=archive,
            provider_id=provider_id,
        )

    def set_resources_from_dict(self, res_dict):
        """Set resources from dict.

        Args:
            res_dict (dict): Maps a file type ("input" or "output") to a dict of
                {target: settings}. Each settings dict must have "destination"
                and may have "basetime", "validtime", "provider_id" and
                "check_archive" (input) or "archive" (output).

        Raises:
            ValueError: If the passed file type is neither 'input' nor 'output'.
        """
        for ftype, fobj in res_dict.items():
            for target, settings in fobj.items():
                logger.debug("ftype={} target={}, settings={}", ftype, target, settings)
                if ftype == "input":
                    keys = ["basetime", "validtime", "check_archive", "provider_id"]
                    kwargs = {
                        "basetime": None,
                        "validtime": None,
                        "check_archive": False,
                    }
                    handler = self.input
                elif ftype == "output":
                    keys = ["basetime", "validtime", "archive", "provider_id"]
                    kwargs = {"basetime": None, "validtime": None, "archive": False}
                    handler = self.output
                else:
                    raise ValueError(
                        f"Unknown file type '{ftype}'. Must be either 'input' or 'output'"
                    )
                destination = settings["destination"]
                # provider_id is only forwarded when given, so the handler's own
                # default ("symlink" for input, "move" for output) applies otherwise.
                kwargs.update({key: settings[key] for key in keys if key in settings})
                logger.debug("kwargs={}", kwargs)
                handler(target, destination, **kwargs)

    def create_list(self, basetime, forecast_range, input_template, output_settings):
        """Create list of files to process.

        Args:
            basetime (datetime.datetime): Base time,
            forecast_range: forecast range, as accepted by oi2dt_list,
            input_template (str): Input template,
            output_settings (str): Output settings

        Returns:
            dict: Maps each valid time to its file path
        """
        logger.info("template: {}, settings: {}", input_template, output_settings)
        files = {}
        # Store the output
        dt_list = oi2dt_list(output_settings, forecast_range)
        for dt in dt_list:
            validtime = basetime + dt
            # Pass basetime explicitly so lead-time macros are relative to it
            # and not to the configured general.times.basetime.
            fname = self.platform.substitute(
                input_template, basetime=basetime, validtime=validtime
            )
            # NOTE(review): self.archive is never set in this class; confirm
            # where it is expected to come from.
            files[validtime] = f"{self.archive}/{fname}"
        return files
[docs]
class LocalFileSystemSymlink(Provider):
    """Local file system provider that creates symbolic links."""

    def __init__(self, config, pattern, fetch=True):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Identifier string
            fetch (bool, optional): Fetch data. Defaults to True.
        """
        Provider.__init__(self, config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Symlink the resource.

        With fetch=True, a link at resource.identifier pointing to
        self.identifier is created. Otherwise a link at self.identifier
        pointing to resource.identifier is created.

        Args:
            resource (Resource): Resource.

        Returns:
            bool: True if success, False if the link source is missing.
        """
        if self.fetch:
            src, dst = self.identifier, resource.identifier
        else:
            src, dst = resource.identifier, self.identifier
        if not os.path.exists(src):
            logger.warning("File is missing {} ", src)
            return False
        # Make sure the link's parent directory exists (as the copy/move
        # providers do); otherwise ``ln`` fails while success is reported.
        self.create_missing_dir(dst)
        logger.info("ln -sf {} {} ", src, dst)
        os.system(f"ln -sf {src} {dst}")  # noqa S605
        return True
[docs]
class LocalFileSystemCopy(Provider):
    """Local file system provider that copies files with ``cp``."""

    def __init__(self, config, pattern, fetch=True):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Identifier string
            fetch (bool, optional): Fetch data. Defaults to True.
        """
        Provider.__init__(self, config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Copy between this provider's identifier and the resource.

        With fetch=True, self.identifier is copied to resource.identifier.
        Otherwise the copy goes the opposite way. The destination's parent
        directory is created if missing.

        Args:
            resource (Resource): Resource.

        Returns:
            bool: True if success, False if the source file is missing.
        """
        if self.fetch:
            src, dst = self.identifier, resource.identifier
        else:
            src, dst = resource.identifier, self.identifier
        if not os.path.exists(src):
            logger.warning("File is missing {} ", src)
            return False
        self.create_missing_dir(dst)
        logger.info("cp {} {} ", src, dst)
        os.system(f"cp {src} {dst}")  # noqa S605
        return True
[docs]
class LocalFileSystemMove(Provider):
    """Local file system provider that moves files with ``mv``."""

    def __init__(self, config, pattern, fetch=False):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Identifier string
            fetch (bool, optional): Fetch data. Defaults to False.
        """
        Provider.__init__(self, config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Move between this provider's identifier and the resource.

        With fetch=True, self.identifier is moved to resource.identifier.
        Otherwise the move goes the opposite way. The destination's parent
        directory is created if missing.

        Args:
            resource (Resource): Resource.

        Returns:
            bool: True if success, False if the source file is missing.
        """
        if self.fetch:
            src, dst = self.identifier, resource.identifier
        else:
            src, dst = resource.identifier, self.identifier
        if not os.path.exists(src):
            logger.warning("File is missing {} ", src)
            return False
        self.create_missing_dir(dst)
        logger.info("mv {} {} ", src, dst)
        os.system(f"mv {src} {dst}")  # noqa S605
        return True
[docs]
class ArchiveProvider(Provider):
    """Base class for archive-backed providers (ECFS, SCP, FDB)."""

    def __init__(self, config, pattern, fetch=True):
        """Construct the object.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Filepattern
            fetch (bool, optional): Fetch the data. Defaults to True.
        """
        super().__init__(config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Create the resource; defers to the base class (not implemented there).

        Args:
            resource (Resource): Resource.

        Returns:
            bool: True if success
        """
        return super().create_resource(resource)
[docs]
class ECFS(ArchiveProvider):
    """Transfer data to or from ECFS with ``ecp``."""

    def __init__(self, config, pattern, fetch=True):
        """Construct ECFS provider.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Filepattern
            fetch (bool, optional): Fetch the data. Defaults to True.
        """
        ArchiveProvider.__init__(self, config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Copy with ``ecp -pu`` in the direction given by ``self.fetch``.

        Args:
            resource (Resource): Resource.

        Returns:
            bool: Always True; the exit status of ``ecp`` is not checked.
        """
        # TODO: Address the noqa check disablers
        if self.fetch:
            src, dst = self.identifier, resource.identifier
        else:
            src, dst = resource.identifier, self.identifier
        logger.info("ecp -pu {} {}", src, dst)
        os.system(f"ecp -pu {src} {dst}")  # noqa S605, E800
        return True
[docs]
class SCP(ArchiveProvider):
    """Transfer data to or from a remote host over SSH."""

    def __init__(self, config, pattern, fetch=True):
        """Construct SCP provider.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Filepattern of the form ``host:full_file_path``
            fetch (bool, optional): Fetch the data. Defaults to True.
        """
        ArchiveProvider.__init__(self, config, pattern, fetch=fetch)

    def create_resource(self, resource):
        """Create the resource.

        With fetch=True, the remote file is copied to resource.identifier.
        Otherwise resource.identifier is sent to the remote file, after
        making sure the remote directory exists.

        Args:
            resource (Resource): Resource.

        Raises:
            ValueError: If the identifier is not of the form host:path
            RuntimeError: If directory is not created

        Returns:
            bool: True if success
        """
        # Split only on the first ':' so remote paths may themselves contain ':'.
        remote_host, sep, remote_file = str(self.identifier).partition(":")
        if not sep:
            raise ValueError(
                f"SCP identifier must be of the form host:path, got {self.identifier}"
            )
        remote_dir = os.path.dirname(remote_file)
        ssh = SSHConnection({"host": remote_host}, None)
        if self.fetch:
            logger.info("scp src={} to dst={}", self.identifier, resource.identifier)
            ssh.getfile(remote_file, resource.identifier)
            return True
        if len(remote_dir) > 0:
            iret = 1
            tries = 0
            while iret != 0 and tries < 5:
                cmd = ssh.execute(["ls", remote_dir])
                cmd.communicate()
                iret = cmd.returncode
                if iret != 0:
                    # Wait for mkdir to finish before the next ls check.
                    ssh.execute(["mkdir", "-p", f"{remote_dir}"]).communicate()
                tries += 1
            if iret != 0:
                raise RuntimeError(f"Could not create remote directory: {remote_dir}")
        logger.info("scp src={} to dst={}", resource.identifier, self.identifier)
        ssh.sendfile(resource.identifier, remote_file)
        return True
[docs]
class FDB(ArchiveProvider):
    """Archive GRIB data into FDB through pyfdb (fetching is not implemented)."""

    def __init__(self, config, pattern, fetch=True):
        """Construct FDB provider.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Filepattern
            fetch (bool, optional): Fetch the data. Defaults to True.
        """
        # Imported here rather than at module level, so pyfdb is only needed
        # when this provider is constructed.
        import pyfdb

        super().__init__(config, pattern, fetch=fetch)
        self.fdb = pyfdb.FDB()

    def create_resource(self, resource):
        """Filter, re-key and archive a GRIB file into FDB.

        Fields matching none of ``fdb.negative_rules`` are kept with
        grib_filter. The ``fdb.grib_set`` keys (plus a computed ``georef``) are
        applied with grib_set, and the result is archived.

        Args:
            resource (Resource): Resource.

        Raises:
            RuntimeError: If expver not set

        Returns:
            bool: True if success
        """
        rules = dict(self.config["fdb.negative_rules"])
        grib_set = dict(self.config["fdb.grib_set"])
        if "expver" not in grib_set:
            msg = (
                "\n"
                "Please set expver in the config section fdb.grib_set before "
                "archiving to FDB\n"
                "and consult documentation before selecting expver\n"
            )
            logger.error(msg)
            raise RuntimeError(msg)
        grib_set["georef"] = compute_georef(self.config["domain"])
        if self.fetch:
            logger.warning("FDB not yet implemented for {}", resource)
            # NOTE(review): success is reported even though nothing was fetched.
            return True
        source = resource.identifier
        logger.info("Archiving {} with pyfdb", source)
        stem = os.path.basename(source)
        # Rules file plus two intermediate GRIB files, all in the current dir.
        filtered = f"{stem}_temp1.grib"
        modified = f"{stem}_temp2.grib"
        rules_file = "temp_rules"
        self._write_rules_file(rules_file, rules, neg="!")
        os.system(f"grib_filter {rules_file} {source} -o {filtered}")  # noqa S605
        set_values = ",".join(f"{name}={val}" for name, val in grib_set.items())
        cmd_for_grib = "grib_set -s " + set_values + f" {filtered} {modified}"
        logger.debug(cmd_for_grib)
        os.system(cmd_for_grib)  # noqa S605
        with open(modified, "rb") as infile:
            self.fdb.archive(infile.read())
        self.fdb.flush()
        if self.config.get("fdb.create_filter_inverse", False):
            # Create example files for parameters not archived; the
            # intermediate files are kept in this case.
            inv_filtered = f"{stem}_temp1_inv.grib"
            inv_rules_file = "inv_temp_rules"
            self._write_rules_file(inv_rules_file, rules, oper=" || ")
            os.system(
                f"grib_filter {inv_rules_file} {source} -o {inv_filtered}"  # noqa S605
            )
            if os.path.isfile(inv_filtered):
                logger.info("Created file with non archived fields as {}", inv_filtered)
        else:
            os.remove(filtered)
            os.remove(modified)
            os.remove(rules_file)
        return True

    def _write_rules_file(self, filename, rules, neg="", oper=" && "):
        """Write a grib_filter rules file appending fields that match the rules.

        Args:
            filename (str): Rules file to write
            rules (dict): Maps a GRIB key to an iterable of values
            neg (str): Prefix for each condition ("!" negates it)
            oper (str): Operator joining the conditions
        """
        conditions = []
        for name, values in rules.items():
            conditions.extend(f'{neg}({name} is "{value}")' for value in values)
        rule = "if (" + oper.join(conditions) + ") { append; }\n"
        with open(filename, "w") as outfile:
            outfile.write("set edition = 2;\n")
            outfile.write(rule)
[docs]
def compute_georef(domain_config):
    """Return the Hilbert geohash (precision 6) of the domain centre.

    Args:
        domain_config: Domain config providing "xlatcen" and "xloncen".

    Returns:
        str: Geohash from geohash_hilbert.encode.
    """
    lat = domain_config["xlatcen"]
    lon = domain_config["xloncen"]
    return geohash_hilbert.encode(lng=lon, lat=lat, precision=6)
[docs]
class Resource:
    """Container holding the identifier of a data resource."""

    def __init__(self, _config, identifier):
        """Construct resource.

        Args:
            _config (deode.ParsedConfig): Configuration (unused by the base class)
            identifier (str): Resource identifier
        """
        self.identifier = identifier
        logger.debug("Base resource")
[docs]
class LocalFileOnDisk(Resource):
    """Local file whose path is a macro-substituted pattern."""

    def __init__(self, config, pattern, basetime=None, validtime=None):
        """Construct local file on disk.

        Args:
            config (deode.ParsedConfig): Configuration
            pattern (str): Identifier pattern
            basetime (datetime.datetime, optional): Base time. Defaults to None.
            validtime (datetime.datetime, optional): Valid time. Defaults to None.
        """
        path = Platform(config).substitute(
            pattern, basetime=basetime, validtime=validtime
        )
        super().__init__(config, path)