"""Clean old data."""
import glob
import os
import re
import shutil
from datetime import datetime
from ..datetime_utils import as_timedelta
from ..logs import logger
from ..scheduler import EcflowServer
from .base import Task
[docs]
class CleanOldData(Task):
    """Interface class to the cleaning of the old data.

    Holds the helpers shared by the concrete cleaning tasks: computing a
    cutoff timestamp, collecting paths older than that cutoff and removing
    them.
    """

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        # ``__class__`` is the lexically enclosing class, so the name handed to
        # ``Task.__init__`` is always "CleanOldData", also for subclasses.
        Task.__init__(self, config, __class__.__name__)
        # Reference time from which cutoffs are measured.
        self.now = datetime.now()

    def cutoff(self, delay):
        """Calculate the timestamp before which data counts as old.

        Args:
            delay (datetime.timedelta): Age threshold, subtracted from ``self.now``

        Returns:
            cutoff (float): POSIX timestamp in seconds of ``self.now - delay``
        """
        return (self.now - delay).timestamp()

    def get_old(self, path, form, cutoff_time, ignore=None):
        """Get directories or files which are older than the cutoff.

        Candidates are collected with a glob below ``path``; the number of
        wildcard levels is derived from the number of ``/`` in ``form``. They
        are then filtered by a regular expression built from ``path`` and
        ``form`` and by their modification time.

        Args:
            path (str): Path to start of searching old directories
            form (str): Regex, appended to the escaped ``path``, that a
                candidate must match from its start. Whitespace inside it
                is removed before compiling.
            cutoff_time (float): Timestamp; entries modified before it are old
            ignore (list): Base names that are never returned, even if old

        Returns:
            list_to_remove (list): List of paths to remove
        """
        # One wildcard level per "/" in the regex, not counting "^/".
        depth = form.count("/") - form.count("^/")
        glob_str = "*/".join(["*"] * depth)
        logger.info(glob_str)
        logger.info(path)
        initial_matches = glob.glob(os.path.join(path, glob_str))
        logger.info("initial: {}", initial_matches)

        # Strip whitespace from the regex part only. Doing it on the full
        # pattern would also delete spaces escaped by re.escape() and leave a
        # dangling backslash that escapes the following character instead.
        form_no_whitespace = re.sub(r"\s+", "", form)
        pattern_no_whitespace = r"^" + re.escape(path) + form_no_whitespace
        logger.info("patteren: {}", pattern_no_whitespace)
        pattern_compiled = re.compile(pattern_no_whitespace)
        matched_directories = [d for d in initial_matches if pattern_compiled.match(d)]
        logger.info("Matched: {}", matched_directories)

        list_to_remove = []
        for dir_path in matched_directories:
            dir_mtime = os.path.getmtime(dir_path)
            logger.debug(
                "path: {}, time: {}, cutoff: {}",
                dir_path,
                dir_mtime,
                cutoff_time,
            )
            if dir_mtime >= cutoff_time:
                # Modified at or after the cutoff: still fresh.
                continue
            if ignore is not None and os.path.basename(dir_path) in ignore:
                # Explicitly protected by name.
                continue
            list_to_remove.append(dir_path)
        return list_to_remove

    def remove_list(self, dir_list, files=False):
        """Remove directories/files from the list.

        Args:
            dir_list (list): List of directories/files to remove
            files (boolean): If True every entry is removed with
                ``os.remove``, otherwise with ``shutil.rmtree``.
                Default False
        """
        logger.info(dir_list)
        remove = os.remove if files else shutil.rmtree
        for dir_file in dir_list:
            remove(dir_file)
            logger.info("Removing {}", dir_file)
[docs]
class CleanScratchData(CleanOldData):
    """Task that removes outdated entries from the scratch area."""

    def __init__(self, config):
        """Set up the scratch cleaning task from the configuration.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanScratchData"

        period = config["clean_old_data.scratch_data_period"]
        self.delay = as_timedelta(period)

        # Log the platform scratch value, then build the directory to clean
        # from it plus the configured extension.
        logger.info(self.platform.get_platform_value("scratch"))
        scratch_root = self.platform.get_platform_value("scratch")
        self.scratch = scratch_root + config["clean_old_data.scratch_ext"]

        self.cutoff_time = self.cutoff(self.delay)
        self.scratch_form = config["clean_old_data.scratch_format"]

        # Names from the configuration are kept in addition to the fixed ones.
        extra_ignored = list(config["clean_old_data.ignore"])
        logger.info("Ignore: {}", extra_ignored)
        self.ignore_dir = ["IFS", "Clean_old_data"] + extra_ignored

    def execute(self):
        """Collect the outdated scratch entries and delete them."""
        outdated = self.get_old(
            self.scratch, self.scratch_form, self.cutoff_time, ignore=self.ignore_dir
        )
        self.remove_list(outdated)
[docs]
class CleanSuites(CleanOldData):
    """Clean old suites."""

    def __init__(self, config):
        """Construct object.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanSuites"
        # Locations searched for old suite entries.
        self.ecf_jobout = self.platform.get_value("scheduler.ecfvars.ecf_jobout")
        self.ecf_files = self.platform.get_value("scheduler.ecfvars.ecf_files")
        self.suite_form = config["clean_old_data.suite_format"]
        self.delay = as_timedelta(config["clean_old_data.suites_period"])
        self.cutoff_time = self.cutoff(self.delay)
        ignore = list(config["clean_old_data.ignore"])
        logger.info("Ignore: {}", ignore)
        # Fixed names that are never removed, extended by the configured ones.
        self.ignore_suite = ["IFS", "Clean_old_data", "DE_NWP", *ignore]

    def execute(self):
        """Run clean suites.

        Removes the old entries found below ``self.ecf_files`` and
        ``self.ecf_jobout`` and then passes the base names of everything
        removed to ``EcflowServer.remove_suites``.
        """
        logger.info("Delay: {}, Cutoff:{}", self.delay, self.cutoff_time)
        list_to_remove_files = self.get_old(
            self.ecf_files, self.suite_form, self.cutoff_time, ignore=self.ignore_suite
        )
        list_to_remove_jobout = self.get_old(
            self.ecf_jobout, self.suite_form, self.cutoff_time, ignore=self.ignore_suite
        )
        self.remove_list(list_to_remove_files)
        self.remove_list(list_to_remove_jobout)
        # De-duplicate on the suite name, not on the full path: a suite found
        # under both locations has two different paths but one name, and must
        # be passed on only once. dict.fromkeys keeps first-seen order.
        suites = list(
            dict.fromkeys(
                os.path.basename(x)
                for x in [*list_to_remove_files, *list_to_remove_jobout]
            )
        )
        EcflowServer(self.config).remove_suites(suites)
[docs]
class CleanIFSData(CleanOldData):
    """Task that removes outdated IFS data."""

    def __init__(self, config):
        """Set up the IFS cleaning task from the configuration.

        Args:
            config (deode.ParsedConfig): Configuration
        """
        CleanOldData.__init__(self, config)
        self.name = "CleanIFSData"

        # Directory to search: platform scratch value plus configured extension.
        scratch_root = self.platform.get_platform_value("scratch")
        self.marsdir = scratch_root + config["clean_old_data.scratch_ext"]

        self.ifs_form = config["clean_old_data.ifs_format"]

        period = config["clean_old_data.IFS_period"]
        self.delay = as_timedelta(period)
        self.cutoff_time = self.cutoff(self.delay)

    def execute(self):
        """Collect the outdated IFS entries and delete them as files."""
        outdated = self.get_old(self.marsdir, self.ifs_form, self.cutoff_time)
        self.remove_list(outdated, files=True)