Source code for deode.tasks.archive

"""ARCHIVEHOUR // ARCHIVESTATIC."""

import glob
import os
import pathlib
import shutil

from ..logs import logger
from .base import Task


[docs] class Archive(Task): """Archving data.""" def __init__(self, config, datatype=None): """Construct the archive object. Loop over archive types (e.g. ecfs,fdb) for the current platform and store the active selections from config. Args: config (deode.ParsedConfig): Configuration datatype (str): Indicating data type (climate, hour) """ Task.__init__(self, config, __class__.__name__) self.archive_types = self.config.get("platform.archive_types", []) self.choices = {} self.archive_loc = {} if datatype is not None: choices_for_type = self.config[f"archiving.{datatype}"].dict() skipped_types = [] for archive_type, choices in choices_for_type.items(): d = { name: choice for name, choice in choices.items() if self.trigger(choice["active"]) } if len(d) > 0: if archive_type in self.archive_types: self.choices[archive_type] = d self.archive_loc[archive_type] = self.platform.get_value( f"archiving.prefix.{archive_type}", "" ) else: skipped_types.append(archive_type) if len(skipped_types) > 0: logger.warning( "Skipped archive types not defined for this host: {}", skipped_types )
[docs] def trigger(self, trigger): """Return trigger.""" if isinstance(trigger, bool): return trigger return self.config[trigger]
[docs] def execute(self): """Loops over archive choices.""" for archive_type, choices in self.choices.items(): logger.info("Archiving type: {}", archive_type) for name, choice in choices.items(): choice.pop("active") logger.info("Archiving {} with: {}", name, choice) outpath = choice.get("outpath", "") newname = choice.get("newname", None) self.archive( choice["pattern"], choice["inpath"], outpath, archive_type, newname, )
[docs] def archive(self, pattern, inpath, outpath, archive_type=None, newname=None): """Send files to the file manager. Args: pattern (str,list): string of list of patterns to search for inpath (str): Full path on the input archive outpath (str): relative path on the output archive archive_type (str, optional): Archive type. Defaults to None. newname (str, optional): Forces a rename of an identified file. Defaults to None. Raises: FileNotFoundError: If file not found """ out = self.platform.substitute(outpath) inp = self.platform.substitute(inpath) if newname is not None and isinstance(pattern, str): ptrn = self.platform.substitute(pattern) try: shutil.copy(ptrn, newname) except FileNotFoundError as error: raise FileNotFoundError( f"Could not find {ptrn}, incorrect pattern" ) from error _pattern = [pathlib.PurePath(os.getcwd(), newname)] logger.info("Copy {} to {}", ptrn, newname) elif isinstance(pattern, str): _pattern = [pattern] else: _pattern = pattern for ptrn in _pattern: search = str(pathlib.PurePath(inp, self.platform.substitute(ptrn))) files = [x for x in glob.glob(search) if os.path.isfile(x)] for filename in sorted(files): self.fmanager.output( filename, pathlib.PurePath( self.archive_loc[archive_type], out, os.path.basename(filename) ), provider_id=archive_type, )
[docs] class ArchiveStatic(Archive): """Archving task for static data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ Archive.__init__(self, config, "static")
[docs] class ArchiveHour(Archive): """Archving task for time dependent data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ Archive.__init__(self, config, "hour")