Source code for deode.tasks.clean_old_data

"""Clean old data."""

import os
import shutil
from argparse import Namespace
from datetime import datetime

from ..commands_functions import remove_cases
from ..datetime_utils import as_timedelta
from ..logs import logger
from ..scheduler import EcflowServer
from .base import Task


class CleanOldData(Task):
    """Interface class to the cleaning of the old data.

    Stores the reference time and the directory roots read from the
    configuration, and provides helpers to locate and delete empty
    directory trees.
    """

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        Task.__init__(self, config, __class__.__name__)

        # One reference time, so every cutoff derived in this task is
        # measured from the same instant.
        self.now = datetime.now()
        self.force_delete_time = self.now - as_timedelta(
            config["clean_old_data.force_delete_period"]
        )
        self.scratch = (
            self.platform.get_platform_value("scratch")
            + config["clean_old_data.scratch_ext"]
        )
        self.ecf_jobout = self.platform.get_value("scheduler.ecfvars.ecf_jobout")
        self.ecf_files = self.platform.get_value("scheduler.ecfvars.ecf_files")

    def cutoff(self, delay):
        """Calculate the time before which data is considered old.

        Args:
            delay (datetime.timedelta): Period subtracted from ``self.now``.
                Callers in this module pass the result of ``as_timedelta``.

        Returns:
            float: Timestamp (seconds) of ``self.now - delay``.
        """
        return (self.now - delay).timestamp()

    def find_top_empty_directories(self, path):
        """Find highest-level directories whose subtree contains only empty directories.

        A directory qualifies when it holds no files and every one of its
        sub-directories qualifies as well. Only the topmost directory of
        each qualifying tree is reported; ``path`` itself is reported when
        the whole tree below it is empty.

        Args:
            path (str): Root directory to search.

        Returns:
            List[str]: Sorted full paths to the highest-level empty
                directory trees. Empty if ``path`` is falsy.
        """
        if not path:
            return []

        empty_tree_dirs = set()
        top_level = set()
        # Bottom-up walk: every child is classified before its parent.
        for root, dirs, files in os.walk(path, topdown=False):
            if files:
                continue
            # Same join os.walk uses to build child roots, so the strings
            # match even when ``path`` ends with a separator (where
            # os.path.dirname(child) would not equal the reported root).
            children = [os.path.join(root, d) for d in dirs]
            if all(child in empty_tree_dirs for child in children):
                empty_tree_dirs.add(root)
                # The parent supersedes its children as the top of the tree.
                top_level.difference_update(children)
                top_level.add(root)

        return sorted(top_level)

    def remove_list(self, dir_list, files=False):
        """Remove directories/files from the list.

        Args:
            dir_list (list): Paths of the directories (or files) to remove.
            files (bool): If True the entries are removed with ``os.remove``,
                otherwise with ``shutil.rmtree``. Default False.
        """
        remove = os.remove if files else shutil.rmtree
        for dir_file in dir_list:
            remove(dir_file)
            logger.info("Removing {}", dir_file)
class CleanCases(CleanOldData):
    """Task that removes the data of cases whose suites are older than a cutoff."""

    def __init__(self, config):
        """Read the cleaning period, the suite prefixes and the ignore list.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanCases"

        self.delay = as_timedelta(config["clean_old_data.cases_period"])
        self.cutoff_time = self.cutoff(self.delay)

        ignored = list(config["clean_old_data.ignore"])
        self.dw_case_prefixes = list(config["clean_old_data.dw_case_prefixes"])
        # Separate copy so the logged list and the stored list are distinct.
        self.ignore_suite = list(ignored)
        logger.info("Ignore: {}", ignored)

    def execute(self):
        """Collect the configs of outdated suites and pass them to ``remove_cases``."""
        server = EcflowServer(self.config)
        candidates = server.get_suites_from_server(
            ignore=self.ignore_suite, complete=True
        )

        outdated_configs = []
        for suite in candidates:
            finished = server.suite_finish_time(
                suite, self.force_delete_time, last_task_name="PostMortem"
            )
            # NOTE(review): presumably suite_finish_time hands back the
            # force-delete time when it finds nothing for the suite - confirm.
            if finished == self.force_delete_time.timestamp():
                logger.info("There are no ecf files for suite {}", suite.name())

            # Negated "<" keeps the skip condition the exact complement of
            # the age test.
            if not finished < self.cutoff_time:
                continue

            logger.info("suite {}", suite.name())
            if not suite.name().startswith(tuple(self.dw_case_prefixes)):
                logger.warning("{} is not DW case", suite.name())
                continue

            try:
                suite_config = server.get_config_of_suite(suite)
            except FileNotFoundError:
                logger.warning("Config for suite {} does not exist", suite.name())
            else:
                outdated_configs.append(suite_config)

        removal_args = Namespace(
            config_files=outdated_configs,
            dry_run=self.config["remove.defaults.dry_run"],
            force_remove=False,
            execute_removal=self.config["remove.defaults.execute_removal"],
        )
        remove_cases(removal_args, self.config)
class CleanEmptyDirectories(CleanOldData):
    """Clean empty directories."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        # Task name matches the class name.
        self.name = "CleanEmptyDirectories"
        self.ehype_work = self.platform.get_value("impact.ehype.communicate.work_root")

    def execute(self):
        """Run clean empty directories.

        Removes the topmost empty directory trees found under the scratch,
        ecf_jobout and ecf_files roots, then under the ehype work root.
        """
        for root in (self.scratch, self.ecf_jobout, self.ecf_files):
            self.remove_list(self.find_top_empty_directories(root))

        # NOTE(review): self.ehype_work is resolved in __init__, so a
        # KeyError from that config lookup would surface there rather than
        # here - confirm whether this handler can still trigger.
        try:
            self.remove_list(self.find_top_empty_directories(self.ehype_work))
        except KeyError:
            logger.warning("Switch of ehype cleaning as config is incomplete")