# Source code for deode.tasks.clean_old_data

"""Clean old data."""

import glob
import os
import re
import shutil
from datetime import datetime

from ..datetime_utils import as_timedelta
from ..logs import logger
from ..scheduler import EcflowServer
from .base import Task


class CleanOldData(Task):
    """Base task with helpers for locating and deleting outdated data."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        # ``__class__`` is the class this method is defined in, so the task
        # is always registered under the name "CleanOldData" at this point.
        Task.__init__(self, config, __class__.__name__)
        # Reference time from which every cutoff is measured.
        self.now = datetime.now()

    def cutoff(self, delay):
        """Compute the timestamp before which data is considered old.

        Args:
            delay (datetime.timedelta): Age threshold, subtracted from
                ``self.now``

        Returns:
            float: POSIX timestamp of ``self.now - delay``
        """
        return (self.now - delay).timestamp()

    def get_old(self, path, form, cutoff_time, ignore=None):
        """Collect paths below ``path`` that are older than the cutoff.

        Candidates are found with a glob whose depth is derived from the
        number of "/" characters in ``form`` (those belonging to a "^/"
        sequence are not counted). They are then filtered with the regex
        built from the escaped ``path`` followed by ``form``.

        Args:
            path (str): Path where the search starts
            form (str): Regex, appended to the escaped ``path``, which the
                candidates have to match
            cutoff_time (float): Modification time (seconds since epoch);
                only entries modified before it are returned
            ignore (list): Base names that are never returned.
                Default None, nothing is ignored

        Returns:
            list: Paths selected for removal
        """
        depth = form.count("/") - form.count("^/")
        wildcard = "*/".join("*" for _ in range(depth))
        logger.info(wildcard)
        logger.info(path)

        candidates = glob.glob(os.path.join(path, wildcard))
        logger.info("initial: {}", candidates)

        # All whitespace is stripped from the combined pattern before it is
        # compiled; ``match`` anchors at the start only.
        regex_source = re.sub(r"\s+", "", f"^{re.escape(path)}{form}")
        logger.info("patteren: {}", regex_source)
        matcher = re.compile(regex_source)

        matching = [entry for entry in candidates if matcher.match(entry)]
        logger.info("Matched: {}", matching)

        outdated = []
        for entry in matching:
            modified = os.path.getmtime(entry)
            logger.debug(
                "path: {}, time: {}, cutoff: {}",
                entry,
                modified,
                cutoff_time,
            )
            if modified < cutoff_time and (
                ignore is None or os.path.basename(entry) not in ignore
            ):
                outdated.append(entry)
        return outdated

    def remove_list(self, dir_list, files=False):
        """Delete every entry of the given list.

        Args:
            dir_list (list): Paths of the directories/files to delete
            files (boolean): True if the entries are files, otherwise they
                are removed as directory trees. Default False
        """
        logger.info(dir_list)
        delete = os.remove if files else shutil.rmtree
        for target in dir_list:
            delete(target)
            logger.info("Removing {}", target)


class CleanScratchData(CleanOldData):
    """Task removing outdated data from the scratch area."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanScratchData"

        period = config["clean_old_data.scratch_data_period"]
        self.delay = as_timedelta(period)

        logger.info(self.platform.get_platform_value("scratch"))
        # Search root: platform scratch value with the configured extension
        # appended.
        scratch_root = self.platform.get_platform_value("scratch")
        self.scratch = scratch_root + config["clean_old_data.scratch_ext"]

        self.cutoff_time = self.cutoff(self.delay)
        self.scratch_form = config["clean_old_data.scratch_format"]

        user_ignored = list(config["clean_old_data.ignore"])
        logger.info("Ignore: {}", user_ignored)
        # These base names are always protected, in addition to the
        # configured ones.
        self.ignore_dir = ["IFS", "Clean_old_data"] + user_ignored

    def execute(self):
        """Find the outdated scratch directories and delete them."""
        self.remove_list(
            self.get_old(
                self.scratch,
                self.scratch_form,
                self.cutoff_time,
                ignore=self.ignore_dir,
            )
        )


class CleanSuites(CleanOldData):
    """Clean old suites."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanSuites"
        self.ecf_jobout = self.platform.get_value("scheduler.ecfvars.ecf_jobout")
        self.ecf_files = self.platform.get_value("scheduler.ecfvars.ecf_files")
        self.suite_form = config["clean_old_data.suite_format"]
        self.delay = as_timedelta(config["clean_old_data.suites_period"])
        self.cutoff_time = self.cutoff(self.delay)
        ignore = list(config["clean_old_data.ignore"])
        logger.info("Ignore: {}", ignore)
        # These suite names are always protected, in addition to the
        # configured ones.
        self.ignore_suite = ["IFS", "Clean_old_data", "DE_NWP", *ignore]

    def execute(self):
        """Run clean suites.

        Outdated suite directories are looked up below both ``ecf_files``
        and ``ecf_jobout``, deleted from disk, and the corresponding suites
        are then removed through ``EcflowServer.remove_suites``.
        """
        logger.info("Delay: {}, Cutoff:{}", self.delay, self.cutoff_time)
        # Both lists are collected before anything is deleted.
        old_files_dirs = self.get_old(
            self.ecf_files, self.suite_form, self.cutoff_time, ignore=self.ignore_suite
        )
        old_jobout_dirs = self.get_old(
            self.ecf_jobout, self.suite_form, self.cutoff_time, ignore=self.ignore_suite
        )

        self.remove_list(old_files_dirs)
        # A path present in both lists (e.g. when both roots coincide) is
        # already gone; removing it again would raise FileNotFoundError.
        already_removed = set(old_files_dirs)
        self.remove_list(
            [path for path in old_jobout_dirs if path not in already_removed]
        )

        # De-duplicate on the suite name rather than on the full path, so a
        # suite found below both roots is handed over only once.
        suites = sorted(
            {os.path.basename(path) for path in (*old_files_dirs, *old_jobout_dirs)}
        )
        EcflowServer(self.config).remove_suites(suites)


class CleanIFSData(CleanOldData):
    """Task removing outdated IFS files."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanIFSData"

        # Search root: platform scratch value with the configured extension
        # appended.
        scratch_root = self.platform.get_platform_value("scratch")
        self.marsdir = scratch_root + config["clean_old_data.scratch_ext"]

        self.ifs_form = config["clean_old_data.ifs_format"]

        period = config["clean_old_data.IFS_period"]
        self.delay = as_timedelta(period)
        self.cutoff_time = self.cutoff(self.delay)

    def execute(self):
        """Find the outdated IFS entries and delete them as files."""
        outdated = self.get_old(self.marsdir, self.ifs_form, self.cutoff_time)
        self.remove_list(outdated, files=True)