# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True
## Copyright 2004-2024 by LivingLogic AG, Bayreuth/Germany
## Copyright 2004-2024 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license
"""
:mod:`!ll.misc` contains various utility functions and classes used by the other
LivingLogic modules and packages.
"""
import sys, os, os.path, platform, types, datetime, collections, argparse, functools, signal, contextlib, subprocess, enum
from collections import abc
from ll import ul4c, color
__docformat__ = "reStructuredText"
# get the current directory as early as possible to minimize the chance that someone has called ``os.chdir()``
_curdir = os.getcwd()
notifycmd = os.environ.get("LL_MISC_NOTIFY", "/usr/local/bin/terminal-notifier")
# Try to fetch ``xmlescape`` from C implementation
try:
from ll._misc import *
except ImportError:
def xmlescape(string):
"""
Return a copy of the argument string, where every occurrence of ``<``,
``>``, ``&``, ``\"``, ``'`` and every restricted character has been
replaced with their XML character entity or character reference.
"""
if isinstance(string, str):
return string.translate({0x00: '�', 0x01: '', 0x02: '', 0x03: '', 0x04: '', 0x05: '', 0x06: '', 0x07: '', 0x08: '', 0x0b: '', 0x0c: '', 0x0e: '', 0x0f: '', 0x10: '', 0x11: '', 0x12: '', 0x13: '', 0x14: '', 0x15: '', 0x16: '', 0x17: '', 0x18: '', 0x19: '', 0x1a: '', 0x1b: '', 0x1c: '', 0x1d: '', 0x1e: '', 0x1f: '', 0x22: '"', 0x26: '&', 0x27: ''', 0x3c: '<', 0x3e: '>', 0x7f: '', 0x80: '€', 0x81: '', 0x82: '‚', 0x83: 'ƒ', 0x84: '„', 0x86: '†', 0x87: '‡', 0x88: 'ˆ', 0x89: '‰', 0x8a: 'Š', 0x8b: '‹', 0x8c: 'Œ', 0x8d: '', 0x8e: 'Ž', 0x8f: '', 0x90: '', 0x91: '‘', 0x92: '’', 0x93: '“', 0x94: '”', 0x95: '•', 0x96: '–', 0x97: '—', 0x98: '˜', 0x99: '™', 0x9a: 'š', 0x9b: '›', 0x9c: 'œ', 0x9d: '', 0x9e: 'ž', 0x9f: 'Ÿ'})
else:
string = string.replace("&", "&")
string = string.replace("<", "<")
string = string.replace(">", ">")
string = string.replace("'", "'")
string = string.replace('"', """)
for c in "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1f\x7f\x80\x81\x82\x83\x84\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f":
string = string.replace(c, f"&#{ord(c)};")
return string
def xmlescape_text(string):
"""
Return a copy of the argument string, where every occurrence of ``<``,
``>``, ``&``, and every restricted character has been replaced with their
XML character entity or character reference.
"""
if isinstance(string, str):
return string.translate({0x00: '�', 0x01: '', 0x02: '', 0x03: '', 0x04: '', 0x05: '', 0x06: '', 0x07: '', 0x08: '', 0x0b: '', 0x0c: '', 0x0e: '', 0x0f: '', 0x10: '', 0x11: '', 0x12: '', 0x13: '', 0x14: '', 0x15: '', 0x16: '', 0x17: '', 0x18: '', 0x19: '', 0x1a: '', 0x1b: '', 0x1c: '', 0x1d: '', 0x1e: '', 0x1f: '', 0x26: '&', 0x3c: '<', 0x3e: '>', 0x7f: '', 0x80: '€', 0x81: '', 0x82: '‚', 0x83: 'ƒ', 0x84: '„', 0x86: '†', 0x87: '‡', 0x88: 'ˆ', 0x89: '‰', 0x8a: 'Š', 0x8b: '‹', 0x8c: 'Œ', 0x8d: '', 0x8e: 'Ž', 0x8f: '', 0x90: '', 0x91: '‘', 0x92: '’', 0x93: '“', 0x94: '”', 0x95: '•', 0x96: '–', 0x97: '—', 0x98: '˜', 0x99: '™', 0x9a: 'š', 0x9b: '›', 0x9c: 'œ', 0x9d: '', 0x9e: 'ž', 0x9f: 'Ÿ'})
else:
string = string.replace("&", "&")
string = string.replace("<", "<")
string = string.replace(">", ">")
for c in "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1f\x7f\x80\x81\x82\x83\x84\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f":
string = string.replace(c, f"&#{ord(c)};")
return string
def xmlescape_attr(string):
"""
Return a copy of the argument string, where every occurrence of ``<``,
``>``, ``&``, ``"`` and every restricted character has been replaced with
their XML character entity or character reference.
"""
if isinstance(string, str):
return string.translate({0x00: '�', 0x01: '', 0x02: '', 0x03: '', 0x04: '', 0x05: '', 0x06: '', 0x07: '', 0x08: '', 0x0b: '', 0x0c: '', 0x0e: '', 0x0f: '', 0x10: '', 0x11: '', 0x12: '', 0x13: '', 0x14: '', 0x15: '', 0x16: '', 0x17: '', 0x18: '', 0x19: '', 0x1a: '', 0x1b: '', 0x1c: '', 0x1d: '', 0x1e: '', 0x1f: '', 0x22: '"', 0x26: '&', 0x3c: '<', 0x3e: '>', 0x7f: '', 0x80: '€', 0x81: '', 0x82: '‚', 0x83: 'ƒ', 0x84: '„', 0x86: '†', 0x87: '‡', 0x88: 'ˆ', 0x89: '‰', 0x8a: 'Š', 0x8b: '‹', 0x8c: 'Œ', 0x8d: '', 0x8e: 'Ž', 0x8f: '', 0x90: '', 0x91: '‘', 0x92: '’', 0x93: '“', 0x94: '”', 0x95: '•', 0x96: '–', 0x97: '—', 0x98: '˜', 0x99: '™', 0x9a: 'š', 0x9b: '›', 0x9c: 'œ', 0x9d: '', 0x9e: 'ž', 0x9f: 'Ÿ'})
else:
string = string.replace("&", "&")
string = string.replace("<", "<")
string = string.replace(">", ">")
string = string.replace('"', """)
for c in "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1f\x7f\x80\x81\x82\x83\x84\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f":
string = string.replace(c, f"&#{ord(c)};")
return string
[docs]
def item(iterable, index, /, default=None):
"""
Returns the ``index``'th item from the iterable. ``index`` may be
negative to count from the end. E.g. 0 returns the first item produced by
the iterator, 1 the second, -1 the last one etc. If ``index`` is negative
the iterator will be completely exhausted, if it's positive it will be
exhausted up to the ``index``'th item. If the iterator doesn't produce
that many items ``default`` will be returned.
``index`` may also be an iterable of indexes, in which case :func:`!item`
will be applied recursively, i.e. ``item(["foo", "bar"], (1, -1))`` returns
``'r'``.
"""
if isinstance(index, int):
index = (index,)
for i in index:
if i >= 0:
for item in iterable:
if not i:
iterable = item
break
i -= 1
else:
return default
else:
i = -i
cache = collections.deque()
for item in iterable:
cache.append(item)
if len(cache) > i:
cache.popleft()
if len(cache) == i:
iterable = cache.popleft()
else:
return default
return iterable
[docs]
def first(iterable, /, default=None):
"""
Return the first item from the iterable. If the iterator doesn't
produce any items ``default`` will be returned.
"""
for item in iterable:
return item
return default
[docs]
def last(iterable, /, default=None):
"""
Return the last item from the iterable. If the iterator doesn't produce any
items ``default`` will be returned.
"""
item = default
for item in iterable:
pass
return item
[docs]
def count(iterable, /):
"""
Count the number of items produced by the iterable. Calling this function
will exhaust the iterator.
"""
count = 0
for item in iterable:
count += 1
return count
[docs]
def isfirst(iterable, /):
"""
Iterate through items of the iterable and give information about whether the
item is the first in the iterable::
>>> list(misc.isfirst("foo"))
[(True, 'f'), (False, 'o'), (False, 'o')]
"""
first = True
for item in iterable:
yield (first, item)
first = False
[docs]
def islast(iterable, /):
"""
Iterate through items of the iterable and give information about whether the
item is the last in the iterable::
>>> list(misc.islast("foo"))
[(False, 'f'), (False, 'o'), (True, 'o')]
"""
lastitem = None
it = iter(iterable)
try:
item = next(it)
except StopIteration:
return
while True:
try:
(lastitem, item) = (item, next(it))
except StopIteration:
yield (True, item) # Items haven't been swapped yet
return
else:
yield (False, lastitem)
[docs]
def isfirstlast(iterable, /):
"""
Iterate through items of the iterable and give information about whether the
item is the first and/or last in the iterable::
>>> list(misc.isfirstlast("foo"))
[(True, False, 'f'), (False, False, 'o'), (False, True, 'o')]
"""
lastitem = None
first = True
it = iter(iterable)
try:
item = next(it)
except StopIteration:
return
while True:
try:
(lastitem, item) = (item, next(it))
except StopIteration:
yield (first, True, item) # Items haven't been swapped yet
return
else:
yield (first, False, lastitem)
first = False
[docs]
def notimplemented(function):
"""
A decorator that raises :exc:`NotImplementedError` when the method is called.
This saves you the trouble of formatting the error message yourself for each
implementation.
"""
@functools.wraps(function)
def wrapper(self, *args, **kwargs):
raise NotImplementedError(f"method {function.__name__}() not implemented in {format_class(self)}")
return wrapper
[docs]
def withdoc(doc):
"""
A decorator that adds a docstring to the function it decorates.
This can be useful if the docstring is not static, and adding it afterwards
is not possible.
"""
def wrapper(function):
function.__doc__ = doc
return function
return wrapper
class EnumMeta(enum.Enum.__class__):
def __repr__(self):
return f"<enum {self.__module__}.{self.__qualname__}>"
[docs]
class Enum(enum.Enum, metaclass=EnumMeta):
"""
Subclass of :class:`enum.Enum` where class and instance :func:`repr` output
include the module and fully qualified class name.
"""
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__}.{self.name}: {self.value!r}>"
[docs]
class IntEnum(enum.IntEnum, metaclass=EnumMeta):
"""
Subclass of :class:`enum.IntEnum` where class and instance :func:`repr`
output includes the module and fully qualified class name.
"""
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__}.{self.name}: {self.value!r}>"
class _propclass_Meta(type):
def __new__(cls, name, bases, dict):
if bases == (property,):
# create propclass itself normally
return super(_propclass_Meta, cls).__new__(cls, name, bases, dict)
newdict = dict.copy()
newdict.pop("__get__", None)
newdict.pop("__set__", None)
newdict.pop("__delete__", None)
newdict.pop("__metaclass__", None)
self = type.__new__(cls, name, bases, newdict)
inst = self(
dict.get("__get__", None),
dict.get("__set__", None),
dict.get("__delete__", None),
dict.get("__doc__", None)
)
inst.__name__ = name
return inst
[docs]
class propclass(property, metaclass=_propclass_Meta):
'''
:class:`!propclass` provides an alternate way to define properties.
Subclassing :class:`!propclass` and defining methods :meth:`__get__`,
:meth:`__set__` and :meth:`__delete__` will automatically generate the
appropriate property::
class name(misc.propclass):
"""
The name property
"""
def __get__(self):
return self._name
def __set__(self, name):
self._name = name.lower()
def __delete__(self):
self._name = None
'''
[docs]
def exception_chain(exc):
"""
Traverses the chain of exceptions. This is a generator.
"""
while True:
yield exc
if exc.__cause__ is not None:
exc = exc.__cause__
elif exc.__context__ is not None and not exc.__suppress_context__:
exc = exc.__context__
else:
break
[docs]
class Pool:
"""
A :class:`!Pool` object can be used as an inheritable alternative to modules.
The attributes of a module can be put into a pool and each pool can have
base pools where lookup continues if an attribute can't be found.
"""
def __init__(self, *objects):
self._attrs = {}
self.bases = []
for object in objects:
self.register(object)
[docs]
def register(self, object):
r"""
Register ``object`` in the pool. ``object`` can be a module, a
dictionary or a :class:`!Pool` objects (with registers the pool as a base
pool). If ``object`` is a module and has an attribute :attr:`__bases__`
(being a sequence of other modules) this attribute will be used to
initialize ``self``\s base pool.
"""
if isinstance(object, types.ModuleType):
self.register(object.__dict__)
elif isinstance(object, dict):
for (key, value) in object.items():
if key == "__bases__":
for base in value:
if not isinstance(base, Pool):
base = self.__class__(base)
self.bases.append(base)
elif not isinstance(value, (types.ModuleType, dict)):
try:
self._attrs[key] = value
except TypeError:
pass
elif isinstance(object, Pool):
self.bases.append(object)
elif isinstance(object, type):
self._attrs[object.__name__] = object
def __getitem__(self, key):
try:
return self._attrs[key]
except KeyError:
for base in self.bases:
return base[key]
raise
def __getattr__(self, key):
try:
return self.__getitem__(key)
except KeyError:
raise AttributeError(key)
[docs]
def clear(self):
"""
Make ``self`` empty.
"""
self._attrs.clear()
del self.bases[:]
[docs]
def clone(self):
"""
Return a copy of ``self``.
"""
copy = self.__class__()
copy._attrs = self._attrs.copy()
copy.bases = self.bases[:]
return copy
def __repr__(self):
return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object with {len(self._attrs):,} items at {id(self):#x}>"
[docs]
def iterone(item):
"""
Return an iterator that will produce one item: ``item``.
"""
yield item
[docs]
class Iterator:
"""
:class:`!Iterator` adds :meth:`__getitem__` support to an iterator. This is
done by calling :func:`item` internally.
"""
__slots__ = ("iterator", )
def __init__(self, iterator):
self.iterator = iterator
def __getitem__(self, index):
if isinstance(index, slice):
return list(self.iterator)[index]
default = object()
result = item(self, index, default)
if result is default:
raise IndexError(index)
return result
def __iter__(self):
return self
def __next__(self):
return next(self.iterator)
# We can't implement :meth:`__len__`, because if such an object is passed to
# :class:`list`, :meth:`__len__` would be called, exhausting the iterator
def __bool__(self):
for node in self:
return True
return False
[docs]
def get(self, index, default=None):
"""
Return the ``index``'th item from the iterator (or ``default`` if
there's no such item).
"""
return item(self, index, default)
[docs]
class Queue:
"""
:class:`!Queue` provides FIFO queues: The method :meth:`write` writes to the
queue and the method :meth:`read` read from the other end of the queue and
remove the characters read.
"""
def __init__(self):
self._buffer = ""
[docs]
def write(self, chars):
"""
Write the string ``chars`` to the buffer.
"""
self._buffer += chars
[docs]
def read(self, size=-1):
"""
Read up to ``size`` character from the buffer (or all if ``size``
is negative). Those characters will be removed from the buffer.
"""
if size < 0:
s = self._buffer
self._buffer = ""
return s
else:
s = self._buffer[:size]
self._buffer = self._buffer[size:]
return s
[docs]
class Const:
"""
This class can be used for singleton constants.
"""
__slots__ = ("_name", "_module")
def __init__(self, name, module=None):
self._name = name
self._module = module
def __repr__(self):
return f"{self._module or self.__module__}.{self._name}"
[docs]
class FlagAction(argparse.Action):
"""
:class:`!FlagAction` can be use with :mod:`argparse` for options that
represent flags. An options can have a value like ``yes`` or ``no`` for the
correspending boolean value, or if the value is omitted it is the inverted
default value (i.e. specifying the option toggles it).
"""
true_choices = ("1", "true", "yes", "on")
false_choices = ("0", "false", "no", "off")
def __init__(self, option_strings, dest, default=False, help=None):
super().__init__(option_strings=option_strings, dest=dest, default="yes" if default else "no", help=help, metavar="yes|no", const="no" if default else "yes", type=self.str2bool, nargs="?")
# implementing this prevents :meth:`__repr__` from generating an infinite recursion
def _get_kwargs(self):
return [(key, getattr(self, key)) for key in ("option_strings", "dest", "default", "help")]
def str2bool(self, value):
value = value.lower()
if value in self.true_choices:
return True
elif value in self.false_choices:
return False
else:
choices = ", ".join(self.true_choices + self.false_choices)
raise argparse.ArgumentTypeError(f"invalid flag value: {value!r} (use any of {choices})")
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
[docs]
def tokenizepi(string):
"""
Tokenize the string object ``string`` according to the processing
instructions in the string. :func:`tokenize` will generate tuples with the
first item being the processing instruction target and the second being the
PI data. "Text" content (i.e. anything other than PIs) will be returned as
``(None, data)``.
"""
pos = 0
while True:
pos1 = string.find("<?", pos)
if pos1 < 0:
part = string[pos:]
if part:
yield (None, part)
return
pos2 = string.find("?>", pos1)
if pos2 < 0:
part = string[pos:]
if part:
yield (None, part)
return
part = string[pos:pos1]
if part:
yield (None, part)
part = string[pos1+2: pos2].strip()
parts = part.split(None, 1)
target = parts[0]
if len(parts) > 1:
data = parts[1]
else:
data = ""
yield (target, data)
pos = pos2+2
[docs]
def itersplitat(string, positions):
"""
Split ``string`` at the positions specified in ``positions``.
For example::
>>> from ll import misc
>>> import datetime
>>> datetime.datetime(*map(int, misc.itersplitat("20090609172345", (4, 6, 8, 10, 12))))
datetime.datetime(2009, 6, 9, 17, 23, 45)
This is a generator.
"""
curpos = 0
for pos in positions:
part = string[curpos:pos]
if part:
yield part
curpos = pos
part = string[curpos:]
if part:
yield part
# The following two function are copied from Python ``cgi`` module
# (which will be deprecated in Python 3.13)
def _parseparam(s):
while s[:1] == ';':
s = s[1:]
end = s.find(';')
while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
end = s.find(';', end + 1)
if end < 0:
end = len(s)
f = s[:end]
yield f.strip()
s = s[end:]
[docs]
def module(source, filename="unnamed.py", name=None):
"""
Create a module from the Python source code ``source``. ``filename``
will be used as the filename for the module and ``name`` as the module
name (defaulting to the filename part of ``filename``).
"""
if name is None:
name = os.path.splitext(os.path.basename(filename))[0]
mod = types.ModuleType(name)
mod.__file__ = filename
code = compile(source, filename, "exec")
exec(code, mod.__dict__)
return mod
[docs]
def javaexpr(obj):
"""
Return a Java expression for the object ``obj``.
Example::
>>> print(misc.javaexpr([1, 2, 3]))
java.util.Arrays.asList(1, 2, 3)
"""
from ll import ul4c
if obj is None:
return "null"
elif obj is True:
return "true"
elif obj is False:
return "false"
elif isinstance(obj, str):
if len(obj) > 10000: # Otherwise javac complains about ``constant string too long`` (the upper limit is 65535 UTF-8 bytes)
parts = "".join(f".append({javaexpr(obj[i:i+10000])})" for i in range(0, len(obj), 10000))
return f"new StringBuilder({len(obj)}){parts}.toString()"
else:
v = []
specialchars = {"\r": "\\r", "\n": "\\n", "\t": "\\t", "\f": "\\f", "\b": "\\b", '"': '\\"', "\\": "\\\\"}
for c in obj:
try:
v.append(specialchars[c])
except KeyError:
oc = ord(c)
v.append(c if 32 <= oc < 128 else f"\\u{oc:04x}")
string = "".join(v)
return f'"{string}"'
elif isinstance(obj, datetime.datetime): # check ``datetime`` before ``date``, as ``datetime`` is a subclass of ``date``
return f"com.livinglogic.ul4.FunctionDate.call({obj.year}, {obj.month}, {obj.day}, {obj.hour}, {obj.minute}, {obj.second}, {obj.microsecond})"
elif isinstance(obj, datetime.date):
return f"com.livinglogic.ul4.FunctionDate.call({obj.year}, {obj.month}, {obj.day})"
elif isinstance(obj, datetime.timedelta):
return f"com.livinglogic.ul4.FunctionTimeDelta.call({obj.days}, {obj.seconds}, {obj.microseconds})"
elif isinstance(obj, monthdelta):
return f"com.livinglogic.ul4.FunctionMonthDelta.call({obj.months()})"
elif isinstance(obj, color.Color):
return "new com.livinglogic.ul4.Color({obj[0]}, {obj[1]}, {obj[2]}, {obj[3]})"
elif isinstance(obj, float):
return repr(obj)
elif isinstance(obj, int):
if -0x80000000 <= obj <= 0x7fffffff:
return repr(obj)
elif -0x8000000000000000 <= obj <= 0x7fffffffffffffff:
return repr(obj) + "L"
else:
return f'new java.math.BigInteger("{obj}")'
return repr(obj)
elif isinstance(obj, abc.Sequence):
items = ", ".join(javaexpr(item) for item in obj)
return f"java.util.Arrays.asList({items})"
elif isinstance(obj, abc.Mapping):
items = ", ".join(f"{javaexpr(key)}, {javaexpr(value)}" for (key, value) in obj.items())
return f"com.livinglogic.utils.MapUtils.makeMap({items})"
elif isinstance(obj, abc.Set):
items = ", ".join(javaexpr(item) for item in obj)
return f"com.livinglogic.utils.SetUtils.makeSet({items})"
elif isinstance(obj, ul4c.UndefinedKey):
return f"new com.livinglogic.ul4.UndefinedKey(null, {javaexpr(obj.key)})"
elif isinstance(obj, ul4c.UndefinedVariable):
return f"new com.livinglogic.ul4.UndefinedVariable({javaexpr(obj.name)})"
elif isinstance(obj, ul4c.Template):
return obj.javasource()
else:
raise TypeError(f"can't handle object of type {type(obj)}")
[docs]
class SysInfo:
"""
A :class:`!SysInfo` object contains information about the host, user, python
version and script. Available attributes are ``host_name``, ``host_fqdn``,
``host_ip``, ``host_sysname``, ``host_nodename``, ``host_release``,
``host_version``, ``host_machine``, ``user_name``, ``user_uid``, ``user_gid``,
``user_gecos``, ``user_dir``, ``user_shell``, ``python_executable``,
``python_version``, ``pid``, ``script_name``, ``short_script_name`` and
``script_url``.
:class:`!SysInfo` object also support a mimimal dictionary interface (i.e.
:meth:`__getitem__` and :meth:`__iter__`).
One module global instance named ``sysinfo`` is created at module import
time.
"""
ul4_attrs = {"host_name", "host_fqdn", "host_ip", "host_sysname", "host_nodename", "host_release", "host_version", "host_machine", "user_name", "user_uid", "user_gid", "user_gecos", "user_dir", "user_shell", "python_executable", "python_version", "pid", "script_name", "short_script_name", "script_url"}
def __init__(self):
# Use ``object`` as a marker for "not initialized"
self._host_name = object
self._host_fqdn = object
self._host_ip = object
self._host_sysname = object
self._host_nodename = object
self._host_release = object
self._host_version = object
self._host_machine = object
self._host_processor = object
self._user_name = object
self._user_uid = object
self._user_gid = object
self._user_gecos = object
self._user_dir = object
self._user_shell = object
self._pid = object
self._script_name = object
self._short_script_name = object
self._script_url = object
@property
def host_name(self):
if self._host_name is object:
import socket
self._host_name = socket.gethostname()
return self._host_name
@property
def host_fqdn(self):
return self.host_name
@property
def host_ip(self):
if self._host_ip is object:
import socket
self._host_ip = socket.gethostbyname(self.host_name)
return self._host_ip
def _make_host_info(self):
(self._host_sysname, self._host_nodename, self._host_release, self._host_version, self._host_machine, self._host_processor) = platform.uname()
@property
def host_sysname(self):
if self._host_sysname is object:
self._make_host_info()
return self._host_sysname
@property
def host_nodename(self):
if self._host_nodename is object:
self._make_host_info()
return self._host_nodename
@property
def host_release(self):
if self._host_release is object:
self._make_host_info()
return self._host_release
@property
def host_version(self):
if self._host_version is object:
self._make_host_info()
return self._host_version
@property
def host_machine(self):
if self._host_machine is object:
self._make_host_info()
return self._host_machine
@property
def host_processor(self):
if self._host_processor is object:
self._make_host_info()
return self._host_processor
def _make_user_info(self):
try:
import pwd
except ImportError:
# We're probably on Windows
import getpass
self._user_name = getpass.getuser()
self._user_uid = None
self._user_gid = None
self._user_gecos = None
self._user_dir = os.path.expanduser("~")
self._user_shell = None
else:
(self._user_name, _, self._user_uid, self._user_gid, self._user_gecos, self._user_dir, self._user_shell) = pwd.getpwuid(os.getuid())
@property
def user_name(self):
if self._user_name is object:
self._make_user_info()
return self._user_name
@property
def user_uid(self):
if self._user_uid is object:
self._make_user_info()
return self._user_uid
@property
def user_gid(self):
if self._user_gid is object:
self._make_user_info()
return self._user_gid
@property
def user_gecos(self):
if self._user_gecos is object:
self._make_user_info()
return self._user_gecos
@property
def user_dir(self):
if self._user_dir is object:
self._make_user_info()
return self._user_dir
@property
def user_shell(self):
if self._user_shell is object:
self._make_user_info()
return self._user_shell
@property
def python_executable(self):
return sys.executable
@property
def python_version(self):
v = sys.version_info
if v.releaselevel != 'final':
return f"{v.major}.{v.minor}.{v.micro} {v.releaselevel}"
elif v.micro:
return f"{v.major}.{v.minor}.{v.micro}"
else:
return f"{v.major}.{v.minor}"
@property
def pid(self):
return os.getpid()
@property
def script_name(self):
if self._script_name is object:
main = sys.modules["__main__"]
if hasattr(main, "__file__"):
self._script_name = os.path.join(_curdir, main.__file__)
else:
self._script_name = "<shell>"
return self._script_name
@property
def short_script_name(self):
if self._short_script_name is object:
script_name = self.script_name
if script_name != "<shell>":
userhome = os.path.expanduser("~")
if script_name.startswith(userhome+"/"):
script_name = "~" + script_name[len(userhome):]
self._short_script_name = script_name
return self._short_script_name
@property
def script_url(self):
if self._script_url is object:
from ll import url
u = self.short_script_name
if u != "<shell>":
u = str(url.Ssh(self.user_name, self.host_fqdn or self.host_name, u))
self._script_url = u
return self._script_url
def __getitem__(self, key):
if key in self.ul4_attrs:
return getattr(self, key)
raise KeyError(key)
def __iter__(self):
return iter(self.ul4_attrs)
# Single instance
sysinfo = SysInfo()
[docs]
class monthdelta:
"""
:class:`!monthdelta` objects can be used to add months/years to a
:class:`datetime.datetime` or :class:`datetime.date` object. If the resulting
day falls out of the range of valid days for the target month, the last day
for the target month will be used instead::
>>> import datetime
>>> from ll import misc
>>> datetime.date(2000, 1, 31) + misc.monthdelta(1)
datetime.date(2000, 2, 29)
"""
ul4_type = ul4c.InstantiableType(None, "monthdelta", "A number of months")
ul4_attrs = {"months"}
__slots__ = ("_months",)
def __init__(self, months=0, /):
self._months = months
def __bool__(self):
return self._months != 0
def __hash__(self):
return self._months
def __eq__(self, other):
return isinstance(other, monthdelta) and self._months == other._months
def __ne__(self, other):
return not isinstance(other, monthdelta) or self._months != other._months
def __lt__(self, other):
if not isinstance(other, monthdelta):
raise TypeError(f"unorderable types: {format_class(self)}() < {format_class(other)}()")
return self._months < other._months
def __le__(self, other):
if not isinstance(other, monthdelta):
raise TypeError(f"unorderable types: {format_class(self)}() <= {format_class(other)}()")
return self._months <= other._months
def __gt__(self, other):
if not isinstance(other, monthdelta):
raise TypeError(f"unorderable types: {format_class(self)}() > {format_class(other)}()")
return self._months > other._months
def __ge__(self, other):
if not isinstance(other, monthdelta):
raise TypeError(f"unorderable types: {format_class(self)}() >= {format_class(other)}()")
return self._months >= other._months
def __add__(self, other):
if isinstance(other, monthdelta):
return monthdelta(self._months+other._months)
elif isinstance(other, (datetime.datetime, datetime.date)):
year = other.year
month = other.month + self._months
(years_add, month) = divmod(month-1, 12)
month += 1
year += years_add
day = other.day
while True:
try:
return other.replace(year=year, month=month, day=day)
except ValueError:
day -= 1
if day == 1:
raise
else:
return NotImplemented
def __radd__(self, other):
return self.__add__(other)
def __sub__(self, other):
if isinstance(other, monthdelta):
return monthdelta(self._months-other._months)
else:
return NotImplemented
def __rsub__(self, other):
return other + (-self)
def __neg__(self):
return monthdelta(-self._months)
def __abs__(self):
return monthdelta(abs(self._months))
def __mul__(self, other):
if isinstance(other, int) and not isinstance(other, monthdelta):
return monthdelta(self._months*other)
else:
return NotImplemented
def __rmul__(self, other):
return self.__mul__(other)
def __floordiv__(self, other):
if isinstance(other, int):
return monthdelta(self._months//other)
elif isinstance(other, monthdelta):
return self._months//other._months
else:
return NotImplemented
def __truediv__(self, other):
if isinstance(other, monthdelta):
return self._months/other._months
else:
return NotImplemented
def __str__(self):
m = self._months
return f"{m} month{'s' if m != 1 and m != -1 else ''}"
def __repr__(self):
m = self._months
if m:
return f"monthdelta({m!r})"
else:
return f"monthdelta()"
def months(self):
return self._months
[docs]
class Timeout(Exception):
"""
Exception that is raised when a timeout in :func:`timeout` occurs.
"""
def __init__(self, duration):
self.duration = duration
def __str__(self):
if isinstance(self.duration, (int, float)):
return f"timed out after {self.duration} seconds"
else:
return f"timed out after {self.duration}"
[docs]
@contextlib.contextmanager
def timeout(seconds):
"""
A context manager that limits the runtime of the wrapped code.
As this uses :mod:`signal`, this won't with threads and only on UNIX.
"""
def _timeouthandler(signum, frame):
raise Timeout(seconds)
oldsignal = signal.signal(signal.SIGALRM, _timeouthandler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, oldsignal)
[docs]
def notifystart():
"""
Notify OS X of the start of a process by removing the previous notification.
"""
cmd = [notifycmd, "-remove", sysinfo.script_name]
with open("/dev/null", "wb") as f:
status = subprocess.call(cmd, stdout=f)
[docs]
def notifyfinish(title, subtitle, message):
"""
Notify OS X of the end of a process.
"""
cmd = [notifycmd, "-title", title, "-subtitle", subtitle, "-message", message, "-group", sysinfo.script_name]
with open("/dev/null", "wb") as f:
status = subprocess.call(cmd, stdout=f)
[docs]
def prettycsv(rows, padding=" "):
"""
Format table ``rows``.
``rows`` must be a list of lists of strings (e.g. as produced by the
:mod:`csv` module). ``padding`` is the padding between columns.
:func:`!prettycsv` is a generator.
"""
def width(row, i):
try:
return len(row[i])
except IndexError:
return 0
maxlen = max(len(row) for row in rows)
lengths = [max(width(row, i) for row in rows) for i in range(maxlen)]
for row in rows:
lasti = len(row)-1
for (i, (w, f)) in enumerate(zip(lengths, row)):
if i:
yield padding
if i == lasti:
f = f.rstrip() # don't add padding to the last column
else:
f = f"{f:<{w}}"
yield f
yield "\n"
class JSMinUnterminatedComment(Exception):
pass
class JSMinUnterminatedStringLiteral(Exception):
pass
class JSMinUnterminatedRegularExpression(Exception):
pass
[docs]
def jsmin(input):
"""
Minimizes the Javascript source ``input``.
"""
indata = iter(input.replace("\r", "\n"))
# Copy the input to the output, deleting the characters which are
# insignificant to JavaScript. Comments will be removed. Tabs will be
# replaced with spaces. Carriage returns will be replaced with linefeeds.
# Most spaces and linefeeds will be removed.
class var:
a = "\n"
b = None
lookahead = None
outdata = []
def _get():
# Return the next character from the input. Watch out for lookahead. If
# the character is a control character, translate it to a space or linefeed.
c = var.lookahead
var.lookahead = None
if c is None:
try:
c = next(indata)
except StopIteration:
return "" # EOF
if c >= " " or c == "\n":
return c
return " "
def _peek():
var.lookahead = _get()
return var.lookahead
def isalphanum(c):
# Return true if the character is a letter, digit, underscore, dollar sign, or non-ASCII character.
return ('a' <= c <= 'z') or ('0' <= c <= '9') or ('A' <= c <= 'Z') or c in "_$\\" or (c is not None and ord(c) > 126)
def _next():
# Get the next character, excluding comments. peek() is used to see if an unescaped '/' is followed by a '/' or '*'.
c = _get()
if c == "/" and var.a != "\\":
p = _peek()
if p == "/":
c = _get()
while c > "\n":
c = _get()
return c
if p == "*":
c = _get()
while 1:
c = _get()
if c == "*":
if _peek() == "/":
_get()
return " "
if not c:
raise JSMinUnterminatedComment()
return c
def _action(action):
"""
Do something! What you do is determined by the argument:
1 Output A. Copy B to A. Get the next B.
2 Copy B to A. Get the next B. (Delete A).
3 Get the next B. (Delete B).
action treats a string as a single character. Wow!
action recognizes a regular expression if it is preceded by ( or , or =.
"""
if action <= 1:
outdata.append(var.a)
if action <= 2:
var.a = var.b
if var.a in "'\"":
while True:
outdata.append(var.a)
var.a = _get()
if var.a == var.b:
break
if var.a <= "\n":
raise JSMinUnterminatedStringLiteral()
if var.a == "\\":
outdata.append(var.a)
var.a = _get()
if action <= 3:
var.b = _next()
if var.b == "/" and var.a in "(,=:[?!&|;{}\n":
outdata.append(var.a)
outdata.append(var.b)
while True:
var.a = _get()
if var.a == "/":
break
elif var.a == "\\":
outdata.append(var.a)
var.a = _get()
elif var.a <= "\n":
raise JSMinUnterminatedRegularExpression()
outdata.append(var.a)
var.b = _next()
_action(3)
while var.a:
if var.a == " ":
_action(1 if isalphanum(var.b) else 2)
elif var.a == "\n":
if var.b in "{[(+-":
_action(1)
elif var.b == " ":
_action(3)
else:
_action(1 if isalphanum(var.b) else 2)
else:
if var.b == " ":
_action(1 if isalphanum(var.a) else 3)
elif var.b == "\n":
_action(1 if isalphanum(var.a) or var.a in "}])+-\"'" else 3)
else:
_action(1)
return "".join(outdata).lstrip()