Source code for deode.tasks.batch

"""Batch process."""
import subprocess
import sys

from ..logs import logger


[docs] class BatchJob(object): """Batch job.""" def __init__(self, rte, wrapper=""): """Construct batch job. Args: rte (dict): Run time environment. wrapper (str, optional): Wrapper around command. Defaults to "". """ self.rte = rte self.wrapper = wrapper logger.debug("Constructed BatchJob")
[docs] def run(self, cmd): """Run command. Args: cmd (str): Command to run. Raises: TypeError: If the provided command is not a string CalledProcessError: Execution error """ if not isinstance(cmd, str): raise TypeError(f"Command must be a string. Got {type(cmd)} instead.") cmd = self.wrapper + " " + cmd if "OMP_NUM_THREADS" in self.rte: logger.info("BATCH: {}", self.rte["OMP_NUM_THREADS"]) logger.info("Batch running {}", cmd) process = subprocess.Popen( cmd, shell=True, # noqa env=self.rte, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1, errors="replace", ) # Poll process for new output until finished while True: nextline = process.stdout.readline() if nextline == "" and process.poll() is not None: break sys.stdout.write(nextline) sys.stdout.flush() return_code = process.wait() if return_code != 0: raise subprocess.CalledProcessError(return_code, cmd)