Source code for deode.tasks.archive

"""ARCHIVEHOUR // ARCHIVESTATIC."""

import os

from deode.archive import Archive
from deode.tasks.base import Task


[docs] class ArchiveTask(Task): """Archving data.""" def __init__(self, config, datatype=None, include=None, exclude=None): """Construct the archive object.""" Task.__init__(self, config, __class__.__name__) self.da = Archive(config, datatype, include=include, exclude=exclude)
[docs] def execute(self): """Loops over archive choices.""" self.da.execute()
[docs] class ArchiveStaticMember(ArchiveTask): """Archving task for static data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "staticmember")
[docs] class ArchiveStatic(ArchiveTask): """Archving task for static data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "static")
[docs] class ArchiveHour(ArchiveTask): """Archving task for time dependent data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "hour", exclude=["fdb"])
[docs] class ArchiveFDB(ArchiveTask): """Archving task for time dependent data dedicated for FDB.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "FDB", include=["fdb"])
[docs] class ArchiveDataBridge(ArchiveTask): """Archving task for time dependent data dedicated for the databridge.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "DataBridge", include=["fdb"]) self._check_user() def _check_user(self): """Check if we are allowed to archive to the databridge.""" user = os.environ.get("USER") try: databridge_user = self.platform.substitute(self.config["fdb.databridge_user"]) except KeyError as error: raise KeyError("No databridge user defined") from error if user != databridge_user: raise ValueError( f"Archiving to the databridge is only allowed for user {databridge_user}" )
[docs] class ArchiveMergedSQLites(ArchiveTask): """Archving task for time dependent data.""" def __init__(self, config): """Construct object. Args: config (deode.ParsedConfig): Configuration """ ArchiveTask.__init__(self, config, "merged_sqlite", exclude=["fdb"])