Source code for deode.cleaning

"""Clean deode file systems."""

import os
import re
import shutil
import subprocess

from .datetime_utils import as_datetime, as_timedelta
from .logs import logger
from .os_utils import Search, remove_empty_dirs
from .toolbox import Platform


[docs] def wipe_ecfs(ecfs_path): """Remove a full ecfs directory tree.""" command = ["erm", "-R", ecfs_path] try: result = subprocess.check_output(command, text=True) # noqa S603 logger.info(result) if result != "": logger.error(result) raise RuntimeError("Error running command: {}".format(command)) except subprocess.CalledProcessError as err: logger.warning(err)
[docs] class CleanDeode: """Clean data.""" def __init__(self, config, defaults=None, basetime=None): """Construct object. Args: config (deode.ParsedConfig): Configuration defaults (dict): Default cleaning settings basetime (dateTime object): Reference time Raises: RuntimeError: If erroneous defaults """ self.CLEANING_DEFAULTS = {"path": "", "ecfs_prefix": None} if defaults is None: self.defaults = {} elif isinstance(defaults, dict): self.defaults = defaults else: self.defaults = defaults.dict() self.basetime = ( as_datetime(config["general.times.basetime"]) if basetime is None else basetime ) self.cycle_length = as_timedelta(config["general.times.cycle_length"]) self.platform = Platform(config) self._check_choice(self.defaults, "defaults") archiving = config.get("archiving").dict() archiving.pop("prefix") self.has_ecfs = False for values in archiving.values(): if "ecfs" in values: self.has_ecfs = self.has_ecfs or any( x["active"] for x in values["ecfs"].values() if "active" in x ) def _set_defaults(self, choice): """Copy default settings. Args: choice (dict): Dict with cleaning settings Returns: x (dict): Updated cleaning dict including default settings """ x = choice.copy() y = self.defaults.copy() # Do not copy competing settings if "ncycles_delay" in y and "cleaning_delay" in x: y.pop("ncycles_delay") if "ncycles_delay" in x and "cleaning_delay" in y: y.pop("cleaning_delay") for key, value in self.CLEANING_DEFAULTS.items(): if key not in y: y[key] = value for k, v in y.items(): if k not in choice: x[k] = v return x def _check_choice(self, choice, name=""): """Check choices. Args: choice (dict): Dict with cleaning settings name (str): Optional name for better logging Raises: RuntimeError: """ if "cleaning_delay" in choice and "ncycles_delay" in choice: raise RuntimeError( f"Define either ncycles_delay or cleaning_delay for {name}" ) for tag in ["cleaning_delay", "cleanin_max_delay", "step"]: if tag not in choice: continue delay = choice[tag] if as_timedelta(delay) % self.cycle_length != as_timedelta("PT0H"): logger.error(f"{tag} must be a multiple of cycle_length") logger.error(f"{tag}:{delay}") logger.error(f"cycle_length:{self.cycle_length}") raise RuntimeError
[docs] def prep_cleaning(self, choices, basetime=None, dry_run=None): """Prepare tasks for cleaning. Args: choices (dict): Dict with cleaning settings basetime (dateTime object): Reference time dry_run (boolean): Flag for dry runs """ if basetime is not None: self.basetime = basetime self.clean_tasks = {} for name, _choice in choices.items(): choice = self._set_defaults(_choice) if choice["active"]: choice.pop("active") self.clean_tasks[name] = choice # Check consistency of settings if "cleaning_delay" not in choice: choice["cleaning_delay"] = choice["ncycles_delay"] * self.cycle_length choice.pop("ncycles_delay") else: choice["cleaning_delay"] = as_timedelta(choice["cleaning_delay"]) if "cleaning_max_delay" not in choice: choice["cleaning_max_delay"] = choice["cleaning_delay"] else: choice["cleaning_max_delay"] = as_timedelta( choice["cleaning_max_delay"] ) if "step" not in choice: choice["step"] = self.cycle_length else: choice["step"] = as_timedelta(choice["step"]) if dry_run: choice["dry_run"] = dry_run self._check_choice(choice, name) self.clean_tasks[name] = choice
[docs] def clean(self): """Perform the cleaning.""" for name, choice in self.clean_tasks.items(): logger.info("Cleaning {}:{}", name, choice) basetime = self.basetime - choice["cleaning_delay"] endtime = self.basetime - choice["cleaning_max_delay"] dry_run = choice["dry_run"] wipe = choice["wipe"] while basetime >= endtime: inp = self.platform.substitute(choice["path"], basetime=basetime) if choice["path"]: logger.info("Clean path:{}", inp) if wipe: if choice["ecfs_prefix"] is not None and self.has_ecfs: ecfs_path = self.platform.substitute( f"{choice['ecfs_prefix']}/{inp}", basetime=basetime ) if dry_run: logger.info(" would have removed {}", ecfs_path) else: wipe_ecfs(ecfs_path) elif choice["ecfs_prefix"] is None: if os.path.isdir(inp): if dry_run: logger.info(" would have removed {}", inp) else: shutil.rmtree(inp) else: logger.warning("Path {} does not exist", inp) else: pattern = ( [choice["include"]] if isinstance(choice["include"], str) else choice["include"] ) do_exclude = "exclude" in choice if do_exclude: exclude = re.compile(choice["exclude"]) for ptrn in pattern: files = Search.find_files(inp, recursive=True, pattern=ptrn) for filename in files: if do_exclude and exclude.match(filename): logger.info("Exclude:{}", filename) continue if dry_run: logger.info(" would have removed:{}", filename) else: logger.info("Remove :{}", filename) os.remove(filename) # Remove possible empty directories remove_empty_dirs(inp, dry_run=dry_run) basetime -= choice["step"]