import logging
from abc import ABC, abstractmethod
from distutils.version import Version
from threading import RLock
from typing import Optional, Dict, List, Type, cast, Union, Callable, Set
import psij
from psij.descriptor import Descriptor, _VersionEntry
from psij._plugins import _register_plugin, _get_plugin_class, _print_plugin_status
from psij.job_status import JobStatus
from psij.job import Job, JobStatusCallback, FunctionJobStatusCallback
from psij.job_executor_config import JobExecutorConfig
from psij.job_launcher import Launcher
logger = logging.getLogger(__name__)
class JobExecutor(ABC):
    """
    An abstract base class for all JobExecutor implementations.

    Concrete executors implement :meth:`submit`, :meth:`cancel`, :meth:`list` and
    :meth:`attach`. Executor classes are registered with :meth:`register_executor` and
    instantiated by name through :meth:`get_instance`.
    """

    # Registry of known executor classes, keyed by executor name. It is filled in by
    # register_executor() and consulted by get_instance() and get_executor_names().
    _executors = {}  # type: Dict[str, List[_VersionEntry['JobExecutor']]]

    def __init__(self, url: Optional[str] = None,
                 config: Optional[JobExecutorConfig] = None):
        """
        Initializes this executor using an optional `url` and an optional configuration.

        :param url: The URL is a string that a `JobExecutor` implementation can interpret as the
            location of a backend.
        :param config: A configuration specific to each `JobExecutor` implementation. This
            parameter is marked as optional such that concrete `JobExecutor` classes can be
            instantiated with no `config` parameter. However, concrete `JobExecutor` classes
            must pass a default configuration up the inheritance tree and ensure that the
            `config` parameter of the ABC constructor is non-null.
        :raises AssertionError: If `config` is `None` (or otherwise falsy).
        """
        self.url = url
        # This guards a contract with subclasses (see the docstring) rather than user input.
        assert config, 'A JobExecutor must be initialized with a non-null config'
        self.config = config
        # _cb is not thread-safe; changing it while jobs are running could lead to badness
        self._cb = None  # type: Optional[JobStatusCallback]
        # Launcher instances are created lazily by _get_launcher() and cached by name;
        # the lock serializes access to the cache.
        self._launchers_lock = RLock()
        self._launchers = {}  # type: Dict[str, Launcher]

    @property
    def name(self) -> str:
        """Returns the name of this executor."""
        # The _NAME_ class attribute is set by the instantiation mechanism. This is only done to
        # avoid name duplication/matching issues (name defined by the descriptor vs. name defined
        # statically by a pre-set _NAME_ attribute), but is not otherwise necessary to do so. In
        # other words, one could very well return a static value here and ensure that the
        # descriptor has the same name for this class.
        return cast(str, getattr(self.__class__, '_NAME_'))

    @property
    def version(self) -> Version:
        """Returns the version of this executor."""
        # Like _NAME_, the _VERSION_ class attribute is set by get_instance().
        return cast(Version, getattr(self.__class__, '_VERSION_'))

    @abstractmethod
    def submit(self, job: Job) -> None:
        """
        Submits a Job to the underlying implementation.

        Successful return of this method indicates that the job has been sent to the underlying
        implementation and all changes in the job status, including failures, are reported using
        notifications. Conversely, if one of the two possible exceptions is thrown, then the job
        has not been successfully sent to the underlying implementation, the job status remains
        unchanged, and no status notifications about the job will be fired.

        :param job: The job to be submitted.
        :raises ~psij.InvalidJobException: Thrown if the job specification cannot be understood.
            This exception is fatal in that submitting another job with the exact same details
            will also fail with an `~psij.InvalidJobException`. In principle, the underlying
            implementation / LRM is the entity ultimately responsible for interpreting a
            specification and reporting any errors associated with it. However, in many cases,
            this reporting may come after a significant delay. In the interest of failing fast,
            library implementations should make an effort of validating specifications early and
            throwing this exception as soon as possible if that validation fails.
        :raises ~psij.SubmitException: Thrown if the request cannot be sent to the underlying
            implementation. Unlike `~psij.InvalidJobException`, this exception can occur for
            reasons that are transient.
        """
        pass

    @abstractmethod
    def cancel(self, job: Job) -> None:
        """
        Cancels a job that has been submitted to underlying executor implementation.

        A successful return of this method only indicates that the request for cancelation has
        been communicated to the underlying implementation. The job will then be canceled at the
        discretion of the implementation, which may be at some later time. A successful
        cancelation is reflected in a change of status of the respective job to
        :attr:`~psij.JobState.CANCELED`. User code can synchronously wait until the
        :attr:`~psij.JobState.CANCELED` state is reached using `job.wait(JobState.CANCELED)` or
        even `job.wait()`, since the latter would wait for all final states, including
        `JobState.CANCELED`. In fact, it is recommended that `job.wait()` be used because it is
        entirely possible for the job to complete before the cancelation is communicated to the
        underlying implementation and before the client code receives the completion
        notification. In such a case, the job will never enter the `CANCELED` state and
        `job.wait(JobState.CANCELED)` would hang indefinitely.

        :param job: The job to be canceled.
        :raises ~psij.SubmitException: Thrown if the request cannot be sent to the underlying
            implementation.
        """
        pass

    @abstractmethod
    def list(self) -> List[str]:
        """List native IDs of all jobs known to the backend."""
        pass

    @abstractmethod
    def attach(self, job: Job, native_id: str) -> None:
        """
        Attaches a job to a native job.

        :param job: A job to attach. The job must be in the :attr:`~psij.JobState.NEW` state.
        :param native_id: The native ID to attach to as returned by :attr:`~psij.Job.native_id`.
        """
        pass

    def set_job_status_callback(self,
                                cb: Union[None, JobStatusCallback,
                                          Callable[[Job, 'psij.JobStatus'], None]]) -> None:
        """
        Registers a status callback with this executor.

        The callback can either be a subclass of :class:`~psij.JobStatusCallback` or a function
        accepting two arguments: a :class:`~psij.Job` and a :class:`~psij.JobStatus` and
        returning nothing.

        The callback will be invoked whenever a status change occurs for any of the jobs
        submitted to this job executor, whether they were submitted with an individual job status
        callback or not. To remove the callback, set it to `None`.

        :param cb: An instance of :class:`~psij.JobStatusCallback`, a callable with two
            parameters, job of type :class:`~psij.Job` and job_status of type
            :class:`~psij.JobStatus` returning nothing, or `None` to remove a previously
            registered callback.
        """
        if cb is None:
            # Honor the documented way of removing the callback instead of wrapping None in
            # a FunctionJobStatusCallback that would fail on every notification.
            self._cb = None
        elif isinstance(cb, JobStatusCallback):
            self._cb = cb
        else:
            # Plain callables are adapted to the JobStatusCallback interface.
            self._cb = FunctionJobStatusCallback(cb)

    def __str__(self) -> str:
        """Returns a string representation of this executor."""
        cls = self.__class__
        if self.url is not None:
            return 'JobExecutor[{}, {}, url={}]'.format(getattr(cls, '_NAME_'),
                                                        getattr(cls, '_VERSION_'),
                                                        self.url)
        else:
            return 'JobExecutor[{}, {}]'.format(getattr(cls, '_NAME_'),
                                                getattr(cls, '_VERSION_'))

    @staticmethod
    def get_instance(name: str, version_constraint: Optional[str] = None,
                     url: Optional[str] = None,
                     config: Optional[JobExecutorConfig] = None) -> 'JobExecutor':
        """
        Returns an instance of a `JobExecutor`.

        :param name: The name of the executor to return. This must be one of the values returned
            by :func:`~psij.JobExecutor.get_executor_names`. If the value of the `name` parameter
            is not one of the valid values returned by
            :func:`~psij.JobExecutor.get_executor_names`, `ValueError` is raised.
        :param version_constraint: A version constraint for the executor in the form
            '(' <op> <version>[, <op> <version>[, ...]] ')', such as "( > 0.0.2, != 0.0.4)".
        :param url: An optional URL to pass to the `JobExecutor` instance.
        :param config: An optional configuration to pass to the instance.
        :return: A JobExecutor.
        """
        # might want to cache these instances if url and config match
        selected = _get_plugin_class(name, version_constraint, 'executor', JobExecutor._executors)
        assert selected.ecls is not None
        assert issubclass(selected.ecls, JobExecutor)
        # Stamp the selected class with the name/version it was resolved under; these are the
        # attributes read by the `name` and `version` properties and by __str__().
        setattr(selected.ecls, '_NAME_', name)
        setattr(selected.ecls, '_VERSION_', selected.version)
        instance = selected.ecls(url=url, config=config)
        return instance

    @staticmethod
    def register_executor(desc: Descriptor, root: str) -> None:
        """
        Registers a `JobExecutor` class through a :class:`~psij.descriptor.Descriptor`.

        The class can then be later instantiated using :func:`~psij.JobExecutor.get_instance`.

        Parameters
        ----------
        desc
            A :class:`~psij.descriptor.Descriptor` with information about the executor to
            be registered.
        root
            A filesystem path under which the implementation of the executor is to be loaded
            from. Executors from other locations, even if under the correct package, will not be
            registered by this method. If an executor implementation is only available under a
            different root path, this method will throw an exception.
        """
        _register_plugin(desc, root, 'executor', JobExecutor._executors)

    @staticmethod
    def _print_plugin_status() -> None:
        """Passes the executor registry to the module-level `_print_plugin_status` helper."""
        # Inside a method body this name resolves to the module-level function imported from
        # psij._plugins, not to this static method, so there is no recursion.
        _print_plugin_status(JobExecutor._executors, 'executor')

    @staticmethod
    def _check_cls_attr(ecls: Type['JobExecutor'], attr: str, name: str) -> None:
        """
        Verifies that an executor class defines a given attribute.

        :param ecls: The class to check.
        :param attr: The name of the attribute that must be present on `ecls`.
        :param name: A human-readable label for the attribute, used in the error message.
        :raises ValueError: If `ecls` has no attribute named `attr`.
        """
        if not hasattr(ecls, attr):
            raise ValueError('Class {} is missing the executor {} attribute, "{}"'.
                             format(ecls, name, attr))

    @staticmethod
    def get_executor_names() -> Set[str]:
        """
        Returns a set of registered executor names.

        Names returned by this method can be passed to :func:`~psij.JobExecutor.get_instance` as
        the `name` parameter.

        Returns
        -------
        A set of executor names corresponding to the known executors.
        """
        return set(JobExecutor._executors.keys())

    def _get_launcher(self, name: str, version_constraint: Optional[str] = None) -> Launcher:
        """
        Returns the launcher with the given name, creating and caching it on first use.

        :param name: The name of the launcher.
        :param version_constraint: An optional version constraint passed to
            `Launcher.get_instance` when the launcher is first created.
        :return: The cached `Launcher` instance for `name`.
        """
        with self._launchers_lock:
            # The cache is keyed by name only, so `version_constraint` is honored only by the
            # call that first creates the launcher for a given name.
            if name not in self._launchers:
                self._launchers[name] = Launcher.get_instance(
                    name, version_constraint=version_constraint, config=self.config)
            return self._launchers[name]

    def _set_job_status(self, job: Job, status: JobStatus) -> None:
        """
        Assigns `status` to `job.status`, logging (rather than propagating) any failure.

        :param job: The job whose status is to be updated.
        :param status: The new status.
        """
        try:
            job.status = status
        except Exception as ex:
            logger.warning('Failed to set status for job %s: %s', job.id, ex)

    def _notify_callback(self, job: Job, status: JobStatus) -> None:
        """
        Invokes the executor-level status callback, if one is registered.

        Exceptions raised by the callback are logged as warnings and not propagated.

        :param job: The job whose status changed.
        :param status: The status to report to the callback.
        """
        if self._cb:
            try:
                self._cb.job_status_changed(job, status)
            except Exception as ex:
                logger.warning('Job status callback for %s threw an exception: %s', job.id, ex)