Source code for deode.cleaning

"""Clean deode file systems."""

import os
import re

from .datetime_utils import as_datetime, as_timedelta
from .logs import logger
from .os_utils import Search, remove_empty_dirs
from .toolbox import Platform


[docs] class CleanDeode: """Clean data.""" def __init__(self, config, defaults=None, basetime=None): """Construct object. Args: config (deode.ParsedConfig): Configuration defaults (dict): Default cleaning settings basetime (dateTime object): Reference time Raises: RuntimeError: If erroneous defaults """ if defaults is None: self.defaults = {} elif isinstance(defaults, dict): self.defaults = defaults else: self.defaults = defaults.dict() self.basetime = ( as_datetime(config["general.times.basetime"]) if basetime is None else basetime ) self.cycle_length = as_timedelta(config["general.times.cycle_length"]) self.platform = Platform(config) self._check_choice(self.defaults, "defaults") def _set_defaults(self, choice): """Copy default settings. Args: choice (dict): Dict with cleaning settings Returns: x (dict): Updated cleaning dict including default settings """ x = choice.copy() y = self.defaults.copy() # Do not copy competing settings if "ncycles_delay" in y and "cleaning_delay" in x: y.pop("ncycles_delay") if "ncycles_delay" in x and "cleaning_delay" in y: y.pop("cleaning_delay") for k, v in y.items(): if k not in choice: x[k] = v return x def _check_choice(self, choice, name=""): """Check choices. Args: choice (dict): Dict with cleaning settings name (str): Optional name for better logging Raises: RuntimeError: """ if "cleaning_delay" in choice and "ncycles_delay" in choice: raise RuntimeError( f"Define either ncycles_delay or cleaning_delay for {name}" ) for tag in ["cleaning_delay", "cleanin_max_delay", "step"]: if tag not in choice: continue delay = choice[tag] if as_timedelta(delay) % self.cycle_length != as_timedelta("PT0H"): logger.error(f"{tag} must be a multiple of cycle_length") logger.error(f"{tag}:{delay}") logger.error(f"cycle_length:{self.cycle_length}") raise RuntimeError
[docs] def prep_cleaning(self, choices, basetime=None): """Prepare tasks for cleaning. Args: choices (dict): Dict with cleaning settings basetime (dateTime object): Reference time """ if basetime is not None: self.basetime = basetime self.clean_tasks = {} for name, _choice in choices.items(): choice = self._set_defaults(_choice) if choice["active"]: choice.pop("active") self.clean_tasks[name] = choice # Check consistency of settings if "cleaning_delay" not in choice: choice["cleaning_delay"] = choice["ncycles_delay"] * self.cycle_length choice.pop("ncycles_delay") else: choice["cleaning_delay"] = as_timedelta(choice["cleaning_delay"]) if "cleaning_max_delay" not in choice: choice["cleaning_max_delay"] = choice["cleaning_delay"] else: choice["cleaning_max_delay"] = as_timedelta( choice["cleaning_max_delay"] ) if "step" not in choice: choice["step"] = self.cycle_length else: choice["step"] = as_timedelta(choice["step"]) self._check_choice(choice, name) self.clean_tasks[name] = choice
[docs] def clean(self): """Perform the cleaning.""" for name, choice in self.clean_tasks.items(): logger.info("Cleaning {}:{}", name, choice) basetime = self.basetime - choice["cleaning_delay"] endtime = self.basetime - choice["cleaning_max_delay"] dry_run = choice["dry_run"] while basetime >= endtime: inp = self.platform.substitute(choice["path"], basetime=basetime) pattern = ( [choice["include"]] if isinstance(choice["include"], str) else choice["include"] ) do_exclude = "exclude" in choice if do_exclude: exclude = re.compile(choice["exclude"]) logger.info("Clean path:{}", inp) for ptrn in pattern: files = Search.find_files(inp, recursive=True, pattern=ptrn) for filename in files: if do_exclude and exclude.match(filename): logger.info("Exclude:{}", filename) continue if dry_run: logger.info("Would have removed:{}", filename) else: logger.info("Remove :{}", filename) os.remove(filename) # Remove possible empty directories remove_empty_dirs(inp, dry_run=dry_run) basetime -= choice["step"]