# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True
## Copyright 2000-2024 by LivingLogic AG, Bayreuth/Germany
## Copyright 2000-2024 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license
r"""
.. program:: sisyphus
:mod:`sisyphus` simplifies running Python stuff as jobs.
This can either be done under the direction of a cron daemon or a similar
process runner, in which case :mod:`sisyphus` makes sure that no more than
one job of a certain name is running at any given time.
Or :mod:`sisyphus` can be used as its own minimal cron daemon and can
execute the job repeatedly.
A job has a maximum allowed runtime. If this maximum is exceeded, the job will
kill itself. In addition to that, job execution can be logged and in case of
job failure an email can be sent, a message can be posted to a `Mattermost
chat channel`__ or an event can be emitted to a `Sentry server`__.
To use this module, you must derive your own class from :class:`Job`,
implement the :meth:`~Job.execute` method and then call the module level
function :func:`execute` or :func:`executewithargs` with your job object
(preferably in an ``if __name__ == "__main__"`` block).
Logs will (by default) be created in the :file:`~/ll.sisyphus` directory.
This can be changed by overwriting the appropriate methods in the subclass.
To execute a job, use the module level function :func:`execute` (or
:func:`executewithargs` when you want to support command line arguments).
__ https://mattermost.com/
__ https://sentry.io/
Example
-------
The following example illustrates the use of this module:
.. sourcecode:: python

	import os
	import urllib.request

	from ll import sisyphus

	class Fetch(sisyphus.Job):
		projectname = "ACME.FooBar"
		jobname = "Fetch"
		argdescription = "fetch http://www.python.org/ and save it to a local file"
		maxtime = 3 * 60

		def __init__(self):
			self.url = "http://www.python.org/"
			self.tmpname = f"Fetch_Tmp_{os.getpid()}.html"
			self.officialname = "Python.html"

		def execute(self):
			self.log(f"fetching data from {self.url!r}")
			data = urllib.request.urlopen(self.url).read()
			datasize = len(data)
			self.log(f"writing file {self.tmpname!r} ({datasize:,} bytes)")
			with open(self.tmpname, "wb") as f:
				f.write(data)
			self.log(f"renaming file {self.tmpname!r} to {self.officialname!r}")
			os.rename(self.tmpname, self.officialname)
			return f"cached {self.url!r} as {self.officialname!r} ({datasize:,} bytes)"

	if __name__ == "__main__":
		sisyphus.executewithargs(Fetch())
You will find the log files for this job in
:file:`~/ll.sisyphus/ACME.FooBar/Fetch/`.
Result status of a job run
--------------------------
The method :meth:`Job.execute` (which must be overwritten to implement the job's
main functionality) should return a one-line summary of what the job did
(this is called a "successful run"). It can also return :const:`None` to report
that the job had nothing to do (this is called an "uneventful run").
Apart from "uneventful" and "successful" runs, the following results are
possible:
"interrupted"
The job failed with a :exc:`KeyboardInterrupt`.
"failed"
The job failed with an exception (other than :exc:`KeyboardInterrupt`).
"timeout"
The job ran longer than the allowed maximum runtime.
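For example, an :meth:`~Job.execute` method that distinguishes uneventful
from successful runs might look like this (a minimal sketch; the helper
methods are hypothetical):

.. sourcecode:: python

	def execute(self):
		newfiles = self.scanforfiles()  # hypothetical helper
		if not newfiles:
			return None  # uneventful run
		for filename in newfiles:
			self.processfile(filename)  # hypothetical helper
		return f"processed {len(newfiles):,} files"  # successful run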
Repeat mode
-----------
Normally sisyphus jobs run under the control of a cron daemon or similar process
runner. In this mode the method :meth:`Job.execute` is executed once and after
that, execution of the Python script ends.
However it is possible to activate repeat mode with the class/instance attribute
``repeat`` (or the command line option :option:`--repeat`).
If ``repeat`` is true, execution of the job will be repeated indefinitely.
By default the next job run starts immediately after the end of the previous
run, but it is possible to delay the next run. For this the class/instance
attribute ``nextrun`` (or the command line option :option:`--nextrun`) can be
used. In its simplest form this is the number of seconds to wait until the next
job run is started. It can also be a :class:`datetime.timedelta` object that
specifies the delay, or it can be a :class:`datetime.datetime` object specifying
the next job run. Furthermore ``nextrun`` can be callable (so it can be
implemented as a method) and can return any of the types :class:`int`,
:class:`float`, :class:`datetime.timedelta` or :class:`datetime.datetime`.
And, if ``Job.nextrun`` is :const:`None`, the job run will be repeated
immediately.
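For example, the following sketch (the job class and its attribute values
are made up) repeats indefinitely and waits five minutes between runs:

.. sourcecode:: python

	import datetime

	from ll import sisyphus

	class Cleanup(sisyphus.Job):
		projectname = "ACME.FooBar"
		jobname = "Cleanup"
		repeat = True

		def nextrun(self):
			# Wait five minutes after the end of the previous run.
			# Returning a datetime.datetime instead would specify the
			# start of the next run directly.
			return datetime.timedelta(minutes=5)

		def execute(self):
			return "cleaned up"

	if __name__ == "__main__":
		sisyphus.executewithargs(Cleanup())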
Logging and tags
----------------
Logging itself is done by calling :meth:`~Job.log`:
.. sourcecode:: python

	self.log(f"can't parse XML file {filename}")
This logs the argument without tagging the line.
It is possible to add tags to the logging call. This is done by accessing
attributes of the ``log`` pseudo method. I.e. to add the tags ``xml`` and
``warning`` to a log call you can do the following:
.. sourcecode:: python

	self.log.xml.warning(f"can't parse XML file {filename}")
It's also possible to do this via ``__getitem__`` calls, i.e. the above can be
written like this:
.. sourcecode:: python

	self.log['xml']['warning'](f"can't parse XML file {filename}")
:mod:`sisyphus` itself uses the following tags:
``sisyphus``
This tag will be added to all log lines produced by :mod:`sisyphus`
itself.
``init``
This tag is used for the log lines output at the start of the job.
``report``
This tag will be added for all log messages related to sending the
failure report email.
``result``
This tag is used for the final line written to the log files that shows a
summary of what the job did (or why it failed).
``fail``
This tag is used in the result line if the job failed with an exception.
``errors``
This tag is used in the result line if the job ran to completion, but some
exceptions were logged.
``ok``
This tag is used in the result line if the job ran to completion without
any exceptions.
``kill``
This tag is used in the result line if the job was killed because it
exceeded the maximum allowed runtime.
``info``
This tag is used for all other informational log messages output by
:mod:`sisyphus` itself (like log file cleanup etc.).
Exceptions
----------
When an exception object is passed to ``self.log`` the tag ``exc`` will be added
to the log call automatically.
Delayed logs
------------
If a log message has the tag ``delay`` it is considered a delayed message.
Delayed messages will be buffered up until the first log message that isn't
delayed is encountered (all messages logged by :mod:`sisyphus` itself are delayed).
Then all buffered messages will be output. If only delayed messages are output
during the complete job run, only the result of the job run will be output.
If this output is ``None`` nothing will be output. This means that you will get
no log entries until something "interesting" happens.
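For example (a sketch with hypothetical variables), the first call below
stays in the buffer and is only written once the second, undelayed call
happens:

.. sourcecode:: python

	self.log.delay(f"checking {filename}")  # buffered
	if problems:
		self.log.error(f"found {len(problems)} problems")  # flushes the buffer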
Log files
---------
By default logging is done to the log file (whose name changes from run to run
as it includes the start time of the job).
However logging to ``stdout`` and ``stderr`` can also be activated.
Logfiles for uneventful runs will be deleted after the run.
Multiple links will be created that automatically point to the last log file.
The "current" link (by default named :file:`current.sisyphuslog`) will always
point to the log file of the currently running job. If no job is running,
but the last run was eventful, it will point to the newest log file. If the last
run was uneventful the link will point to a nonexistent log file (whose name can
be used to determine the date of the last run).
The following links will be created at the end of the job run and will only
start to point to non-existent files when the log files they point to get
cleaned up:
* The "last successful" link (by default named
:file:`last_successful.sisyphuslog`) will always point to the last
successful job run,
* :file:`last_failed.sisyphuslog` points to the last failed run,
* :file:`last_interrupted.sisyphuslog` points to the last interrupted run and
* :file:`last_timeout.sisyphuslog` points to the last run that timed out.
Email
-----
It is possible to send an email when a job fails. For this, the options
:option:`--fromemail`, :option:`--toemail` and :option:`--smtphost` (or the
appropriate class attributes) have to be set. If the job terminates because of
an exception or exceeds its maximum runtime (and the option
:option:`--noisykills` is set) or any of the calls to :meth:`~Job.log` include
the tag ``email`` or ``external``, an email will be sent. This email includes
the last 10 logging calls and the final exception (if there is any) in plain
text and HTML format, as well as a JSON attachment.
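For example, a log call like the following (with a hypothetical variable
``backupname``) will be included in the report email:

.. sourcecode:: python

	self.log.email(f"could not verify backup {backupname!r}")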
Mattermost
----------
It is possible to send log entries to a Mattermost_ chat channel. For this the
options :option:`--mattermost_url`, :option:`--mattermost_channel` and
:option:`--mattermost_token` (or the appropriate class attributes) must be
specified. All log entries including the tag ``mattermost`` or ``external``,
as well as all exceptions that abort the job will be sent to the Mattermost
channel.
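For example, a log call like the following (with a hypothetical variable
``count``) will be posted to the channel:

.. sourcecode:: python

	self.log.mattermost(f"imported {count:,} records")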
.. _Mattermost: https://mattermost.com/
Sentry
------
It is possible to send log entries to a Sentry_ server. For this the
option :option:`--sentry_dsn` (or the appropriate class attribute) must be
specified. All log entries including the tag ``sentry`` or ``external``, as
well as all exceptions that abort the job will be sent to the Sentry server.
.. _Sentry: https://sentry.io/
If the logging call includes any of the tags ``fatal``, ``error``, ``warning``,
``info``, ``debug`` this will be used as the event level. If the log argument
is an exception the event level will be ``fatal``. Otherwise it will default to
``info``.
All tags will be converted to Sentry tags like this: A sisyphus tag ``foo``
will be converted into a Sentry tag ``sisyphus.tag.foo`` with a value of ``true``.
Active tasks will be converted into Sentry breadcrumbs (see the methods
:meth:`~Job.task` and :meth:`~Job.tasks` for more info).
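For example (a sketch with a hypothetical variable ``percent``), the
``warning`` tag selects the Sentry event level:

.. sourcecode:: python

	self.log.sentry.warning(f"disk almost full ({percent}% used)")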
Health checks
-------------
When a job is started with the option :option:`--healthcheck`, instead of
running the job normally a health check is done. This bypasses the normal
mechanism that prevents multiple instances of the job from running (i.e. you can
have a normal job execution and a health check running in parallel).
If the job is healthy this will exit with an exit status of 0, otherwise it will
exit with an exit status of 1 and an error message on ``stdout`` stating the
reason why the job is considered unhealthy. There are three possible scenarios
for this:
1. The job has never been run.
2. The last run has ended with an error.
3. The last run was too long ago.
To configure how scenario 3 is handled the class/instance attribute
``maxhealthcheckage`` (or the command line option
:option:`--maxhealthcheckage`) can be used. In its simplest form this is a
number of seconds or a :class:`datetime.timedelta` object. A job run that is
older than this value triggers scenario 3. ``maxhealthcheckage`` can also be
a :class:`datetime.datetime` object specifying the cut-off date.
Furthermore ``maxhealthcheckage`` can be callable (so it can be implemented
as a method) and can return any of the types :class:`int`, :class:`float`,
:class:`datetime.timedelta` or :class:`datetime.datetime`.
And if ``Job.maxhealthcheckage`` is :const:`None`, scenario 3 will never trigger.
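For example, a job that is supposed to run once per hour might declare
(a minimal sketch; the class name is made up):

.. sourcecode:: python

	class Sync(sisyphus.Job):
		repeat = True
		nextrun = 60 * 60  # run once per hour
		maxhealthcheckage = 2 * 60 * 60  # unhealthy after two missed runs

Running the script with the option :option:`--healthcheck` will then exit
with status 0 as long as the last run is recent enough and didn't fail.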
Requirements
------------
To reliably stop the job after the allowed maximum runtime, :mod:`sisyphus`
forks the process and kills the child process after the maximum runtime has
expired (via :func:`os.fork` and :func:`signal.signal`). This won't work on
Windows. So on Windows the job will always run to completion without being
killed after the maximum runtime.
To make sure that only one job instance runs concurrently, :mod:`sisyphus`
uses :mod:`fcntl` to create an exclusive lock on the file of the running script.
This won't work on Windows either. So on Windows you might have multiple
running instances of the job.
:mod:`sisyphus` uses the module :mod:`setproctitle` to change the process
title during various phases of running the job. If :mod:`setproctitle` is not
available the process title will not be changed.
If the module :mod:`psutil` is available it will be used to kill the child
process and any of its own child processes after the maximum runtime of the job
is exceeded. If :mod:`psutil` isn't available just the child process will be
killed (which is no problem as long as the child process doesn't spawn any
other processes).
If logging to Mattermost is used, :mod:`requests` has to be installed.
If logging to Sentry is used, :mod:`sentry_sdk` has to be installed.
For compressing the log files one of the modules :mod:`gzip`, :mod:`bz2` or
:mod:`lzma` is required (which might not be part of your Python installation).
Module documentation
--------------------
"""
import sys, os, argparse, time, datetime, pathlib, enum, types
import signal, traceback, pprint, tokenize, json, smtplib, operator, itertools
from typing import *
from typing import TextIO
try:
import fcntl
except ImportError:
fcntl = None
try:
import gzip
except ImportError:
gzip = None
try:
import bz2
except ImportError:
bz2 = None
try:
import lzma
except ImportError:
lzma = None
from email.mime import text, application, multipart
from email import encoders
try:
import psutil
except ImportError:
psutil = None
try:
import setproctitle
except ImportError:
setproctitle = None
from ll import url, ul4c, ul4on, misc
__docformat__ = "reStructuredText"
###
### Typing stuff
###
T = TypeVar("T")
OptStr = Optional[str]
OptInt = Optional[int]
OptStrFromCall = str | None | Callable[..., str | None]
OptDictFromCall = dict | None | Callable[..., dict | None]
Tags = Tuple[str, ...]
LogList = List[Tuple[datetime.datetime, Tags, List["Task"], Any]]
###
### Helper functions and classes
###
def _formattraceback(exc: BaseException) -> str:
return "".join(traceback.format_exception(exc.__class__, exc, exc.__traceback__))
def _formatlines(obj: Any) -> List[str]:
if isinstance(obj, BaseException):
obj = _formattraceback(obj)
elif not isinstance(obj, str):
obj = pprint.pformat(obj)
lines = obj.splitlines()
while lines and not lines[0].strip():
del lines[0]
while lines and not lines[-1].strip():
del lines[-1]
return lines
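# Convert ``value`` (a string, a number or a ``timedelta``) into a
# ``datetime.timedelta``, interpreting plain numbers as days.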
def argdays(value: str | int | datetime.timedelta) -> datetime.timedelta:
if isinstance(value, str):
value = int(value)
if not isinstance(value, datetime.timedelta):
value = datetime.timedelta(days=value)
return value
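# Same as ``argdays()``, but plain numbers are interpreted as seconds.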
def argseconds(value: str | int | datetime.timedelta) -> datetime.timedelta:
if isinstance(value, str):
value = int(value)
if not isinstance(value, datetime.timedelta):
value = datetime.timedelta(seconds=value)
return value
def env(varname: str) -> OptStr:
return os.environ.get(varname, None)
def get_mtime(filename: pathlib.Path) -> datetime.datetime:
return datetime.datetime.fromtimestamp(filename.stat().st_mtime)
def get_utime(filename: pathlib.Path) -> Tuple[datetime.datetime, datetime.datetime]:
stat = filename.stat()
return (datetime.datetime.fromtimestamp(stat.st_atime), datetime.datetime.fromtimestamp(stat.st_mtime))
def set_utime(filename: pathlib.Path, atime: datetime.datetime, mtime: datetime.datetime) -> None:
os.utime(str(filename), times=(atime.timestamp(), mtime.timestamp()))
class DatetimeEncoder(json.JSONEncoder):
def default(self, obj: Any) -> str:
if isinstance(obj, datetime.datetime):
return obj.isoformat()
return super().default(obj)
class Status(enum.IntEnum):
"""
The result status of a job run.
Possible values are:
* ``UNEVENTFUL``,
* ``SUCCESSFUL``,
* ``FAILED``,
* ``INTERRUPTED``,
* ``TIMEOUT``.
"""
UNEVENTFUL = 0
SUCCESSFUL = 1
FAILED = 2
INTERRUPTED = 3
TIMEOUT = 4
class Process(enum.Enum):
"""
The type of a running :mod:`!sisyphus` process.
Possible values are:
* ``SOLO`` (when in non-forking mode),
* ``PARENT`` (the parent process in forking mode),
* ``CHILD`` (the child process in forking mode).
"""
SOLO = 0
PARENT = 1
CHILD = 2
###
### The main class
###
class Job:
"""
A Job object executes a task (either once or repeatedly).
To use this class, derive your own class from it and overwrite the
:meth:`~Job.execute` method.
The job can be configured in three ways: By class attributes in the
:class:`Job` subclass, by attributes of the :class:`Job` instance (e.g. set
in :meth:`__init__`) and by command line arguments (if :func:`executewithargs`
is used). The following command line arguments are supported (the name of the
attribute is the same as the long command line argument name):
.. option:: -p <projectname>, --projectname <projectname>
The name of the project this job belongs to. This might be a dot-separated
hierarchical project name (e.g. including customer names or similar stuff).
.. option:: -j <jobname>, --jobname <jobname>
The name of the job itself (defaulting to the name of the class if none
is given).
.. option:: --identifier <identifier>
An additional identifier that will be added to the failure report email.
.. option:: --fromemail <emailaddress>
The sender email address for the failure report email.
This email will only be sent if the options :option:`--fromemail`,
:option:`--toemail` and :option:`--smtphost` are set (and any error
or output to the email log occurred, which only happens when the log entry
has the tag ``email`` or ``external``, or if it is an exception that
aborts the job run).
.. option:: --toemail <emailaddress>
An email address where an email will be sent in case of a failure.
.. option:: --smtphost <servername>
The SMTP server to be used for sending the failure report email.
.. option:: --smtpport <integer>
The port number used for the connection to the SMTP server.
.. option:: --smtpuser <username>
The user name used to log into the SMTP server. (Login will only be done
if both :option:`--smtpuser` and :option:`--smtppassword` are given)
.. option:: --smtppassword <password>
The password used to log into the SMTP server.
.. option:: --mattermost_url <url>
The URL where log entries can be posted to a Mattermost chat. For
example:
.. sourcecode:: text

	https://mattermost.example.org/api/v4/posts
A log entry will only be posted to the Mattermost chat channel if the
options :option:`--mattermost_url`, :option:`--mattermost_channel` and
:option:`--mattermost_token` are set (and the log entry has the tag
``mattermost`` or ``external`` or is an exception that aborts the job run).
Note that using this feature requires :mod:`requests`.
.. option:: --mattermost_channel <id>
The channel id of the Mattermost chat channel where log entries should be
posted. For example:
.. sourcecode:: text

	4cnszmopr3ntjexi4qmx499inc
.. option:: --mattermost_token <auth>
The "Personal Access Token" used for authorizing the post with the
Mattermost server. For example:
.. sourcecode:: text

	9xuqwrwgstrb3mzrxb83nb357a
.. option:: --sentry_dsn <dsn>
Sentry DSN for logging to a Sentry server. Something like:
.. sourcecode:: text

	https://examplePublicKey@o0.ingest.sentry.io/0
.. option:: --sentry_environment <environment>
Environment reported to Sentry.
.. option:: --sentry_release <release>
Release reported to Sentry.
.. option:: --sentry_debug <flag>
Activates/deactivates Sentry debug mode.
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
A log entry will only be sent to Sentry if the option
:option:`--sentry_dsn` is set (and the log entry has the tag
``sentry`` or ``external``, or is an exception that aborts the job run).
.. option:: -m <seconds>, --maxtime <seconds>
Maximum allowed runtime for the job (as the number of seconds). If the job
runs longer than that it will kill itself.
(The instance attribute will always be converted to the type
:class:`datetime.timedelta`)
.. option:: --fork <flag>
Forks the process and does the work in the child process. The parent
process is responsible for monitoring the maximum runtime (this is the
default). In non-forking mode the single process does both the work and
the runtime monitoring.
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: --noisykills <flag>
Should a message be printed/a failure email be sent when the maximum
runtime is exceeded?
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: --exit_on_error <flag>
End job execution even in repeat mode when an exception is thrown?
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: -n <flag>, --notify <flag>
Should a notification be issued to the OS X Notification center?
(done via terminal-notifier__).
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
__ https://github.com/alloy/terminal-notifier
.. option:: -r <flag>, --repeat <flag>
Should job execution be repeated indefinitely?
(This means that the job basically functions as its own cron daemon).
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: --nextrun <seconds>
How many seconds should we wait after a job run before the next run gets
started (only when :option:`--repeat` is set)?
The class/instance attribute can also be a callable (i.e. it's possible
to implement this as a method). Also :class:`datetime.datetime` is
supported and specifies the start date for the next job run.
.. option:: --healthcheck <flag>
Instead of executing the job normally, run a health check.
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: --maxhealthcheckage <seconds>
If the last uneventful or successful job run is older than this number
of seconds, consider the job to be unhealthy.
.. option:: -f <flag>, --log2file <flag>
Should a logfile be written at all?
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
.. option:: --formatlogline <format>
A UL4 template for formatting each line in the logfile. Available
variables are ``time`` (current time), ``starttime`` (start time of the
job), ``tags`` (list of tags for the line) and ``line`` (the log line
itself).
.. option:: --keepfilelogs <days>
The number of days the logfiles are kept. Old logfiles (i.e. all files in
the same directory as the current logfile that are more than
``keepfilelogs`` days old) will be removed at the end of the job.
(The instance attribute will always be converted to the type
:class:`datetime.timedelta`)
.. option:: --compressfilelogs <days>
The number of days after which log files are compressed (if they aren't
deleted via :option:`--keepfilelogs`).
(The instance attribute will always be converted to the type
:class:`datetime.timedelta`)
.. option:: --compressmode <mode>
How to compress the logfiles. Possible values are: ``"gzip"``, ``"bzip2"``
and ``"lzma"``. The default is ``"bzip2"``.
.. option:: --encoding <encodingname>
The encoding to be used for the logfile. The default is ``"utf-8"``.
.. option:: --errors <errorhandlingname>
Encoding error handler name (goes with :option:`--encoding`). The default is
``"strict"``.
.. option:: --maxemailerrors <integer>
This option limits the number of exceptions and error messages that
will get attached to the failure email. The default is 10.
.. option:: --proctitle <flag>
When this option is specified, the process title will be modified during
execution of the job, so that the :command:`ps` command shows what the
processes are doing. The default is ``True``. (This
requires :mod:`setproctitle`.)
(Allowed ``<flag>`` values are ``false``, ``no``, ``0``, ``true``,
``yes`` or ``1``)
Command line arguments take precedence over instance attributes (if
:func:`executewithargs` is used) and those take precedence over class
attributes.
Furthermore the following class attribute can be set to customize the
help message:
:attr:`argdescription`
Description for the help message of the command line argument parser.
"""
projectname = None
jobname = None
argdescription = "execute the job"
fromemail = None
toemail = None
smtphost = None
smtpport = 0
smtpuser = None
smtppassword = None
mattermost_url = None
mattermost_channel = None
mattermost_token = None
sentry_dsn = None
sentry_release = None
sentry_environment = None
sentry_debug = False
identifier = None
maxtime = datetime.timedelta(minutes=5)
fork = True
noisykills = False
exit_on_error = False
notify = False
repeat = False
nextrun = None
waitchildbreak = datetime.timedelta(seconds=0.5)
runhealthcheck = False
maxhealthcheckage = None
def basedir(self) -> pathlib.Path:
"""
Return the base directory where all log files will be kept.
The path must be absolute.
"""
path = pathlib.Path(
"~",
"ll.sisyphus",
self.projectname,
self.jobname if self.identifier is None else f"{self.jobname}.{self.identifier}",
)
return path.expanduser().absolute()
def logfilename(self) -> Optional[pathlib.Path]:
"""
Return the filename of the logfile for this job.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the logfile).
"""
return self.basedir() / f"{self.starttime:%Y-%m-%d %H-%M-%S_%f}.sisyphuslog"
def currentloglinkname(self) -> Optional[pathlib.Path]:
"""
Return the filename of the link to the currently active logfile.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the link).
"""
return self.basedir() / "current.sisyphuslog"
def lastsuccessfulloglinkname(self) -> Optional[pathlib.Path]:
"""
Return the filename of the link that points to the logfile of the last
successful run of the job.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the link).
"""
return self.basedir() / "last_successful.sisyphuslog"
def lastfailedloglinkname(self) -> Optional[pathlib.Path]:
"""
Return the filename of the link that points to the logfile of the last
failed run of the job.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the link).
"""
return self.basedir() / "last_failed.sisyphuslog"
def lastinterruptedloglinkname(self) -> Optional[pathlib.Path]:
"""
Return the filename of the link that points to the logfile of the last
interrupted run of the job.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the link).
"""
return self.basedir() / "last_interrupted.sisyphuslog"
def lasttimeoutloglinkname(self) -> Optional[pathlib.Path]:
"""
Return the filename of the link that points to the logfile of the last
run of the job with a timeout.
The value must be an absolute :class:`pathlib.Path` object (or ``None``
to disable creating the link).
"""
return self.basedir() / "last_timeout.sisyphuslog"
def healthfilename(self) -> pathlib.Path:
"""
Return the filename where the health of the last job run is stored.
The value must be an absolute :class:`pathlib.Path` object and may not be
``None``.
"""
return self.basedir() / "current.sisyphushealth"
def emailfilename(self, process: Optional[Process]=None) -> pathlib.Path:
"""
Return the filename where the parent and child processes can log messages
that should be part of the email report.
The value must be an absolute :class:`pathlib.Path` object and may not be
``None``.
"""
if process is None:
process = self.process
return self.basedir() / f"email.{process.name.lower()}.ul4on"
# URL of final log file (:const:`None` if no logging is done to a file)
logfileurl = None
log2file = True
log2stdout = False
log2stderr = False
formatlogline = r"""
[<?print time?>]
=
[t+<?print time-job.starttime?>]
<?for task in tasks?>
<?print " :: "?>
<?code desc = [task.type, task.name]?>
<?code desc = " ".join(str(d) for d in desc if d)?>
<?if task.index is not None?>
[
<?print task.index+1?>
<?if task.count is not None?>
/<?print task.count?>
<?end if?>
]
<?if desc?> <?end if?>
<?elif not desc?>
?
<?end if?>
<?print desc?>
<?end for?>
<?if tags?>
<?print " :: "?>
<?for tag in tags?>
{<?print tag?>}
<?end for?>
<?end if?>
<?print " >> "?>
<?print line?>
"""
formatemailsubject = r"""
<?print job.projectname?>/<?print job.jobname?> for <?print sysinfo.user_name?>@<?print sysinfo.host_fqdn?> (<?print sysinfo.host_ip?>)<?if log?> failed with <?print len(log)?> exceptions/messages<?end if?>
"""
formatemailbodytext = r"""
<?def line(label, value)?>
<?if value?>
<?code value = str(value).split("\n")?>
<?for line in value?>
<?print format(label, "11")?>: <?print line?><?print "\n"?>
<?code label = ""?>
<?end for?>
<?end if?>
<?end def?>
<?def tasklabel(task)?>
<?code desc = " ".join(str(part) for part in [task.type, task.name] if part)?>
<?if task.index is not None?>
[
<?print task.index+1?>
<?if task.count is not None?>
/<?print task.count?>
<?end if?>
]
<?if desc?>
<?print " "?>
<?end if?>
<?elif not desc?>
?
<?end if?>
<?print desc?>
<?if task.starttime?>
<?print " "?>
@
<?print " "?>
<?print task.starttime?>
<?end if?>
<?end def?>
<?render line("Project", job.projectname)?>
<?render line("Job", job.jobname)?>
<?render line("Identifier", job.identifier)?>
<?render line("Script", sysinfo.script_name)?>
<?render line("User", sysinfo.user_name)?>
<?render line("Python", sysinfo.python_executable)?>
<?render line("Version", sysinfo.python_version)?>
<?render line("Host", sysinfo.host_fqdn)?>
<?render line("IP", sysinfo.host_ip)?>
<?render line("PID", sysinfo.pid)?>
<?render line("Start", job.starttime)?>
<?render line("End", job.endtime)?>
<?if job.starttime and job.endtime?>
<?render line("Duration", job.endtime-job.starttime)?>
<?end if?>
<?render line("Exceptions", countexceptions)?>
<?render line("Messages", countmessages)?>
<?render line("Logfile", job.logfileurl)?>
<?code reportedexceptions = 0?>
<?code reportedmessages = 0?>
<?for (i, entry) in enumerate(log, 1)?>
<?print "\n"?>
<?print "-"*80?><?print "\n"?>
<?print "\n"?>
<?if entry.type == "exception"?>
<?code reportedexceptions += 1?>
#<?print i?>: Exception<?print "\n"?>
<?print "\n"?>
<?for task in entry.tasks?>
<?render line("Task", tasklabel.renders(task))?>
<?end for?>
<?render line("Class", entry.class)?>
<?render line("Value", entry.value)?>
<?if entry.traceback?>
<?print "\n"?>
<?print entry.traceback?>
<?end if?>
<?elif entry.type == "message"?>
<?code reportedmessages += 1?>
#<?print i?>: Message<?print "\n"?>
<?print "\n"?>
<?for task in entry.tasks?>
<?render line("Task", tasklabel.renders(task))?>
<?end for?>
<?render line("Message", entry.message)?>
<?end if?>
<?end for?>
<?if countexceptions + countmessages > reportedexceptions + reportedmessages?>
<?print "\n"?>
<?print "-"*80?><?print "\n"?>
<?if countexceptions > reportedexceptions?><?print countexceptions - reportedexceptions?> more exceptions<?end if?>
<?if countexceptions > reportedexceptions and countmessages > reportedmessages?> and<?end if?>
<?if countmessages > reportedmessages?><?print countmessages - reportedmessages?> more messages<?end if?>
...<?print "\n"?>
<?end if?>
"""
formatemailbodyhtml = r"""
<?note Subtemplates?>
<?def line(label, value, whitespace=None)?>
<?if value?>
<tr style="vertical-align: baseline;"><th style="text-align:right;"><?printx label?></th><td style="padding-left: 1em;<?if whitespace?>white-space: <?printx whitespace?>;<?end if?>"><?printx value?></td></tr>
<?end if?>
<?end def?>
<?def tasklabel(task)?>
<?code desc = [task.type, task.name]?>
<?code desc = " ".join(str(d) for d in desc if d)?>
<?if task.index is not None?>
[
<?printx task.index+1?>
<?if task.count is not None?>
/<?printx task.count?>
<?end if?>
]
<?if desc?>
<?printx " "?>
<?end if?>
<?elif not desc?>
?
<?end if?>
<?printx desc?>
<?if task.starttime?>
<?printx " "?>
@
<?printx " "?>
<?printx task.starttime?>
<?end if?>
<?end def?>
<?xml version='1.0' encoding='utf-8'?>
<html>
<head>
<title><?printx job.projectname?>/<?printx job.jobname?> for <?printx sysinfo.user_name?>@<?printx sysinfo.host_fqdn?> (<?printx sysinfo.host_ip?>) failed</title>
</head>
<body style="font-family: monospace;">
<h1><?printx job.projectname?>/<?printx job.jobname?> for <?printx sysinfo.user_name?>@<?printx sysinfo.host_fqdn?> (<?printx sysinfo.host_ip?>) failed</h1>
<table>
<?render line("Project", job.projectname)?>
<?render line("Job", job.jobname)?>
<?render line("Identifier", job.identifier)?>
<?render line("Script", sysinfo.script_name)?>
<?render line("User", sysinfo.user_name)?>
<?render line("Python", sysinfo.python_executable)?>
<?render line("Version", sysinfo.python_version)?>
<?render line("Host", sysinfo.host_fqdn)?>
<?render line("IP", sysinfo.host_ip)?>
<?render line("PID", sysinfo.pid)?>
<?render line("Start", job.starttime)?>
<?render line("End", job.endtime)?>
<?if job.starttime and job.endtime?>
<?render line("Duration", job.endtime-job.starttime)?>
<?end if?>
<?render line("Exceptions", countexceptions)?>
<?render line("Messages", countmessages)?>
<?render line("Logfile", job.logfileurl)?>
</table>
<?code reportedexceptions = 0?>
<?code reportedmessages = 0?>
<?for (i, entry) in enumerate(log, 1)?>
<hr/>
<?if entry.type == "exception"?>
<?code reportedexceptions += 1?>
<h2>#<?printx i?>: Exception</h2>
<table>
<?for task in entry.tasks?>
<?render line("Task", tasklabel.renders(task), whitespace="pre")?>
<?end for?>
<?render line("Timestamp", entry.timestamp)?>
<?render line("Class", entry.class)?>
<?render line("Value", entry.value)?>
</table>
<?if entry.traceback?>
<h3>Traceback</h3>
<pre style="font-weight:normal;">
<?printx entry.traceback?>
</pre>
<?end if?>
<?else?>
<?code reportedmessages += 1?>
<h2>#<?printx i?>: Message</h2>
<table>
<?for task in entry.tasks?>
<?render line("Task", tasklabel.renders(task), whitespace="pre")?>
<?end for?>
<?render line("Timestamp", entry.timestamp)?>
<?render line("Message", entry.message, whitespace="pre")?>
</table>
<?end if?>
<?end for?>
<?if countexceptions + countmessages > reportedexceptions + reportedmessages?>
<hr/>
<p>
<?if countexceptions > reportedexceptions?><?print countexceptions - reportedexceptions?> more exceptions<?end if?>
<?if countexceptions > reportedexceptions and countmessages > reportedmessages?> and<?end if?>
<?if countmessages > reportedmessages?><?print countmessages - reportedmessages?> more messages<?end if?>
...
</p>
<?end if?>
</body>
</html>
"""
formatmattermosttitle = r"""
<?if "error" in tags?>
<?if type == "exc"?>
<?code header = "Exception"?>
<?elif type == "obj"?>
<?code header = "Error object"?>
<?else?>
<?code header = "Error message"?>
<?end if?>
<?else?>
<?if type == "exc"?>
<?code header = "Exception"?>
<?elif type == "obj"?>
<?code header = "Object"?>
<?else?>
<?code header = "Message"?>
<?end if?>
<?end if?>
<?print header?> in sisyphus job `<?print job.projectname?>`/`<?print job.jobname?>` from `<?print sysinfo.user_name?>@<?print sysinfo.host_fqdn?>` (<?print sysinfo.host_ip?>)
"""
formatmattermostmessage = r"""
<?if type == "exc"?>
```
<?print "\n"?>
<?print message?>
<?print "\n"?>
```
<?elif type == "obj"?>
```py
<?print "\n"?>
<?print message?>
<?print "\n"?>
```
<?else?>
<?print message?>
<?end if?>
<?print "\n"?>
<?if tags?>
**Tags**: <?for (f, t) in isfirst(tags)?><?if not f?>, <?end if?>`<?print t?>`<?end for?>
<?print "\n"?>
<?end if?>
<?if len(tasks) > 1?>
**Task**:<?print " "?>
<?print " "?>
<?for (f, task) in isfirst(tasks[1:])?>
<?if not f?>
<?print " ⟶ "?>
<?end if?>
<?code output = False?>
<?if task.type is not None?>
<?if output?> <?end if?>
`<?print task.type?>`
<?code output = True?>
<?end if?>
<?if task.name is not None?>
<?if output?> <?end if?>
`<?print task.name?>`
<?code output = True?>
<?end if?>
<?if task.index is not None?>
<?if output?> <?end if?>
[
<?print task.index+1?>
<?if task.count is not None?>
/<?print task.count?>
<?end if?>
]
<?code output = True?>
<?end if?>
<?if not output?>
?
<?end if?>
<?end for?>
<?print "\n"?>
<?end if?>
**Timestamp**: <?print time?> — t+<?print time-job.starttime?>
<?print "\n"?>
"""
keepfilelogs = datetime.timedelta(days=30)
compressfilelogs = datetime.timedelta(days=7)
compressmode = "bzip2"
maxemailerrors = 10
proctitle = True
encoding = "utf-8"
errors = "strict"
ul4_attrs = {"sysinfo", "projectname", "jobname", "identifier", "maxtime", "starttime", "endtime", "maxemailerrors", "logfileurl"}
process = Process.SOLO
def execute(self) -> OptStr:
"""
Execute the job once.
Overwrite in subclasses to implement your job functionality.
The return value is a one line summary of what the job did.
When this method returns :const:`None` instead this tells the job
machinery that the run of the job was uneventful and that the logfile
can be deleted.
"""
return "done"
def healthcheck(self) -> OptStr:
"""
Called in parallel to a running job to check whether the job is healthy.
Returns ``None`` if everything is ok, or an error message otherwise.
"""
healthfilename = self.healthfilename()
try:
lastwrite = get_mtime(healthfilename)
cutoff = self._calc_maxhealthcheckage()
if lastwrite < cutoff:
return f"Not running since {cutoff} (last run at {lastwrite}; {datetime.datetime.now()-lastwrite} ago)"
error = healthfilename.read_text(encoding=self.encoding, errors=self.errors)
return error.strip() or None
except FileNotFoundError:
return f"Healthfile {healthfilename} missing"
except ValueError:
return f"Healthfile {healthfilename} malformed"
return None
def argparser(self) -> argparse.ArgumentParser:
"""
Return an :mod:`argparse` parser for parsing the command line arguments.
This can be overwritten in subclasses to add more arguments.
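Example (a sketch; the ``--url`` option and the ``url`` attribute are
hypothetical)::

	def argparser(self):
		p = super().argparser()
		# Add a custom option on top of the inherited ones
		p.add_argument("--url", dest="url", help="URL to fetch (default: %(default)s)", default=self.url)
		return p

	def parseargs(self, args=None):
		ns = super().parseargs(args)
		# Pick up the value of the custom option
		self.url = ns.url
		return ns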
"""
p = argparse.ArgumentParser(description=self.argdescription, epilog="For more info see http://python.livinglogic.de/sisyphus.html")
p.add_argument("-p", "--projectname", dest="projectname", metavar="NAME", help="The name of the project this job belongs to (default: %(default)s)", default=self.projectname)
p.add_argument("-j", "--jobname", dest="jobname", metavar="NAME", help="The name of the job (default: %(default)s)", default=self.jobname if self.jobname is not None else self.__class__.__qualname__)
p.add_argument( "--fromemail", dest="fromemail", metavar="ADDRESS", help="The sender email address for the failure report email (default: %(default)s)", default=self.fromemail)
p.add_argument( "--toemail", dest="toemail", metavar="ADDRESS", help="An email address where failure reports will be sent (default: %(default)s)", default=self.toemail)
p.add_argument( "--smtphost", dest="smtphost", metavar="HOSTNAME", help="The SMTP server to use for sending the failure report email (default: %(default)s)", default=self.smtphost)
p.add_argument( "--smtpport", dest="smtpport", metavar="PORT", help="The port number used for the connection to the SMTP server (default: %(default)s)", type=int, default=self.smtpport)
p.add_argument( "--smtpuser", dest="smtpuser", metavar="USER", help="The user name used to log into the SMTP server. (default: %(default)s)", default=self.smtpuser)
p.add_argument( "--smtppassword", dest="smtppassword", metavar="PASSWORD", help="The password used to log into the SMTP server. (default: %(default)s)", default=self.smtppassword)
p.add_argument( "--mattermost_url", dest="mattermost_url", metavar="URL", help="URL for logging to mattermost chat channel. (default: %(default)s)", default=self.mattermost_url)
p.add_argument( "--mattermost_channel", dest="mattermost_channel", metavar="ID", help="Channel id for logging to mattermost chat. (default: %(default)s)", default=self.mattermost_channel)
p.add_argument( "--mattermost_token", dest="mattermost_token", metavar="AUTH", help="Channel id for logging to mattermost chat. (default: %(default)s)", default=self.mattermost_token)
p.add_argument( "--sentry_dsn", dest="sentry_dsn", metavar="DSN", help="Sentry DSN for logging to a Sentry server. (default: %(default)s)", default=self.sentry_dsn)
p.add_argument( "--sentry_environment", dest="sentry_environment", metavar="ENVIRONMENT", help="Environment reported to Sentry. (default: %(default)s)", default=self.sentry_environment)
p.add_argument( "--sentry_release", dest="sentry_release", metavar="RELEASE", help="Release reported to Sentry. (default: %(default)s)", default=self.sentry_release)
p.add_argument( "--sentry_debug", dest="sentry_debug", help="Activate Sentry debug mode. (default: %(default)s)", action=misc.FlagAction, default=self.sentry_debug)
p.add_argument( "--identifier", dest="identifier", metavar="IDENTIFIER", help="Additional identifier that will be added to the failure report mail (default: %(default)s)", default=self.identifier)
p.add_argument("-m", "--maxtime", dest="maxtime", metavar="SECONDS", help="Maximum number of seconds the job is allowed to run (default: %(default)s)", type=argseconds, default=self.maxtime)
p.add_argument( "--fork", dest="fork", help="Fork the process and do the work in the child process? (default: %(default)s)", action=misc.FlagAction, default=self.fork)
p.add_argument("-f", "--log2file", dest="log2file", help="Should the job log into a file? (default: %(default)s)", action=misc.FlagAction, default=self.log2file)
p.add_argument("-o", "--log2stdout", dest="log2stdout", help="Should the job log to stdout? (default: %(default)s)", action=misc.FlagAction, default=self.log2stdout)
p.add_argument("-e", "--log2stderr", dest="log2stderr", help="Should the job log to stderr? (default: %(default)s)", action=misc.FlagAction, default=self.log2stderr)
p.add_argument( "--keepfilelogs", dest="keepfilelogs", metavar="DAYS", help="Number of days log files are kept (default: %(default)s)", type=argdays, default=self.keepfilelogs)
p.add_argument( "--compressfilelogs", dest="compressfilelogs", metavar="DAYS", help="Number of days log after which log files are gzipped (default: %(default)s)", type=argdays, default=self.compressfilelogs)
p.add_argument( "--compressmode", dest="compressmode", metavar="MODE", help="Method for compressing old log files (default: %(default)s)", choices=("gzip", "bzip2", "lzma"), default=self.compressmode)
p.add_argument( "--maxemailerrors", dest="maxemailerrors", metavar="INTEGER", help="Maximum number of errors or messages to report in the failure report (default: %(default)s)", default=self.maxemailerrors)
p.add_argument( "--proctitle", dest="proctitle", help="Set the process title (default: %(default)s)", action=misc.FlagAction, default=self.proctitle)
p.add_argument( "--encoding", dest="encoding", metavar="ENCODING", help="Encoding for the log file (default: %(default)s)", default=self.encoding)
p.add_argument( "--errors", dest="errors", metavar="METHOD", help="Error handling method for encoding errors in log texts (default: %(default)s)", default=self.errors)
p.add_argument( "--noisykills", dest="noisykills", help="Should a message be printed/failure email be sent if the maximum runtime is exceeded? (default: %(default)s)", action=misc.FlagAction, default=self.noisykills)
p.add_argument( "--exit_on_error", dest="exit_on_error", help="Stop the job when an error happens in repeat mode? (default: %(default)s)", action=misc.FlagAction, default=self.exit_on_error)
p.add_argument("-n", "--notify", dest="notify", help="Should a notification be issued to the OS X notification center? (default: %(default)s)", action=misc.FlagAction, default=self.notify)
p.add_argument("-r", "--repeat", dest="repeat", help="Repeat the job run indefinitely? (default: %(default)s)", action=misc.FlagAction, default=self.repeat)
p.add_argument( "--nextrun", dest="nextrun", metavar="SECONDS", help="How many seconds to wait after the run before repeating it? (default: %(default)s)", type=argseconds, default=self.nextrun)
p.add_argument( "--waitchildbreak", dest="waitchildbreak", metavar="SECONDS", help="How many seconds to wait to give the child process time to clean up? (default: %(default)s)", type=float, default=self.waitchildbreak)
p.add_argument( "--maxhealthcheckage", dest="maxhealthcheckage", metavar="SECONDS", help="How old may a healthcheckfile be before the health check complains about it? (default: %(default)s)", type=float, default=self.maxhealthcheckage)
p.add_argument( "--healthcheck", dest="runhealthcheck", help="Run a heathcheck instead of the normal job? (default: %(default)s)", action=misc.FlagAction, default=self.runhealthcheck)
return p
def parseargs(self, args: Optional[List[str]]=None) -> argparse.Namespace:
"""
Use the parser returned by :meth:`argparser` to parse the argument
sequence ``args``, modify ``self`` accordingly and return
the result of the parsers :meth:`parse_args` call.
"""
p = self.argparser()
ns = p.parse_args(args)
self.projectname = ns.projectname
self.jobname = ns.jobname
self.fromemail = ns.fromemail
self.toemail = ns.toemail
self.smtphost = ns.smtphost
self.smtpport = ns.smtpport
self.smtpuser = ns.smtpuser
self.smtppassword = ns.smtppassword
self.mattermost_url = ns.mattermost_url
self.mattermost_channel = ns.mattermost_channel
self.mattermost_token = ns.mattermost_token
self.sentry_dsn = ns.sentry_dsn
self.sentry_environment = ns.sentry_environment
self.sentry_release = ns.sentry_release
self.sentry_debug = ns.sentry_debug
self.identifier = ns.identifier
self.maxtime = ns.maxtime
self.fork = ns.fork
self.noisykills = ns.noisykills
self.exit_on_error = ns.exit_on_error
self.log2file = ns.log2file
self.log2stdout = ns.log2stdout
self.log2stderr = ns.log2stderr
self.keepfilelogs = ns.keepfilelogs
self.compressfilelogs = ns.compressfilelogs
self.compressmode = ns.compressmode
self.maxemailerrors = ns.maxemailerrors
self.proctitle = ns.proctitle
self.encoding = ns.encoding
self.errors = ns.errors
self.notify = ns.notify
self.repeat = ns.repeat
self.nextrun = ns.nextrun
self.waitchildbreak = ns.waitchildbreak
self.runhealthcheck = ns.runhealthcheck
return ns
def _handleexecution(self) -> None:
"""
Handle executing the job including handling of duplicate or hanging jobs.
"""
if self.jobname is None:
self.jobname = self.__class__.__qualname__
self._originalproctitle = setproctitle.getproctitle() if self.proctitle and setproctitle else None
self._run = 0
self.maxtime = argseconds(self.maxtime)
self.keepfilelogs = argdays(self.keepfilelogs)
self.compressfilelogs = argdays(self.compressfilelogs)
self.waitchildbreak = argseconds(self.waitchildbreak)
self.sentry_sdk = None
self._healthfilename = self.healthfilename()
if self.runhealthcheck:
result = self.healthcheck()
raise SystemExit(result)
self._formatlogline = ul4c.Template(self.formatlogline, "formatlogline", whitespace="strip") # Log line formatting template
self._formatemailsubject = ul4c.Template(self.formatemailsubject, "formatemailsubject", whitespace="strip") # Email subject formatting template
self._formatemailbodytext = ul4c.Template(self.formatemailbodytext, "formatemailbodytext", whitespace="strip") # Email body formatting template (plain text)
self._formatemailbodyhtml = ul4c.Template(self.formatemailbodyhtml, "formatemailbodyhtml", whitespace="strip") # Email body formatting template (HTML)
self._formatmattermosttitle = ul4c.Template(self.formatmattermosttitle, "formatmattermosttitle", whitespace="strip") # Mattermost chat title formatting template
self._formatmattermostmessage = ul4c.Template(self.formatmattermostmessage, "formatmattermostmessage", whitespace="strip") # Mattermost chat message formatting template
# Obtain a lock on the script file to make sure we're the only one running
with open(misc.sysinfo.script_name, "rb") as f:
if fcntl is not None:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
# The previous invocation of the job is still running
return # Return without calling :meth:`execute`
if self.repeat:
while True:
status = self._handleoneexecution()
self._run += 1
nextrun = self._calc_nextrun()
wait = nextrun - datetime.datetime.now()
wait_seconds = wait.total_seconds()
self._closelogs(status)
if wait_seconds > 0:
self.setproctitle("Sleeping")
self.log.sisyphus.delay.info(f"Sleeping for {wait} until {nextrun}")
time.sleep(wait_seconds)
else:
self.log.sisyphus.delay.info(f"Restarting immediately")
else:
status = self._handleoneexecution()
self._closelogs(status)
if fcntl is not None:
fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB)
def _handleoneexecution(self) -> Status:
self._tasks = [] # type: List[Task]
self._loggers = [] # type: List[Logger]
self._exceptioncount = 0
self.process = Process.SOLO
# we were able to obtain the lock, so we are the only one running
self.starttime = datetime.datetime.now()
self.starttime_utc = datetime.datetime.utcnow()
self.endtime = None # type: Optional[datetime.datetime]
self._getscriptsource() # Get source code
self._getcrontab() # Get crontab
self.log = Tag(self._log) # Create tagged logger for files
self._delayed_logs = [] # type: Optional[LogList]
self._createlogs() # Create loggers
if self.fork and hasattr(os, "fork"):
self._tasks = [self.task("parent", str(os.getpid()))]
self.log.sisyphus.delay.init(f"{misc.sysinfo.script_name} (max time {self.maxtime})")
logmessage = self._logmessage()
self.log.sisyphus.delay.init(logmessage)
# Check for support of various things we'd like to use
if fcntl is None:
self.log.sisyphus.init.delay.warning("Can't lock script file (module fcntl not available)")
if self.fork and not hasattr(os, "fork"):
self.log.sisyphus.init.delay.warning("Can't fork (function os.fork not available)")
self.fork = False
if not hasattr(signal, "SIGALRM"):
self.log.sisyphus.init.delay.warning("Can't use signals (signal.SIGALRM not available)")
self.fork = False
if self.proctitle and setproctitle is None:
self.log.sisyphus.init.delay.warning("Can't set process title (module setproctitle not available)")
if self.fork: # Forking mode?
# Fork the process; the child will do the work; the parent will monitor the maximum runtime
self.killpid = pid = os.fork()
if pid: # We are the parent process
self.process = Process.PARENT
self.setproctitle(f"{logmessage} (max time {self.maxtime})")
# set a signal to delay CTRL-C handling until the child has cleaned up
signal.signal(signal.SIGINT, self._signal_interrupt)
# set a signal to wake us up to kill the child process after the maximum runtime
if self.maxtime is not None:
signal.signal(signal.SIGALRM, self._signal_timeout)
signal.alarm(int(self.maxtime.total_seconds()))
try:
(pid, status) = os.waitpid(pid, 0) # Wait for the child process to terminate
if self.maxtime is not None:
signal.alarm(0) # Cancel maximum runtime alarm
except misc.Timeout as exc:
status = self._finished_timeout(exc)
if self.exit_on_error:
raise
else:
return status # finish normally (or continue, if we're in repeat mode)
except KeyboardInterrupt as exc:
self._finished_break(exc)
raise
else:
status = Status(status >> 8)
if status is Status.UNEVENTFUL:
self._finished_uneventful()
elif status is Status.INTERRUPTED:
exc = KeyboardInterrupt()
self._finished_break(exc)
raise exc
elif status is Status.TIMEOUT:
exc = misc.Timeout(self.maxtime)
self._finished_timeout(exc)
if self.exit_on_error:
raise exc
elif status is Status.FAILED:
exc = RuntimeError("failed")
self._finished_exception(exc)
if self.exit_on_error:
raise exc
elif status is Status.SUCCESSFUL:
self._finished_successful(None)
return status # finish normally (or continue, if we're in repeat mode)
# Here we are in the child process
self.process = Process.CHILD
self.setproctitle()
task = self.task("child", misc.sysinfo.pid, self._run if self.repeat else None)
self._tasks = [task] # This replaces the task stack inherited from the parent
self.log.sisyphus.init.delay(f"forked worker child")
self._init_sentry()
else: # We didn't fork
# set a signal to kill ourselves after the maximum runtime
self._init_sentry()
if self.maxtime is not None and hasattr(signal, "SIGALRM"):
signal.signal(signal.SIGALRM, self._signal_timeout)
signal.alarm(int(self.maxtime.total_seconds()))
self.setproctitle("Setting up")
self.notifystart()
result = None
try:
with url.Context():
self.setproctitle("Working")
result = self.execute()
signal.alarm(0) # Cancel alarm
except misc.Timeout as exc:
status = self._finished_timeout(exc)
if not self.fork:
raise
result = str(exc)
except KeyboardInterrupt as exc:
status = self._finished_break(exc)
if not self.fork:
raise
result = "interrupted"
except Exception as exc:
status = self._finished_exception(exc)
result = f"failed with {misc.format_exception(exc)}"
if not self.fork and self.exit_on_error:
raise
else:
if result is None:
status = self._finished_uneventful()
else:
status = self._finished_successful(result)
self.notifyfinish(result)
if self.fork:
os._exit(status)
return status
def _kill_children(self) -> Set[int]:
if psutil is None:
try:
os.kill(self.killpid, signal.SIGTERM) # Kill our child
except ProcessLookupError:
pass # already gone
return {self.killpid}
else:
pids = set()
procs = psutil.Process().children(recursive=True)
# Send SIGTERM
for p in procs:
pids.add(p.pid)
p.terminate()
seconds = self.waitchildbreak.total_seconds()
(gone, alive) = psutil.wait_procs(procs, timeout=seconds)
# Send SIGKILL
if alive:
for p in alive:
pids.add(p.pid)
p.kill()
(gone, alive) = psutil.wait_procs(alive, timeout=seconds)
# Ignore whether any processes remain in the ``alive`` list
return pids
def _termination_message(self, exc: BaseException, pids: Set[int]) -> str:
if not pids:
return f"Terminated: {exc}"
elif len(pids) == 1:
return f"Terminated child {misc.first(pids)}: {exc}"
else:
pidstr = ", ".join(str(pid) for pid in pids)
return f"Ternminated children {pidstr}: {exc}"
def _init_sentry(self) -> None:
if self.sentry_dsn is not None:
self.log.sisyphus.delay.init(f"Setting up sentry")
try:
import sentry_sdk
except ImportError:
self.log.sisyphus.delay.warning("Can't log to Sentry (module sentry_sdk not available)")
return
self.sentry_sdk = sentry_sdk
self.sentry_sdk.init(
self.sentry_dsn,
traces_sample_rate=1.0,
release=self.sentry_release,
environment=self.sentry_environment,
debug=self.sentry_debug,
)
if self.identifier:
app_name = f"{self.projectname} {self.jobname} ({self.identifier})"
else:
app_name = f"{self.projectname} {self.jobname}"
self.sentry_sdk.set_context(
"app",
{
"app_identifier": f"{self.projectname}.{self.jobname}",
"app_name": app_name,
"app_start_time": self.starttime_utc,
}
)
self.sentry_sdk.set_context(
"os",
{
"name": misc.sysinfo.host_sysname,
"version": misc.sysinfo.host_release,
"kernel_version": misc.sysinfo.host_version,
}
)
self.sentry_sdk.set_context(
"Sisyphus",
{
"Script": misc.sysinfo.script_name,
"PID": misc.sysinfo.pid,
"Python": misc.sysinfo.python_executable,
}
)
self.sentry_sdk.set_context(
"User",
{
"Name": misc.sysinfo.user_name,
"UID": misc.sysinfo.user_uid,
"GID": misc.sysinfo.user_gid,
"Home": misc.sysinfo.user_dir,
}
)
def _finished_uneventful(self) -> Status:
self.endtime = datetime.datetime.now()
self.setproctitle("Finishing")
if self.process is not Process.PARENT:
self._write_healthfile(None)
# log the result
if self._exceptioncount:
self.log.sisyphus.result.errors(None)
else:
self.log.sisyphus.result.delay.ok(None)
return Status.UNEVENTFUL
def _finished_successful(self, result: OptStr) -> Status:
self.endtime = datetime.datetime.now()
self.setproctitle("Finishing")
# log the result
if self.process is not Process.PARENT:
self._write_healthfile(None)
if self._exceptioncount:
self.log.sisyphus.result.errors(result)
else:
# Throw away delayed logs.
if self._delayed_logs is not None:
self._delayed_logs = []
self.log.sisyphus.result.ok(result)
return Status.SUCCESSFUL
def _finished_exception(self, exc: BaseException) -> Status:
self.endtime = datetime.datetime.now()
self.setproctitle("Handling exception")
if self.process is not Process.PARENT:
strexc = misc.format_exception(exc)
self._write_healthfile(f"Failed with {strexc}")
# log the error to the logfile, as we assume that :meth:`execute` didn't do it
self.log.sisyphus.external(exc)
self.log.sisyphus.result.fail(f"failed with {strexc}")
return Status.FAILED
def _finished_break(self, exc: KeyboardInterrupt) -> Status:
self.endtime = datetime.datetime.now()
self.setproctitle("Handling break")
self._write_healthfile("Interrupted")
if self.process is not Process.CHILD:
# Don't log to email or mattermost
self.log.sisyphus(exc)
self.log.sisyphus.result.fail(f"failed with {misc.format_exception(exc)}")
return Status.INTERRUPTED
def _finished_timeout(self, exc: misc.Timeout) -> Status:
self.endtime = datetime.datetime.now()
self.setproctitle("Timeout")
if self.process is not Process.CHILD:
self._write_healthfile(f"Timeout after {self.maxtime}")
if self.process is Process.PARENT:
pids = self._kill_children()
elif self.process is Process.SOLO:
pids = set()
if self.process is not Process.CHILD:
if self.noisykills:
self.log.email.mattermost(exc)
else:
self.log(exc)
self.log.sisyphus.result.kill(self._termination_message(exc, pids))
return Status.TIMEOUT
def _signal_timeout(self, signum: int, frame: Optional[types.FrameType]) -> NoReturn:
raise misc.Timeout(self.maxtime)
def _signal_interrupt(self, signum: int, frame: Optional[types.FrameType]) -> NoReturn:
signal.alarm(0) # Cancel maximum runtime alarm
# Give the child process time to log the stacktrace
time.sleep(self.waitchildbreak.total_seconds())
raise KeyboardInterrupt
def _logmessage(self) -> str:
logmessage = []
for logger in self._loggers:
name = logger.name()
if name is not None:
logmessage.append(name)
logstr = ", ".join(logmessage)
if logstr:
return f"logging to {logstr}"
else:
return "no logging"
def notifystart(self) -> None:
if self.notify:
misc.notifystart()
def notifyfinish(self, result: OptStr) -> None:
if self.notify:
misc.notifyfinish(
f"{self.projectname} {self.jobname}",
f"finished after {self.endtime-self.starttime}",
result or "uneventful",
)
def task(self, type:OptStr=None, name:OptStr=None, index:OptInt=None, count:OptInt=None, **data) -> "Task":
"""
:meth:`!task` is a context manager and can be used to specify subtasks.
Arguments have the following meaning:
``type`` : :class:`str` or :const:`None`
The type of the task.
``name`` : :class:`str` or :const:`None`
The name of the task.
``index`` : :class:`int` or :const:`None`
If this task is one in a sequence of similar tasks, ``index`` should
be the index of this task, i.e. the first task of this type has
``index==0``, the second one ``index==1`` etc.
``count`` : :class:`int` or :const:`None`
If this task is one in a sequence of similar tasks and the total number
of tasks is known, ``count`` should be the total number of tasks.
``**data``
Additional information about the task. This will be added to the
Sentry breadcrumbs when logging to Sentry. Otherwise this is ignored.
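
		Example (a sketch; ``filenames`` and the processing step are
		hypothetical)::

			for (i, filename) in enumerate(filenames):
				with self.task("file", filename, i, len(filenames)):
					self.log(f"processing {filename}")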
"""
return Task(self, type=type, name=name, index=index, count=count, **data)
def tasks(self, iterable: Iterable[T], type: OptStrFromCall=None, name: OptStrFromCall=None, data: OptDictFromCall=None) -> Generator[T, None, None]:
"""
:meth:`!tasks` iterates through ``iterable`` and calls :meth:`task` for
each item. ``index`` and ``count`` will be passed to :meth:`task`
automatically. ``type``, ``name`` and ``data`` will be used for the type,
name and additional data of the task. They can either be constants
(in which case they will be passed as is) or callables (in which case
they will be called with the item to get the type/name/data).
Example::
import sys, operator
items = list(sys.modules.items())
for (name, module) in self.tasks(items, "module", operator.itemgetter(0)):
self.log(f"module is {module}")
The log output will look something like the following:
.. sourcecode:: output
[2019-05-06 18:52:31.366810]=[t+0:00:00.263849] :: parent 19448 :: {sisyphus}{init} >> /Users/walter/x/gurk.py (max time 0:01:40)
[2019-05-06 18:52:31.367831]=[t+0:00:00.264870] :: parent 19448 :: {sisyphus}{init} >> logging to <stdout>, /Users/walter/ll.sisyphus/Test/Job/2019-05-06-18-52-31-102961.sisyphuslog
[2019-05-06 18:52:31.371690]=[t+0:00:00.268729] :: [1] child 19451 :: {sisyphus}{init} >> forked worker child
[2019-05-06 18:52:31.376598]=[t+0:00:00.273637] :: [1] child 19451 :: [1/226] module sys >> module is <module 'sys' (built-in)>
[2019-05-06 18:52:31.378561]=[t+0:00:00.275600] :: [1] child 19451 :: [2/226] module builtins >> module is <module 'builtins' (built-in)>
[2019-05-06 18:52:31.380381]=[t+0:00:00.277420] :: [1] child 19451 :: [3/226] module _frozen_importlib >> module is <module 'importlib._bootstrap' (frozen)>
[2019-05-06 18:52:31.382248]=[t+0:00:00.279287] :: [1] child 19451 :: [4/226] module _imp >> module is <module '_imp' (built-in)>
[2019-05-06 18:52:31.384064]=[t+0:00:00.281103] :: [1] child 19451 :: [5/226] module _thread >> module is <module '_thread' (built-in)>
[2019-05-06 18:52:31.386047]=[t+0:00:00.283086] :: [1] child 19451 :: [6/226] module _warnings >> module is <module '_warnings' (built-in)>
[2019-05-06 18:52:31.388009]=[t+0:00:00.285048] :: [1] child 19451 :: [7/226] module _weakref >> module is <module '_weakref' (built-in)>
[...]
[2019-05-06 18:52:31.847315]=[t+0:00:00.744354] :: [1] child 19451 :: {sisyphus}{result}{ok} >> done
"""
try:
count = len(iterable)
except TypeError:
count = None
for (i, item) in enumerate(iterable):
realtype = type(item) if callable(type) else type
realname = name(item) if callable(name) else name
realdata = data(item) if callable(data) else data
with self.task(realtype, realname, i, count, **(realdata or {})):
yield item
def makeproctitle(self, detail: OptStr=None) -> str:
v = []
if self.process is not Process.SOLO:
v.append(self.process.name.lower())
for task in self._tasks:
v.append(str(task))
title = " :: ".join(v)
if not detail:
return title
if not title:
return detail
return f"{title} >> {detail}"
def setproctitle(self, detail:OptStr=None) -> None:
if self.proctitle and setproctitle:
title = self.makeproctitle(detail)
setproctitle.setproctitle(f"{self._originalproctitle} :: {title}")
def _log(self, tags:Tags, obj:Any) -> None:
"""
Log ``obj`` to all loggers using ``tags`` as the list of tags.
If we're in "delayed logs" mode, buffer up messages instead.
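
		For example ``self.log.sisyphus.delay.info(message)`` logs ``message``
		with the ``delay`` tag: in "delayed logs" mode it will be buffered and
		only written out (via :meth:`_flush_logs`) once a non-delayed message
		gets logged.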
"""
timestamp = datetime.datetime.now()
if isinstance(obj, BaseException) and "exc" not in tags:
tags += ("exc",)
self._exceptioncount += 1
delayed = "delay" in tags
if delayed:
tags = tuple(tag for tag in tags if tag != "delay")
if delayed and self._delayed_logs is not None:
self._delayed_logs.append((timestamp, tags, self._tasks[:], obj))
else:
self._flush_logs()
for logger in self._loggers:
logger.log(timestamp, tags, self._tasks, obj)
def _flush_logs(self) -> None:
"""
		Flush delayed logs and switch off "delayed logs" mode.
"""
if self._delayed_logs is not None:
for (timestamp, tags, tasks, obj) in self._delayed_logs:
for logger in self._loggers:
logger.log(timestamp, tags, tasks, obj)
self._delayed_logs = None # No more delayed logs
def _getscriptsource(self) -> None:
"""
Reads the source code of the script into ``self.source``.
"""
		scriptname = misc.sysinfo.script_name.rstrip("c") # Strip the trailing "c", so that for a compiled script we read the ``.py`` source
try:
			with open(scriptname, "rb") as f:
				encoding = tokenize.detect_encoding(f.readline)[0]
with open(scriptname, "r", encoding=encoding, errors="replace") as f:
self.source = f.read()
except IOError: # Script might have called ``os.chdir()`` before
self.source = None
def _getcrontab(self) -> None:
"""
Reads the current crontab into ``self.crontab``.
"""
with os.popen("crontab -l 2>/dev/null") as f:
self.crontab = f.read()
def _calc_nextrun(self) -> datetime.datetime:
"""
Calculate when the job should run next (in repeat mode).
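
		``nextrun`` may be a constant or a callable and may yield :const:`None`
		(run again immediately), a number of seconds, a relative
		:class:`~datetime.timedelta` or an absolute :class:`~datetime.datetime`.
		For example ``nextrun = 300`` schedules the next run five minutes from
		now.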
"""
nextrun = self.nextrun
if callable(nextrun):
nextrun = nextrun()
if nextrun is None:
nextrun = datetime.timedelta(0)
if isinstance(nextrun, (int, float)):
nextrun = datetime.timedelta(seconds=nextrun)
if isinstance(nextrun, datetime.timedelta):
return datetime.datetime.now() + nextrun
else:
return nextrun
def _calc_maxhealthcheckage(self) -> datetime.datetime:
"""
Calculate cut-off date for the health check.
A health check file with a timestamp before that date will indicate an
unhealthy job.
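
		Like ``nextrun``, ``maxhealthcheckage`` may be a constant or a callable
		and may yield a number of seconds, a relative
		:class:`~datetime.timedelta` or an absolute :class:`~datetime.datetime`.
		For example ``maxhealthcheckage = 2 * 60 * 60`` reports the job as
		unhealthy once the health file is more than two hours old.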
"""
cutoff = self.maxhealthcheckage
if callable(cutoff):
cutoff = cutoff()
if cutoff is None:
cutoff = datetime.datetime(datetime.MINYEAR, 1, 1)
if isinstance(cutoff, (int, float)):
cutoff = datetime.timedelta(seconds=cutoff)
if isinstance(cutoff, datetime.timedelta):
cutoff = datetime.datetime.now() - cutoff
return cutoff
def _createlogs(self) -> None:
"""
Create the logfile and the link to the logfile (if configured).
"""
self._loggers = []
skipfilenames = [] # type: List[pathlib.Path]
if self.toemail and self.fromemail and self.smtphost:
# Use the email logger as the first logger, so that when sending the email (in :meth:`EmailLogger.close`) fails,
# it will still be logged to the log file/stdout/stderr
self._loggers.append(EmailLogger(self))
if self.log2file:
logfilename = self.logfilename()
if logfilename is not None:
# Create the logger for the log file
self.logfileurl = str(url.Ssh(misc.sysinfo.user_name, misc.sysinfo.host_fqdn or misc.sysinfo.host_name, str(logfilename)))
self._loggers.append(FileLogger(self, logfilename, self.encoding, self.errors, skipfilenames, self._formatlogline))
				skipfilenames.append(logfilename) # Note that we can still append filenames after the logger has been created, as the list object is shared
# Create logger for links
links = [
(self.currentloglinkname, CurrentLinkLogger),
(self.lastsuccessfulloglinkname, LastStatusLinkLogger, Status.SUCCESSFUL),
(self.lastfailedloglinkname, LastStatusLinkLogger, Status.FAILED),
(self.lastinterruptedloglinkname, LastStatusLinkLogger, Status.INTERRUPTED),
(self.lasttimeoutloglinkname, LastStatusLinkLogger, Status.TIMEOUT),
] # type: List[Tuple[Callable[[], pathlib.Path], Type[Logger]] | Tuple[Callable[[], pathlib.Path], Type[Logger], Status]]
for (makelinkfilename, logger, *additionalargs) in links:
linkfilename = makelinkfilename()
if linkfilename is not None:
self._loggers.append(logger(self, logfilename, linkfilename, *additionalargs))
skipfilenames.append(linkfilename)
if self._healthfilename is not None:
skipfilenames.append(self._healthfilename)
if self.log2stdout:
self._loggers.append(StreamLogger(self, sys.stdout, self._formatlogline))
if self.log2stderr:
self._loggers.append(StreamLogger(self, sys.stderr, self._formatlogline))
if self.mattermost_url is not None and self.mattermost_channel is not None and self.mattermost_token is not None:
self._loggers.append(MattermostLogger(self))
if self.sentry_dsn is not None:
self._loggers.append(SentryLogger(self))
def _closelogs(self, status:Status) -> None:
# Note that in forking mode the child process inherits the delayed log
# messages of the parent process. If both processes would log a
# non-delayed message, the inherited messages would be output twice.
# To avoid this problem, we clear the delayed log queue in the parent
# before continuing.
if self._delayed_logs:
self._delayed_logs = []
index = 0
while index < len(self._loggers):
# Don't remove the logger from the list immediately
# In this way, log messages that the logger outputs during closing will
# be logged by the logger itself (i.e. logfile cleanup will be logged
# in the logfile)
logger = self._loggers[index]
if logger.close(status):
# Logger has closed, so remove it
del self._loggers[index]
else:
# Logger didn't close, keep it and go to the next one
index += 1
def _write_healthfile(self, error:OptStr) -> None:
# Write the file that is used for the healthcheck
if self._healthfilename:
error = "" if error is None else error + "\n"
try:
self._healthfilename.write_text(error, encoding=self.encoding, errors=self.errors)
except FileNotFoundError:
self._healthfilename.parent.mkdir(parents=True)
self._healthfilename.write_text(error, encoding=self.encoding, errors=self.errors)
class Task:
"""
A subtask of a :class:`Job`.
"""
ul4_attrs = {"index", "count", "type", "name", "starttime", "endtime", "success", "data"}
def __init__(self, job:Job, type:OptStr=None, name:OptStr=None, index:OptInt=None, count:OptInt=None, **data):
"""
Create a :class:`!Task` object. For the meaning of the parameters see
:meth:`Job.task`.
"""
self.job = job
self.type = type
self.name = name
self.index = index
self.count = count
self.data = data
self.starttime = None # type: Optional[datetime.datetime]
self.endtime = None # type: Optional[datetime.datetime]
self.success = None # type: Optional[bool]
def __enter__(self) -> "Task":
self.starttime = datetime.datetime.now()
self.job._tasks.append(self)
self.job.setproctitle()
for logger in self.job._loggers:
logger.taskstart(self.job._tasks)
return self
def __exit__(self, type:Optional[Type[BaseException]], value:Optional[BaseException], traceback:Optional[types.TracebackType]) -> None:
self.endtime = datetime.datetime.now()
self.success = type is None
for logger in self.job._loggers:
logger.taskend(self.job._tasks)
self.job._tasks.pop()
self.job.setproctitle()
def __str__(self) -> str:
v = ""
if self.index is not None:
v += f"[{self.index+1:,}"
if self.count is not None:
v += f"/{self.count:,}"
v += "]"
d = " ".join(str(d) for d in (self.type, self.name) if d)
if d:
if v:
v += " "
v += d
return v or "?"
def asdict(self) -> Dict[str, Any]:
return dict(
type=self.type,
			name=str(self.name) if self.name is not None else None,
index=self.index,
count=self.count,
starttime=self.starttime,
endtime=self.endtime,
)
def __repr__(self) -> str:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} type={self.type!r} name={self.name!r} at {id(self):#x}"
class Tag:
"""
A :class:`!Tag` object can be used to call a function with an additional list
of tags. Tags can be added via :meth:`__getattr__` or :meth:`__getitem__` calls.
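
	For example, ``job.log.sisyphus.delay.warning(message)`` calls the
	underlying logging function with the tags ``("sisyphus", "delay",
	"warning")``; indexing works the same way, i.e. ``job.log["email"]`` is
	equivalent to ``job.log.email``.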
"""
def __init__(self, func:Callable, *tags:str):
self.func = func
self.tags = tags
self._map = {} # type: Dict[str, Tag]
def __getattr__(self, tag:str) -> "Tag":
if tag in self.tags: # Avoid duplicate tags
return self
if tag not in self._map:
newtag = Tag(self.func, *(self.tags + (tag,)))
self._map[tag] = newtag
return newtag
else:
return self._map[tag]
__getitem__ = __getattr__
def __call__(self, *args, **kwargs) -> "Tag":
return self.func(self.tags, *args, **kwargs)
class Logger:
"""
A :class:`Logger` is called by the :class:`Job` for each logging event.
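
	To log to a custom target, subclass :class:`!Logger` and overwrite some of
	the methods below. A minimal sketch (illustration only, not part of this
	module) that collects all logging events in a list::

		class ListLogger(Logger):
			def __init__(self):
				self.entries = []

			def name(self):
				return "<list>"

			def log(self, timestamp, tags, tasks, text):
				# Remember each logging event
				self.entries.append((timestamp, tags, text))

			def close(self, status):
				# Nothing to clean up, so report that we're done
				return True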
"""
def name(self) -> OptStr:
"""
		A name for the logger (used in reporting).
"""
return None
def log(self, timestamp:datetime.datetime, tags:Tags, tasks:List[Task], text:str) -> None:
"""
Called by the :class:`Job` when a log entry has to be made.
Arguments have the following meaning:
``timestamp`` : ``datetime.datetime``
The moment when the logging call was made.
``tags`` : List of strings
The tags that were part of the logging call. For example for the
logging call::
self.log.xml.warning("Skipping foobar")
the list of tags is::
["xml", "warning"]
``tasks`` : List of :class:`Task` objects
The currently active stack of :class:`Task` objects.
``text`` : Any object
The log text. This can be any object. If it's not a string it will be
converted to a string via :func:`pprint.pformat` (or
:func:`traceback.format_exception` if it's an exception)
"""
def taskstart(self, tasks:List[Task]) -> None:
"""
Called by the :class:`Job` when a new subtask has been started.
``tasks`` is the stack of currently active tasks (so ``tasks[-1]`` is
the task that has been started).
"""
def taskend(self, tasks:List[Task]) -> None:
"""
Called by the :class:`Job` when a subtask is about to end.
``tasks`` is the stack of currently active tasks (so ``tasks[-1]`` is
the task that's about to end).
"""
def close(self, status:Status) -> bool:
"""
Called by the :class:`Job` when job execution has finished.
``status`` (a :class:`Status`) is the result status of the job run.
		Return whether the logger has finished its work and can be removed.
		(All loggers return true here except the ``stdout`` and ``stderr``
		loggers, which stay open.)
"""
class StreamLogger(Logger):
"""
	Logger that writes logging events into an open file-like object. It is used
for logging to ``stdout`` and ``stderr``.
"""
def __init__(self, job:Job, stream:TextIO, linetemplate:ul4c.Template) -> None:
self.job = job
self.stream = stream
self.linetemplate = linetemplate
self.lineno = 1 # Current line number
def __repr__(self) -> str:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} stream={self.stream!r} at {id(self):#x}>"
def name(self) -> str:
return self.stream.name
def log(self, timestamp:datetime.datetime, tags:Tags, tasks:List[Task], text:str) -> None:
for line in _formatlines(text):
line = self.linetemplate.renders(line=line, time=timestamp, tags=tags, tasks=tasks, sysinfo=misc.sysinfo, job=self.job, env=env)
self.stream.write(line)
self.stream.write("\n")
self.lineno += 1
self.stream.flush()
def close(self, status:Status) -> bool:
return False
class FileLogger(StreamLogger):
"""
Logger that writes logging events into a file specified via an
:class:`~ll.url.URL` object. This is used for logging to the standard log
file.
"""
def __init__(self, job:Job, filename:pathlib.Path, encoding:str, errors:str, skipfilenames:List[pathlib.Path], linetemplate:ul4c.Template) -> None:
self.filename = filename
try:
file = filename.open("w", encoding=encoding, errors=errors)
except FileNotFoundError:
filename.parent.mkdir(parents=True)
file = filename.open("w", encoding=encoding, errors=errors)
StreamLogger.__init__(self, job, file, linetemplate)
self.skipfilenames = skipfilenames
def __repr__(self) -> str:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} filename={str(self.filename)!r} at {id(self):#x}>"
	def close(self, status:Status) -> bool:
keepfilelogs = self.job.keepfilelogs
compressfilelogs = self.job.compressfilelogs
if self.job.process is not Process.CHILD and (keepfilelogs is not None or compressfilelogs is not None):
now = datetime.datetime.now()
			keepthreshold = now - keepfilelogs # Files older than this will be deleted
			compressthreshold = now - compressfilelogs # Files older than this will be compressed
logdir = pathlib.Path(self.stream.name).parent
removedany = False
compressedany = False
warnedcompressany = False
for filename in sorted(list(logdir.iterdir())):
# Decide what to do with this file
# (Note that this might delete/compress files that were not produced by sisyphus)
if filename not in self.skipfilenames:
# If the file is not the logfile or a link to it ...
mdate = get_mtime(filename)
if mdate < keepthreshold:
						# ... and it's too old to keep, delete it
if not removedany: # Only log this line for the first logfile we remove
# This will still work, as the file isn't closed yet.
self.job.log.sisyphus.delay.info(f"Removing logfiles older than {keepfilelogs}")
removedany = True
self.remove(filename)
elif mdate < compressthreshold:
						# ... and it's too old to keep uncompressed, compress it
if filename.suffix not in {".gz", ".bz2", ".xz"}:
							if (self.job.compressmode == "gzip" and gzip is None) or (self.job.compressmode == "bzip2" and bz2 is None) or (self.job.compressmode == "lzma" and lzma is None):
if not warnedcompressany:
self.job.log.sisyphus.delay.warning(f"{self.job.compressmode} compression not available, leaving log files uncompressed")
warnedcompressany = True
else:
if not compressedany:
self.job.log.sisyphus.delay.info(f"Compressing logfiles older than {compressfilelogs} via {self.job.compressmode}")
compressedany = True
self.compress(filename)
if removedany or compressedany:
self.job.log.sisyphus.delay.info("Old logfiles cleaned up")
if self.job.process is not Process.CHILD and status is Status.UNEVENTFUL:
self.job.log.sisyphus.delay.info("Going to delete current logfile")
# Close the stream now, so that we're able to delete it (even on Windows)
self.stream.close()
if self.job.process is not Process.CHILD:
if status is Status.UNEVENTFUL:
				# Remove the current log file in case of an uneventful run
self.filename.unlink()
return True
def remove(self, filename:pathlib.Path) -> None:
self.job.log.sisyphus.delay.info(f"Removing logfile {filename}")
filename.unlink()
def compress(self, filename:pathlib.Path, bufsize:int=65536) -> None:
if self.job.compressmode == "gzip":
compressor = gzip.GzipFile
ext = ".gz"
elif self.job.compressmode == "bzip2":
compressor = bz2.BZ2File
ext = ".bz2"
elif self.job.compressmode == "lzma":
compressor = lzma.LZMAFile
ext = ".xz"
else:
raise ValueError(f"unknown compressmode {self.job.compressmode!r}")
self.job.log.sisyphus.delay.info(f"Compressing logfile {filename}")
compressedfilename = pathlib.Path(str(filename) + ext)
with filename.open("rb") as logfile:
with compressor(compressedfilename, mode="wb") as compressedlogfile:
while True:
data = logfile.read(bufsize)
if not data:
break
compressedlogfile.write(data)
# Copy timestamp of original file to the compressed file
# (otherwise removal of the compressed log file would be delayed)
times = get_utime(filename)
set_utime(compressedfilename, *times)
# Remove uncompressed log file
filename.unlink()
class LinkLogger(Logger):
"""
Baseclass of all loggers that handle links to the log file.
"""
def __init__(self, job:Job, filename:pathlib.Path, linkname:pathlib.Path):
self.job = job
self.filename = filename
self.linkname = linkname
def __repr__(self) -> str:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} linkname={str(self.linkname)!r} at {id(self):#x}>"
def _makelink(self) -> None:
linkname = self.linkname.absolute()
filename = self.filename
try:
filename = filename.absolute().relative_to(linkname.parent)
except ValueError:
pass
try:
linkname.symlink_to(filename)
except FileExistsError:
linkname.unlink()
linkname.symlink_to(filename)
def close(self, status:Status) -> bool:
return True
class CurrentLinkLogger(LinkLogger):
"""
Logger that handles the link to the current log file.
"""
def __init__(self, job:Job, filename:pathlib.Path, linkname:pathlib.Path):
super().__init__(job, filename, linkname)
self._makelink()
class LastStatusLinkLogger(LinkLogger):
"""
Logger that handles the link to the log file for a specific job status.
"""
def __init__(self, job:Job, filename:pathlib.Path, linkname:pathlib.Path, status:Status):
super().__init__(job, filename, linkname)
self.status = status
def __repr__(self) -> str:
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} linkname={str(self.linkname)!r} status={self.status.name} at {id(self):#x}>"
def close(self, status:Status) -> bool:
if self.job.process is not Process.CHILD and status is self.status:
self._makelink()
return True
class EmailLogger(Logger):
"""
Logger that handles sending an email report of the job run.
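
	Logging calls tagged ``email`` or ``external`` are buffered in a dump
	file and assembled into the email report in :meth:`close` when the job
	run has finished.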
"""
def __init__(self, job:Job):
self.job = job
self.filename = None
self.file = None
self.encoder = None
def name(self) -> str:
return "<email>"
def log(self, timestamp:datetime.datetime, tags:Tags, tasks:List[Task], text:str) -> None:
if "email" in tags or "external" in tags:
if self.file is None:
filename = self.job.emailfilename()
try:
file = filename.open("w", encoding="utf-8", buffering=1)
except FileNotFoundError:
filename.parent.mkdir(parents=True)
file = filename.open("w", encoding="utf-8", buffering=1)
self.file = file
self.encoder = ul4on.Encoder()
data = {"timestamp": timestamp, "tags": tags, "tasks": [t.asdict() for t in tasks]}
if isinstance(text, BaseException):
data["type"] = "exception"
data["class"] = misc.format_class(text)
data["value"] = str(text) or None
data["traceback"] = _formattraceback(text)
else:
data["type"] = "message"
data["message"] = "\n".join(_formatlines(text))
self.file.write(self.encoder.dumps(data))
self.file.write("\n")
self.file.flush()
def _load_dump(self, process:Process) -> Generator[Any, None, None]:
decoder = ul4on.Decoder()
filename = self.job.emailfilename(process)
try:
with filename.open("r", encoding="utf-8") as f:
while True:
try:
yield decoder.load(f)
except EOFError:
break
except FileNotFoundError:
pass
def close(self, status:Status) -> bool:
if self.file is not None:
self.file.close()
else:
			# If we never wrote any logs, remove the log file (it shouldn't exist anyway)
try:
self.job.emailfilename().unlink()
except FileNotFoundError:
pass
if self.job.process is not Process.CHILD:
if self.job.process is Process.SOLO:
processes = (Process.SOLO,)
else:
processes = (Process.CHILD, Process.PARENT)
log = sorted(
itertools.chain.from_iterable(self._load_dump(p) for p in processes),
key=operator.itemgetter("timestamp"),
)
# Without log messages, we have nothing to do
if log:
countexceptions = 0
countmessages = 0
for data in log:
if data["type"] == "exception":
countexceptions += 1
else:
countmessages += 1
# Limit size of email
log = log[:self.job.maxemailerrors]
jsondata = dict(
projectname=self.job.projectname,
jobname=self.job.jobname,
identifier=self.job.identifier,
log=log,
countexceptions=countexceptions,
countmessages=countmessages,
host_name=misc.sysinfo.host_name,
host_fqdn=misc.sysinfo.host_fqdn,
host_ip=misc.sysinfo.host_ip,
host_sysname=misc.sysinfo.host_sysname,
host_nodename=misc.sysinfo.host_nodename,
host_release=misc.sysinfo.host_release,
host_version=misc.sysinfo.host_version,
host_machine=misc.sysinfo.host_machine,
user_name=misc.sysinfo.user_name,
user_uid=misc.sysinfo.user_uid,
user_gid=misc.sysinfo.user_gid,
user_gecos=misc.sysinfo.user_gecos,
user_dir=misc.sysinfo.user_dir,
user_shell=misc.sysinfo.user_shell,
python_executable=misc.sysinfo.python_executable,
python_version=misc.sysinfo.python_version,
pid=misc.sysinfo.pid,
script_name=misc.sysinfo.script_name,
short_script_name=misc.sysinfo.short_script_name,
starttime=self.job.starttime,
endtime=self.job.endtime,
logfileurl=self.job.logfileurl,
)
variables = dict(
job=self.job,
env=env,
sysinfo=misc.sysinfo,
log=log,
countexceptions=countexceptions,
countmessages=countmessages,
)
emailsubject = self.job._formatemailsubject.renders(**variables)
emailbodytext = self.job._formatemailbodytext.renders(**variables)
emailbodyhtml = self.job._formatemailbodyhtml.renders(**variables)
textpart = text.MIMEText(emailbodytext)
htmlpart = text.MIMEText(emailbodyhtml, _subtype="html")
jsonpart = application.MIMEApplication(json.dumps(jsondata, cls=DatetimeEncoder).encode("utf-8"), _subtype="json", _encoder=encoders.encode_base64)
jsonpart.add_header('Content-Disposition', 'attachment', filename=f"{self.job.projectname}.{self.job.jobname}.json")
msg = multipart.MIMEMultipart(
_subparts=[
multipart.MIMEMultipart(_subtype="alternative", _subparts=[textpart, htmlpart]),
jsonpart,
]
)
msg["To"] = self.job.toemail
msg["From"] = self.job.fromemail
msg["Subject"] = emailsubject
try:
server = smtplib.SMTP(self.job.smtphost, self.job.smtpport)
if self.job.smtpuser and self.job.smtppassword:
server.login(self.job.smtpuser, self.job.smtppassword)
server.send_message(msg)
server.quit()
self.job.log.sisyphus.report(f"Sent email report to {self.job.toemail}")
except smtplib.SMTPException as exc:
self.job.log.sisyphus.report(exc)
# Remove files
for p in processes:
try:
self.job.emailfilename(p).unlink()
except FileNotFoundError:
pass
return True
class MattermostLogger(Logger):
"""
Logger that logs messages to a Mattermost chat channel.
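
	Messages are only posted for logging calls tagged ``mattermost`` or
	``external``, e.g. ``self.log.mattermost(message)``.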
"""
def __init__(self, job:Job):
self.job = job
def name(self) -> str:
return "<mattermost>"
def log(self, timestamp:datetime.datetime, tags:Tags, tasks:List[Task], text:str) -> None:
if "mattermost" in tags or "external" in tags:
import requests
if isinstance(text, BaseException):
message = _formattraceback(text)
type = "exc"
elif not isinstance(text, str):
message = pprint.pformat(text)
type = "obj"
else:
message = text
type = "str"
message = message.strip("\n")
if len(message) > 14000:
message = message[:14000] + "..."
vars = dict(
job=self.job,
env=env,
sysinfo=misc.sysinfo,
type=type,
message=message,
time=timestamp,
tags=tags,
tasks=tasks,
)
title = self.job._formatmattermosttitle.renders(**vars)
message = self.job._formatmattermostmessage.renders(**vars)
message = f"# {title}\n{message}"
requests.post(
self.job.mattermost_url,
headers={
"Authorization": f"Bearer {self.job.mattermost_token}",
},
json={
"channel_id": self.job.mattermost_channel,
"message": message[:15000],
}
)
def close(self, status:Status) -> bool:
return True
class SentryLogger(Logger):
"""
Logger that logs messages and exceptions to Sentry.
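
	Messages and exceptions are only forwarded for logging calls tagged
	``sentry`` or ``external``; the stack of currently active tasks is
	attached as breadcrumbs.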
"""
def __init__(self, job:Job):
self.job = job
def name(self) -> str:
return "<sentry>"
def _task_description(self, task:Task) -> str:
v = ""
if task.type is not None and task.name is not None:
v = str(task.name)
if task.index is not None:
v += f" [{task.index+1:,}"
if task.count is not None:
v += f"/{task.count:,}"
v += "]"
return v or "?"
def log(self, timestamp:datetime.datetime, tags:Tags, tasks:List[Task], text:str) -> None:
if "sentry" in tags or "external" in tags:
sentry = self.job.sentry_sdk
if sentry is not None:
with sentry.push_scope() as scope:
if isinstance(text, BaseException):
scope.level = "fatal"
else:
for level in ("fatal", "error", "warning", "info", "debug"):
if level in tags:
scope.level = level
break
else:
scope.level = "info"
for tag in tags:
						scope.set_tag(f"sisyphus.tag.{tag}", "true")
for task in tasks:
sentry.add_breadcrumb(
type="debug",
category=task.type or task.name,
message=self._task_description(task),
data={k: str(v) for (k, v) in task.data.items()},
)
if isinstance(text, BaseException):
sentry.capture_exception(text)
else:
if not isinstance(text, str):
text = pprint.pformat(text)
						sentry.capture_message(text)
sentry.flush()
def close(self, status:Status) -> bool:
return True
###
### High-level interface for starting jobs
###
def execute(job:Job) -> None:
"""
Execute the job ``job`` once or repeatedly.
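
	Example (``MyJob`` stands for your own :class:`Job` subclass)::

		if __name__ == "__main__":
			sisyphus.execute(MyJob())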
"""
job._handleexecution()
def executewithargs(job:Job, args:Optional[List[str]]=None) -> None:
"""
Execute the job ``job`` once or repeatedly with command line arguments.
``args`` are the command line arguments (:const:`None` results in
``sys.argv`` being used).
"""
job.parseargs(args)
job._handleexecution()