"""Scheduler module."""
import os
import platform
import signal
import sys
import time
import traceback
from abc import ABC, abstractmethod
from datetime import datetime
from .logs import logger
from .os_utils import ping
from .toolbox import Platform
try:
import ecflow
except ModuleNotFoundError:
ecflow = None
# Base Scheduler server class
[docs]
class Server(ABC):
"""Base server/scheduler class."""
def __init__(self, config):
"""Construct the server."""
self.settings = None
self.config = config
[docs]
@abstractmethod
def start_server(self):
"""Start the server.
Raises:
NotImplementedError: Must be implemented by the child server object.
"""
raise NotImplementedError
[docs]
@abstractmethod
def replace(self, suite_name, def_file):
"""Create or change the suite definition.
Args:
suite_name (str): Name of the suite.
def_file (str): Name of the definition file.
Raises:
NotImplementedError: Must be implemented by the child server object.
"""
raise NotImplementedError
[docs]
@abstractmethod
def begin_suite(self, suite_name):
"""Begin the suite in a server specific way.
Args:
suite_name (str): Name of the suite
Raises:
NotImplementedError: Must be implemented by the child server object.
"""
raise NotImplementedError
[docs]
def start_suite(self, suite_name, def_file, begin=True):
"""Start the suite.
All the servers have these methods implemented and can start the server in a
server specific way.
Args:
suite_name (str): Name of the suite
def_file (str): Name of the definition file.
begin (bool, optional): If the suite should begin. Defaults to True.
"""
self.start_server()
self.replace(suite_name, def_file)
if begin:
self.begin_suite(suite_name)
[docs]
class EcflowServer(Server):
"""Ecflow server."""
def __init__(self, config, start_command=None):
"""Construct the EcflowServer.
The values for ecf_host and ecf_port are taken from config as
strings/integer or the two functions _select_host_from_list()
or _set_port_from_user() defined below.
Args:
config (str): configuration settings.
start_command (str): Ecflow start server command.
Raises:
ModuleNotFoundError: If ecflow is not found.
RuntimeError: If ecf_port is not set
"""
if ecflow is None:
raise ModuleNotFoundError("Ecflow not found")
Server.__init__(self, config)
ecf_host = self.config["scheduler.ecfvars.ecf_host"]
ecf_host = Platform(config).substitute(ecf_host)
self.ecf_host = Platform(config).evaluate(ecf_host, object_=EcflowServer)
ecf_port = self.config["scheduler.ecfvars.ecf_port"]
try:
self.ecf_port = int(ecf_port)
except ValueError:
self.ecf_port = Platform(config).evaluate(ecf_port, object_=EcflowServer)
self.start_command = start_command
logger.debug("self.ecf_host={} self.ecf_port={}", self.ecf_host, self.ecf_port)
self.ecf_client = ecflow.Client(self.ecf_host, self.ecf_port)
logger.debug("self.ecf_client {}", self.ecf_client)
self.settings = {"ECF_HOST": self.ecf_host, "ECF_PORT": self.ecf_port}
@staticmethod
def _set_port_from_user(offset=0):
"""Set ecf_port from user id.
Arguments:
offset (int): Number to offset the user id with
Returns:
port (int): Derived port number
"""
port = os.getuid() + int(offset)
return port
@staticmethod
def _select_host_from_list(hosts, tries=3, delay=1):
"""Set ecf_host from list of options.
Try to ping server tries times before giving up.
Arguments:
hosts (list): list of host options
tries (int): number of times to try to find a host
delay (int): number of seconds to wait between each try
Returns:
host (str): Selected host
Raises:
RuntimeError: In case no or more than one host found
"""
found_hosts = []
ntry = 1
while ntry <= tries:
for _host in hosts:
host = _host.strip()
if ping(host):
found_hosts.append(host)
if len(found_hosts) == 0 and ntry == tries:
host_list = ",".join(hosts)
msg = f"No ecflow host found, tried:{host_list}"
logger.error(msg)
raise RuntimeError(msg)
if len(found_hosts) == 1:
break
if len(found_hosts) > 1:
host_list = ",".join(found_hosts)
msg = f"Ambigious host selection:{host_list}"
logger.error(msg)
raise RuntimeError(msg)
time.sleep(delay)
ntry += 1
return found_hosts[0]
[docs]
def start_server(self):
"""Start the server.
Raises:
RuntimeError: Server is not running or Could not restart server.
"""
logger.debug("Start EcFlow server")
try:
logger.info("ECF_HOST:{}, ECF_PORT:{}", self.ecf_host, self.ecf_port)
self.ecf_client.ping()
logger.info("EcFlow server is already running")
except RuntimeError:
logger.info("Re-Start EcFlow server")
try:
# Start server
start_command = self.start_command
if self.start_command is None:
start_command = f"ecflow_start.sh -p {self.ecf_port!s}"
logger.info(start_command)
# TODO
ret = os.system(start_command) # noqa
if ret != 0:
raise RuntimeError from RuntimeError
except RuntimeError as error:
raise RuntimeError("Could not restart server!") from error
[docs]
def begin_suite(self, suite_name):
"""Begin the suite.
Args:
suite_name (str): Nam eof the suite.
"""
self.ecf_client.begin_suite(suite_name)
[docs]
def force_complete(self, task):
"""Force the task complete.
Args:
task (scheduler.EcflowTask): Task to force complete.
"""
ecf_name = task.ecf_name
self.ecf_client.force_state(ecf_name, ecflow.State.complete)
[docs]
def force_aborted(self, task):
"""Force the task aborted.
Args:
task (scheduler.EcflowTask): Task to force aborted.
"""
ecf_name = task.ecf_name
self.ecf_client.force_state(ecf_name, ecflow.State.aborted)
[docs]
def replace(self, suite_name, def_file):
"""Replace the suite name from def_file.
Args:
suite_name (str): Suite name.
def_file (str): Definition file.
Raises:
RuntimeError: If suite cannot be replaced.
"""
logger.debug("{} {}", suite_name, def_file)
try:
self.ecf_client.replace("/" + suite_name, def_file)
except RuntimeError:
try:
self.ecf_client.delete("/" + suite_name)
self.ecf_client.replace("/" + suite_name, def_file)
except RuntimeError as err:
raise RuntimeError("Could not replace suite " + suite_name) from err
[docs]
def remove_suites(self, suite_list):
"""Remove suites selected from a list.
Args:
suite_list (list): Suite names.
"""
self.ecf_client.sync_local()
suites = self.ecf_client.get_defs().suites
for suite in suites:
if suite.name() in suite_list:
logger.info("Removing suite {}", suite.name())
self.ecf_client.delete(suite.name())
[docs]
class EcflowLogServer:
"""Ecflow log server."""
def __init__(self, config):
"""Constuct the ecflow log server.
Args:
config (dict): Configuration
"""
self.config = config
self.ecf_loghost = config.get("ECF_LOGHOST")
self.ecf_logport = config.get("ECF_LOGPORT")
[docs]
class EcflowTask:
"""Ecflow scheduler task."""
def __init__(self, ecf_name, ecf_tryno, ecf_pass, ecf_rid, ecf_timeout=20):
"""Construct a task running and communicating with ecflow server.
Args:
ecf_name (str): Full name of ecflow task.
ecf_tryno (int): Ecflow task try number
ecf_pass (str): Ecflow task password
ecf_rid (int): Ecflow runtime ID
ecf_timeout (int, optional): _description_. Defaults to 20.
"""
self.ecf_name = ecf_name
self.ecf_tryno = int(ecf_tryno)
self.ecf_pass = ecf_pass
if ecf_rid == "" or ecf_rid is None:
ecf_rid = os.getpid()
self.ecf_rid = int(ecf_rid)
self.ecf_timeout = int(ecf_timeout)
ecf_name_parts = self.ecf_name.split("/")
self.ecf_task = ecf_name_parts[-1]
ecf_families = None
if len(ecf_name_parts) > 2:
ecf_families = ecf_name_parts[1:-1]
self.ecf_families = ecf_families
self.family1 = None
if self.ecf_families is not None:
self.family1 = self.ecf_families[-1]
[docs]
class EcflowClient(object):
"""An ecflow client.
Encapsulate communication with the ecflow server. This will automatically call
the child command init()/complete(), for job start/finish. It will also
handle exceptions and signals, by calling the abort child command.
*ONLY* one instance of this class, should be used. Otherwise zombies will be created.
"""
def __init__(self, server, task):
"""Construct the ecflow client.
Args:
server (EcflowServer): Ecflow server object.
task (EcflowTask): Ecflow task object.
"""
logger.debug("Creating Client")
self.server = server
self.client = server.ecf_client
# self.ci.set_host_port("%ECF_HOST%", "%ECF_PORT%") #noqa E800
self.client.set_child_pid(task.ecf_rid)
self.client.set_child_path(task.ecf_name)
self.client.set_child_password(task.ecf_pass)
self.client.set_child_try_no(task.ecf_tryno)
logger.info(
" Only wait {} seconds, if the server cannot be contacted "
"(note default is 24 hours) before failing",
str(task.ecf_timeout),
)
self.client.set_child_timeout(task.ecf_timeout)
self.task = task
# Abort the task for the following signals
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
signal.signal(signal.SIGQUIT, self.signal_handler)
signal.signal(signal.SIGILL, self.signal_handler)
signal.signal(signal.SIGTRAP, self.signal_handler)
signal.signal(signal.SIGIOT, self.signal_handler)
signal.signal(signal.SIGBUS, self.signal_handler)
signal.signal(signal.SIGFPE, self.signal_handler)
signal.signal(signal.SIGUSR1, self.signal_handler)
signal.signal(signal.SIGUSR2, self.signal_handler)
signal.signal(signal.SIGPIPE, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGXCPU, self.signal_handler)
if platform.system() != "Darwin":
signal.signal(signal.SIGPWR, self.signal_handler)
[docs]
@staticmethod
def at_time():
"""Generate time stamp.
Returns:
str: Time stamp.
"""
return datetime.fromtimestamp(time.time()).strftime("%H:%M:%S")
[docs]
def signal_handler(self, signum, extra=None):
"""Signal handler.
Args:
signum (_type_): _description_
extra (_type_, optional): _description_. Defaults to None.
"""
logger.info(" Aborting: Signal handler called with signal {}", str(signum))
self.__exit__(
InterruptedError, "Signal handler called with signal " + str(signum), extra
)
def __enter__(self):
"""Enter the object.
Returns:
_type_: _description_
"""
logger.info("Calling init at: {}", self.at_time())
if self.client is not None:
self.client.child_init()
return self.client
def __exit__(self, ex_type, value, tback):
"""Exit method.
Args:
ex_type (_type_): _description_
value (_type_): _description_
tback (_type_): _description_
Returns:
_type_: _description_
"""
logger.info(" Client:__exit__: ex_type: {} value: {}", str(ex_type), str(value))
if ex_type is not None:
logger.info("Calling abort {}", self.at_time())
self.client.child_abort(f"Aborted with exception type {ex_type!s}:{value!s}")
if tback is not None:
print(tback)
traceback.print_tb(tback, limit=1, file=sys.stdout)
print("*** print_exception:")
# exc_type below is ignored on 3.5 and later
print("*** print_exc:")
traceback.print_exc(limit=2, file=sys.stdout)
print("*** format_exc, first and last line:")
formatted_lines = traceback.format_exc().splitlines()
print(formatted_lines[0])
print(formatted_lines[-1])
print("*** format_exception:")
print("*** extract_tb:")
print(repr(traceback.extract_tb(tback)))
print("*** format_tb:")
print(repr(traceback.format_tb(tback)))
print("*** tb_lineno:", tback.tb_lineno)
return False
print("Calling complete at: " + self.at_time())
# self.server.update_log(self.task.ecf_name + " complete") #noqa E800
self.client.child_complete()
return False