Source code for deode.os_utils

"""Utilities for simple tasks on OS level."""

import contextlib
import os
import pathlib
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Union

from . import GeneralConstants
from .logs import logger






[docs] def filepath_iterator(paths, filename_pattern="*"): """Return iterator of paths to files given a list of file or directory paths. Given a path or list of paths, yield Path objects corresponding to them. If a path points to a directory, then the directory is searched recursively and the paths to the files found in this process will be yielded. Args: paths (typing.Union[pathlib.Path, List[pathlib.Path], str, List[str]]): A single path or a collection of paths. filename_pattern (str, optional): Pattern used in the recursive glob in order to select the names of the files to be yielded. Defaults to "*". Yields: pathlib.Path: Path to located files. """ if isinstance(paths, (str, Path)): paths = [paths] for path_ in paths: path = Path(path_).expanduser().resolve() if path.is_dir(): for subpath in path.rglob(filename_pattern): if subpath.is_file(): yield subpath else: yield path
[docs] def deodemakedirs(path, unixgroup="", exist_ok=True, def_dir_mode=0o755): """Create directories and change unix group as required. For a given path the top directory that does not yet exist is searched for, created and unix group is set, if required. Permissions are set such that all subdirectories and new files inherit the unix group. Args: path (str): directory path that should be created if it doesn't already exist. unixgroup (str, optional): unix group the newly created dirs should belong to. exist_ok (boolean, optional): Define whether directories may already exist or whether an error should be raised. def_dir_mode(int, optional): Default directory persmission mode. Defaults to 0o755 Raises: OSError: If cannot create the directory. """ p = Path(path).resolve() dir_mode = def_dir_mode if unixgroup != "": dir_mode = 0o2750 if p.parents[0].is_dir(): try: os.makedirs(path, mode=dir_mode, exist_ok=exist_ok) if unixgroup and (str(Path(path).group()) != str(unixgroup)): shutil.chown(path, group=unixgroup) # TODO: Check if we really need this permissive mask os.chmod(path, mode=dir_mode) except OSError as err: raise OSError(f"Cannot create {path} properly") from err else: # check directory tree for top dir that has to be created try: idx = 0 while not p.parents[idx + 1].is_dir(): idx += 1 os.makedirs(p.parents[idx], mode=0o2750, exist_ok=exist_ok) if unixgroup and str(p.parents[idx].group()) != str(unixgroup): shutil.chown(p.parents[idx], group=unixgroup) # TODO: Check if we really need this permissive mask os.chmod(p.parents[idx], mode=0o2750) # noqa S103 os.makedirs(path) except OSError as err: raise OSError(f"Cannot create {path} properly") from err
[docs] def remove_empty_dirs(src, dry_run=False): """Remove directories. Recursively and permanently removes the specified directory, and all of its empty subdirectories. Args: src (str or Path): Top search directory dry_run (boolean): Flag for execution of cleaning or not Returns: found_files (boolean): True if any files found """ cwd = os.getcwd() src_dir = Path(src) found_files = False if not src_dir.exists(): return found_files for path in src_dir.iterdir(): realpath = os.path.realpath(path) if path.is_file() or realpath == cwd: found_files = True continue found_files = remove_empty_dirs(path) or found_files if found_files: logger.debug(f"Keep:{src_dir}") else: logger.info(f"Remove:{src_dir}") if not dry_run: src_dir.rmdir() return found_files
[docs] def ping(host): """Ping host. Args: host(str): Host to ping Returns: (boolean): True if host responded """ cmd = ["ping", "-c", "1", host] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) # noqa S603 return True except subprocess.CalledProcessError: return False
[docs] def strip_off_mount_path(path: Union[str, Path]) -> Path: """Strip off the mount path from a given path. Assumptions: - the path contains the user name as a directory. - the parent of the user directory is of the format "<new-dir-name>" or "*_<new-dir-name>_*", where "*" contains no underscore(s) and where the <new-dir-name> will be used as the new parent directory name relative to the user directory. Args: path (Union[str, Path]): Path to strip off the mount path from. Returns: path: Path with the mount path stripped off. Raises: ValueError: If the parent of the user directory does not contain 0 or 2 underscores. Example: >>> strip_off_mount_path("/etc/ecmwf/nfs/dh1_home_b/$USER/Deode-Workflow/deode") Path("/home/$USER/Deode-Workflow/deode") """ file_parts = Path(path).parts user = os.environ.get("USER") if user is None: return Path(path) index_of_user = file_parts.index(user) parent_of_user = file_parts[max(0, index_of_user - 1)] # Get number of underscores in parent_of_user n_underscores = parent_of_user.count("_") if n_underscores not in [0, 2]: raise ValueError( "Parent of user directory must contain 0 or 2 underscores, " + f"but found {n_underscores}. Path: {path}" ) # Get middle part of parent_of_user if it contains 2 underscores if n_underscores == 2: parent_of_user_parts = parent_of_user.split("_") parent_of_user = parent_of_user_parts[1] return Path(pathlib.os.sep, parent_of_user, *file_parts[index_of_user:])
[docs] def resolve_path_relative_to_package(path: Path, ignore_errors: bool = False) -> Path: """Resolve path relative to package directory. If the path exists as is, return it. If not, check if it exists in the package directory and return path relative to package Args: path (Path): Path to resolve. ignore_errors (bool, optional): Option to ignore errors. Defaults to False. Raises: FileNotFoundError: If it was impossible to determine path relative to package. FileNotFoundError: If file does not exist locally or in the package directory. Returns: Path: Original path (if exists locally), or resolved path relative to package directory. """ path = path.resolve() # First check if path exists as is if not os.path.exists(path): # Get path relative to package. Needed when Deode-Workflow is installed as # a site-package e.g. when creating plugins. if GeneralConstants.PACKAGE_NAME in path.parts: # Find last occurence of package name in path package_index_in_path = ( len(path.parts) - path.parts[::-1].index(GeneralConstants.PACKAGE_NAME) - 1 ) # Stick together the path in the package directory config_parts = path.parts[package_index_in_path:] path = GeneralConstants.PACKAGE_DIRECTORY.parent / Path(*config_parts) elif not ignore_errors: raise FileNotFoundError( f"Could not determine path {path} relative to the " + f"{GeneralConstants.PACKAGE_NAME} package." ) # If not, check if it exists in the package directory (used when # deode is installed as package) if not os.path.exists(path) and not ignore_errors: raise FileNotFoundError( f"Config file {path} not found locally or in package directory" ) return path return path