Source code for ll.misc

# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True

## Copyright 2004-2021 by LivingLogic AG, Bayreuth/Germany
## Copyright 2004-2021 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license


"""
:mod:`!ll.misc` contains various utility functions and classes used by the other
LivingLogic modules and packages.
"""


import sys, os, os.path, platform, types, datetime, collections, argparse, functools, signal, contextlib, subprocess, enum
from collections import abc

from ll import ul4c, color


__docformat__ = "reStructuredText"


# get the current directory as early as possible to minimize the chance that someone has called ``os.chdir()``
_curdir = os.getcwd()


notifycmd = os.environ.get("LL_MISC_NOTIFY", "/usr/local/bin/terminal-notifier")


# Try to fetch ``xmlescape`` from C implementation
try:
	from ll._misc import *
except ImportError:
	# Code points of the control characters that the pure Python fallbacks
	# replace with a numeric character reference: 0x00-0x08, 0x0b, 0x0c,
	# 0x0e-0x1f, 0x7f-0x84 and 0x86-0x9f (tab, line feed, carriage return and
	# 0x85 are passed through unchanged).
	_xmlescape_controls = (
		*range(0x00, 0x09),
		0x0b,
		0x0c,
		*range(0x0e, 0x20),
		*range(0x7f, 0x85),
		*range(0x86, 0xa0),
	)

	_xmlescape_control_refs = {codepoint: f"&#{codepoint};" for codepoint in _xmlescape_controls}

	# Translation tables (code point -> replacement). ``&`` has to stay the
	# first key: the duck-typed fallback in :func:`_xmlescape_apply` applies the
	# replacements one after the other in dictionary order, and every
	# replacement text itself contains an ``&``.
	_xmlescape_table = {0x26: "&amp;", 0x3c: "&lt;", 0x3e: "&gt;", 0x27: "&#39;", 0x22: "&quot;", **_xmlescape_control_refs}
	_xmlescape_text_table = {0x26: "&amp;", 0x3c: "&lt;", 0x3e: "&gt;", **_xmlescape_control_refs}
	_xmlescape_attr_table = {0x26: "&amp;", 0x3c: "&lt;", 0x3e: "&gt;", 0x22: "&quot;", **_xmlescape_control_refs}

	def _xmlescape_apply(string, table):
		# Apply the translation table ``table`` to ``string``.
		if isinstance(string, str):
			return string.translate(table)
		# Not a real ``str``: only rely on the object having a ``replace``
		# method that accepts strings and returns the replaced object.
		for (codepoint, replacement) in table.items():
			string = string.replace(chr(codepoint), replacement)
		return string

	def xmlescape(string):
		"""
		Return a copy of the argument string, where every occurrence of ``<``,
		``>``, ``&``, ``"``, ``'`` and every restricted control character has
		been replaced with its XML character entity or character reference.

		``string`` may also be a string-like object that is not a :class:`str`
		instance, as long as it provides a compatible ``replace`` method.
		"""
		return _xmlescape_apply(string, _xmlescape_table)

	def xmlescape_text(string):
		"""
		Return a copy of the argument string, where every occurrence of ``<``,
		``>``, ``&`` and every restricted control character has been replaced
		with its XML character entity or character reference. Quotes are left
		untouched.

		``string`` may also be a string-like object that is not a :class:`str`
		instance, as long as it provides a compatible ``replace`` method.
		"""
		return _xmlescape_apply(string, _xmlescape_text_table)

	def xmlescape_attr(string):
		"""
		Return a copy of the argument string, where every occurrence of ``<``,
		``>``, ``&``, ``"`` and every restricted control character has been
		replaced with its XML character entity or character reference. ``'``
		is left untouched.

		``string`` may also be a string-like object that is not a :class:`str`
		instance, as long as it provides a compatible ``replace`` method.
		"""
		return _xmlescape_apply(string, _xmlescape_attr_table)


def item(iterable, index, /, default=None):
	"""
	Return the ``index``'th item produced by ``iterable``.

	A non-negative ``index`` counts from the start (``0`` is the first item)
	and consumes the iterable only up to and including that item. A negative
	``index`` counts from the end (``-1`` is the last item) and consumes the
	iterable completely. If the iterable doesn't produce enough items,
	``default`` is returned.

	``index`` may also be an iterable of indexes. In this case the lookup is
	applied step by step to the result of the previous lookup, i.e.
	``item(["foo", "bar"], (1, -1))`` returns ``'r'``.
	"""
	if isinstance(index, int):
		index = (index,)

	for position in index:
		if position >= 0:
			for (offset, candidate) in enumerate(iterable):
				if offset == position:
					iterable = candidate
					break
			else:
				# Ran out of items before reaching ``position``
				return default
		else:
			wanted = -position
			# Only the last ``wanted`` items have to be remembered
			tail = collections.deque(iterable, maxlen=wanted)
			if len(tail) != wanted:
				return default
			iterable = tail[0]
	return iterable
def first(iterable, /, default=None):
	"""
	Return the first item produced by ``iterable``, or ``default`` if the
	iterable doesn't produce any items. At most one item is consumed.
	"""
	return next(iter(iterable), default)
def last(iterable, /, default=None):
	"""
	Return the last item produced by ``iterable``, or ``default`` if the
	iterable doesn't produce any items. The iterable is consumed completely.
	"""
	# A one-element deque keeps only the most recently produced item
	tail = collections.deque(iterable, maxlen=1)
	if tail:
		return tail[0]
	return default
def count(iterable, /):
	"""
	Return the number of items produced by ``iterable``.

	The iterable is consumed completely in the process.
	"""
	return sum(1 for _ in iterable)
def isfirst(iterable, /):
	"""
	Generate ``(flag, item)`` pairs for the items of ``iterable``, where
	``flag`` is true only for the very first item::

		>>> list(misc.isfirst("foo"))
		[(True, 'f'), (False, 'o'), (False, 'o')]
	"""
	for (position, element) in enumerate(iterable):
		yield (position == 0, element)
def islast(iterable, /):
	"""
	Generate ``(flag, item)`` pairs for the items of ``iterable``, where
	``flag`` is true only for the very last item::

		>>> list(misc.islast("foo"))
		[(False, 'f'), (False, 'o'), (True, 'o')]

	To know whether an item is the last one, the generator always reads one
	item ahead of the one it yields.
	"""
	iterator = iter(iterable)
	try:
		pending = next(iterator)
	except StopIteration:
		# Nothing to report for an empty iterable
		return
	for upcoming in iterator:
		# There is a successor, so ``pending`` can't be the last item
		yield (False, pending)
		pending = upcoming
	yield (True, pending)
def isfirstlast(iterable, /):
	"""
	Generate ``(isfirst, islast, item)`` triples for the items of ``iterable``,
	telling whether the item is the first and/or the last one::

		>>> list(misc.isfirstlast("foo"))
		[(True, False, 'f'), (False, False, 'o'), (False, True, 'o')]

	To know whether an item is the last one, the generator always reads one
	item ahead of the one it yields.
	"""
	iterator = iter(iterable)
	try:
		pending = next(iterator)
	except StopIteration:
		# Nothing to report for an empty iterable
		return
	atstart = True
	for upcoming in iterator:
		# There is a successor, so ``pending`` can't be the last item
		yield (atstart, False, pending)
		atstart = False
		pending = upcoming
	yield (atstart, True, pending)
def notimplemented(function):
	"""
	Decorator for methods that are not implemented.

	The decorated method is replaced by one that raises
	:exc:`NotImplementedError` with a message naming the method and the class
	of the object it was called on, so the message doesn't have to be
	formatted by hand in each implementation.
	"""
	@functools.wraps(function)
	def wrapper(self, *args, **kwargs):
		message = f"method {function.__name__}() not implemented in {format_class(self)}"
		raise NotImplementedError(message)

	return wrapper
def withdoc(doc):
	"""
	Return a decorator that sets the docstring of the decorated function to
	``doc``.

	This is useful when the docstring is not a static string and can't be
	assigned after the function has been defined.
	"""
	def decorate(function):
		setattr(function, "__doc__", doc)
		return function

	return decorate
class EnumMeta(type(enum.Enum)):
	"""
	Metaclass for enumeration classes whose :func:`repr` output contains the
	module name and the fully qualified class name.
	"""

	def __repr__(cls):
		module = cls.__module__
		qualname = cls.__qualname__
		return f"<enum {module}.{qualname}>"
class Enum(enum.Enum, metaclass=EnumMeta):
	"""
	Variant of :class:`enum.Enum` where the :func:`repr` output of the class
	and of its members includes the module name and the fully qualified class
	name.
	"""

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__}.{self.name}: {self.value!r}>"
class IntEnum(enum.IntEnum, metaclass=EnumMeta):
	"""
	Variant of :class:`enum.IntEnum` where the :func:`repr` output of the class
	and of its members includes the module name and the fully qualified class
	name.
	"""

	def __repr__(self):
		cls = self.__class__
		return f"<{cls.__module__}.{cls.__qualname__}.{self.name}: {self.value!r}>"
class _propclass_Meta(type): def __new__(cls, name, bases, dict): if bases == (property,): # create propclass itself normally return super(_propclass_Meta, cls).__new__(cls, name, bases, dict) newdict = dict.copy() newdict.pop("__get__", None) newdict.pop("__set__", None) newdict.pop("__delete__", None) newdict.pop("__metaclass__", None) self = type.__new__(cls, name, bases, newdict) inst = self( dict.get("__get__", None), dict.get("__set__", None), dict.get("__delete__", None), dict.get("__doc__", None) ) inst.__name__ = name return inst
class propclass(property, metaclass=_propclass_Meta):
	'''
	:class:`!propclass` provides an alternate way to define properties.

	Subclassing :class:`!propclass` and defining methods :meth:`__get__`,
	:meth:`__set__` and :meth:`__delete__` will automatically generate the
	appropriate property (the metaclass replaces the subclass by a property
	instance that uses these methods as getter, setter and deleter and the
	class docstring as the property documentation)::

		class name(misc.propclass):
			"""
			The name property
			"""
			def __get__(self):
				return self._name
			def __set__(self, name):
				self._name = name.lower()
			def __delete__(self):
				self._name = None
	'''
def format_class(obj):
	"""
	Return the name of the class of ``obj``.

	The qualified class name is prefixed with the module name, unless the
	class lives in the module ``builtins`` or ``exceptions``. Example::

		>>> misc.format_class(42)
		'int'
		>>> misc.format_class(open('README.rst', 'rb'))
		'_io.BufferedReader'
	"""
	cls = obj.__class__
	if cls.__module__ in ("builtins", "exceptions"):
		return cls.__qualname__
	return f"{cls.__module__}.{cls.__qualname__}"
def format_exception(exc):
	"""
	Return a one-line description of the exception object ``exc``.

	The result consists of the class name (see :func:`format_class`) and, if
	it isn't empty, the stripped exception message. If converting the
	exception to a string fails with a :exc:`UnicodeError`, ``?`` is used as
	the message. Example::

		>>> misc.format_exception(ValueError("bad value"))
		'ValueError: bad value'
	"""
	try:
		message = str(exc).strip()
	except UnicodeError:
		return f"{format_class(exc)}: ?"
	if not message:
		return format_class(exc)
	return f"{format_class(exc)}: {message}"
def exception_chain(exc):
	"""
	Generate ``exc`` and the exceptions chained to it.

	After each exception its ``__cause__`` is followed if there is one.
	Otherwise its ``__context__`` is followed, unless there is none or it has
	been suppressed via ``__suppress_context__``. This is a generator.
	"""
	current = exc
	while True:
		yield current
		cause = current.__cause__
		if cause is not None:
			current = cause
			continue
		context = current.__context__
		if context is None or current.__suppress_context__:
			return
		current = context
class Pool:
	"""
	A :class:`!Pool` object can be used as an inheritable alternative to
	modules. The attributes of a module can be put into a pool and each pool
	can have base pools where lookup continues if an attribute can't be found.

	Pool entries are available both via item access (``pool["name"]``) and
	via attribute access (``pool.name``).
	"""

	def __init__(self, *objects):
		"""
		Create a new pool and register each of ``objects`` in it (see
		:meth:`register`).
		"""
		self._attrs = {}
		self.bases = []
		for object in objects:
			self.register(object)

	def register(self, object):
		r"""
		Register ``object`` in the pool. ``object`` can be a module, a
		dictionary, a class (which is registered under its name) or a
		:class:`!Pool` object (which registers the pool as a base pool).
		Objects of any other type are ignored.

		If ``object`` is a module and has an attribute :attr:`__bases__` (being
		a sequence of other modules) this attribute will be used to initialize
		``self``\s base pool. Values that are modules or dictionaries
		themselves are not registered.
		"""
		if isinstance(object, types.ModuleType):
			self.register(object.__dict__)
		elif isinstance(object, dict):
			for (key, value) in object.items():
				if key == "__bases__":
					for base in value:
						if not isinstance(base, Pool):
							base = self.__class__(base)
						self.bases.append(base)
				elif not isinstance(value, (types.ModuleType, dict)):
					# NOTE(review): entries that can't be stored are skipped
					# silently; this looks like a deliberate best-effort
					try:
						self._attrs[key] = value
					except TypeError:
						pass
		elif isinstance(object, Pool):
			self.bases.append(object)
		elif isinstance(object, type):
			self._attrs[object.__name__] = object

	def __getitem__(self, key):
		"""
		Return the entry named ``key`` from this pool or from the first base
		pool that has it. Raise :exc:`KeyError` if no pool has such an entry.
		"""
		try:
			return self._attrs[key]
		except KeyError:
			pass
		# Every base pool gets its chance, not only the first one
		for base in self.bases:
			try:
				return base[key]
			except KeyError:
				continue
		raise KeyError(key)

	def __getattr__(self, key):
		# ``_attrs`` and ``bases`` are normal instance attributes. If they are
		# looked up here, the instance hasn't been initialized yet (e.g. while
		# being copied or unpickled) and delegating to :meth:`__getitem__`
		# would recurse endlessly.
		if key in ("_attrs", "bases"):
			raise AttributeError(key)
		try:
			return self.__getitem__(key)
		except KeyError:
			raise AttributeError(key)

	def clear(self):
		"""
		Make ``self`` empty, i.e. remove all entries and all base pools.
		"""
		self._attrs.clear()
		del self.bases[:]

	def clone(self):
		"""
		Return a copy of ``self``. The entries and the list of base pools are
		copied, the base pools themselves are shared with the copy.
		"""
		copy = self.__class__()
		copy._attrs = self._attrs.copy()
		copy.bases = self.bases[:]
		return copy

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object with {len(self._attrs):,} items at {id(self):#x}>"
def iterone(item):
	"""
	Return an iterator that produces exactly one item, namely ``item``.
	"""
	yield from (item,)
class Iterator:
	"""
	Wrapper that adds :meth:`__getitem__` support (and a :meth:`get` method)
	to an iterator. Index lookups are done by calling :func:`item`, so they
	consume the wrapped iterator.
	"""

	__slots__ = ("iterator", )

	def __init__(self, iterator):
		self.iterator = iterator

	def __getitem__(self, index):
		if isinstance(index, slice):
			# Slices are applied to a list of all remaining items
			remaining = list(self.iterator)
			return remaining[index]
		# A fresh object can't be produced by the iterator, so it is a safe
		# marker for "no such item"
		missing = object()
		found = item(self, index, missing)
		if found is missing:
			raise IndexError(index)
		return found

	def __iter__(self):
		return self

	def __next__(self):
		return next(self.iterator)

	# We can't implement :meth:`__len__`, because if such an object is passed to
	# :class:`list`, :meth:`__len__` would be called, exhausting the iterator

	def __bool__(self):
		# Note that this consumes one item of the iterator
		try:
			next(self)
		except StopIteration:
			return False
		return True

	def get(self, index, default=None):
		"""
		Return the ``index``'th item from the iterator (or ``default`` if
		there's no such item).
		"""
		return item(self, index, default)
class Queue:
	"""
	A FIFO queue of characters: :meth:`write` appends a string at one end of
	the queue and :meth:`read` fetches characters from the other end,
	removing them from the queue.
	"""

	def __init__(self):
		self._buffer = ""

	def write(self, chars):
		"""
		Append the string ``chars`` to the buffer.
		"""
		self._buffer = self._buffer + chars

	def read(self, size=-1):
		"""
		Return up to ``size`` characters from the start of the buffer (or the
		complete buffer if ``size`` is negative). The returned characters are
		removed from the buffer.
		"""
		if size < 0:
			size = len(self._buffer)
		(chunk, self._buffer) = (self._buffer[:size], self._buffer[size:])
		return chunk
class Const:
	"""
	Class for singleton constants.

	The :func:`repr` output of an instance is its name prefixed with the
	module name passed to the constructor (or, if none was given, with the
	name of the module of the class).
	"""

	__slots__ = ("_name", "_module")

	def __init__(self, name, module=None):
		(self._name, self._module) = (name, module)

	def __repr__(self):
		module = self._module or self.__module__
		return f"{module}.{self._name}"
class FlagAction(argparse.Action):
	"""
	:class:`!FlagAction` can be used with :mod:`argparse` for options that
	represent flags.

	The option may be given with a value like ``yes`` or ``no`` (see
	:attr:`true_choices` and :attr:`false_choices`) to set the corresponding
	boolean value. If the value is omitted, the result is the inverted
	default value, i.e. specifying the bare option toggles the flag.
	"""

	true_choices = ("1", "true", "yes", "on")
	false_choices = ("0", "false", "no", "off")

	def __init__(self, option_strings, dest, default=False, help=None):
		# Default and const are passed as strings, so that they are converted
		# by :meth:`str2bool` just like a value given on the command line
		if default:
			(default, const) = ("yes", "no")
		else:
			(default, const) = ("no", "yes")
		super().__init__(
			option_strings=option_strings,
			dest=dest,
			default=default,
			help=help,
			metavar="yes|no",
			const=const,
			type=self.str2bool,
			nargs="?",
		)

	# implementing this prevents :meth:`__repr__` from generating an infinite recursion
	def _get_kwargs(self):
		names = ("option_strings", "dest", "default", "help")
		return [(name, getattr(self, name)) for name in names]

	def str2bool(self, value):
		"""
		Convert the string ``value`` to a boolean (ignoring case). Raise an
		:exc:`argparse.ArgumentTypeError` for unknown values.
		"""
		value = value.lower()
		if value in self.true_choices:
			return True
		if value in self.false_choices:
			return False
		choices = ", ".join(self.true_choices + self.false_choices)
		raise argparse.ArgumentTypeError(f"invalid flag value: {value!r} (use any of {choices})")

	def __call__(self, parser, namespace, values, option_string=None):
		setattr(namespace, self.dest, values)
def tokenizepi(string):
	"""
	Tokenize the string object ``string`` according to the processing
	instructions in the string.

	:func:`!tokenizepi` generates tuples with the first item being the
	processing instruction target and the second being the PI data. "Text"
	content (i.e. anything other than PIs) is generated as ``(None, data)``.
	A processing instruction without any content (like ``<??>``) is generated
	as ``("", "")``. An opening ``<?`` without a matching ``?>`` is treated as
	text.
	"""
	pos = 0
	while True:
		start = string.find("<?", pos)
		# Search for the end after the ``<?``, so that the ``?`` of the start
		# tag can't be mistaken for the beginning of ``?>``
		end = string.find("?>", start + 2) if start >= 0 else -1
		if end < 0:
			# No (complete) processing instruction left: the rest is text
			text = string[pos:]
			if text:
				yield (None, text)
			return
		text = string[pos:start]
		if text:
			yield (None, text)
		parts = string[start+2:end].strip().split(None, 1)
		# ``parts`` is empty if the PI contains nothing but whitespace
		target = parts[0] if parts else ""
		data = parts[1] if len(parts) > 1 else ""
		yield (target, data)
		pos = end + 2
def itersplitat(string, positions):
	"""
	Split ``string`` at the positions specified in ``positions`` and generate
	the resulting parts. Empty parts are skipped. For example::

		>>> from ll import misc
		>>> import datetime
		>>> datetime.datetime(*map(int, misc.itersplitat("20090609172345", (4, 6, 8, 10, 12))))
		datetime.datetime(2009, 6, 9, 17, 23, 45)

	This is a generator.
	"""
	start = 0
	for end in positions:
		chunk = string[start:end]
		start = end
		if chunk:
			yield chunk
	# Whatever follows the last position
	rest = string[start:]
	if rest:
		yield rest
def module(source, filename="unnamed.py", name=None):
	"""
	Create and return a module object from the Python source code ``source``.

	``filename`` is used as the filename of the module (for compiling the
	source and as the ``__file__`` attribute). ``name`` is the module name; if
	it is ``None`` the base name of ``filename`` without its extension is
	used.
	"""
	if name is None:
		basename = os.path.basename(filename)
		(name, _) = os.path.splitext(basename)
	result = types.ModuleType(name)
	result.__file__ = filename
	# The source is executed with the module's dictionary as its namespace
	exec(compile(source, filename, "exec"), result.__dict__)
	return result
def javaexpr(obj):
	"""
	Return a Java expression for the object ``obj`` as a string.

	Supported are ``None``, booleans, strings, :mod:`datetime` dates,
	datetimes and timedeltas, :class:`monthdelta` and :class:`ll.color.Color`
	objects, floats, integers, sequences, mappings, sets, UL4 templates and
	the UL4 "undefined" objects. For any other object a :exc:`TypeError` is
	raised.

	Example::

		>>> print(misc.javaexpr([1, 2, 3]))
		java.util.Arrays.asList(1, 2, 3)
	"""
	from ll import ul4c

	if obj is None:
		return "null"
	elif obj is True:
		return "true"
	elif obj is False:
		return "false"
	elif isinstance(obj, str):
		if len(obj) > 10000: # Otherwise javac complains about ``constant string too long`` (the upper limit is 65535 UTF-8 bytes)
			parts = "".join(f".append({javaexpr(obj[i:i+10000])})" for i in range(0, len(obj), 10000))
			return f"new StringBuilder({len(obj)}){parts}.toString()"
		else:
			v = []
			specialchars = {"\r": "\\r", "\n": "\\n", "\t": "\\t", "\f": "\\f", "\b": "\\b", '"': '\\"', "\\": "\\\\"}
			for c in obj:
				escaped = specialchars.get(c)
				if escaped is not None:
					v.append(escaped)
					continue
				oc = ord(c)
				if 32 <= oc < 128:
					v.append(c)
				elif oc > 0xffff:
					# A Java ``\u`` escape takes exactly four hex digits, so
					# characters outside the BMP are written as a UTF-16
					# surrogate pair
					oc -= 0x10000
					v.append(f"\\u{0xd800 + (oc >> 10):04x}\\u{0xdc00 + (oc & 0x3ff):04x}")
				else:
					v.append(f"\\u{oc:04x}")
			string = "".join(v)
			return f'"{string}"'
	elif isinstance(obj, datetime.datetime): # check ``datetime`` before ``date``, as ``datetime`` is a subclass of ``date``
		return f"com.livinglogic.ul4.FunctionDate.call({obj.year}, {obj.month}, {obj.day}, {obj.hour}, {obj.minute}, {obj.second}, {obj.microsecond})"
	elif isinstance(obj, datetime.date):
		return f"com.livinglogic.ul4.FunctionDate.call({obj.year}, {obj.month}, {obj.day})"
	elif isinstance(obj, datetime.timedelta):
		return f"com.livinglogic.ul4.FunctionTimeDelta.call({obj.days}, {obj.seconds}, {obj.microseconds})"
	elif isinstance(obj, monthdelta):
		return f"com.livinglogic.ul4.FunctionMonthDelta.call({obj.months()})"
	elif isinstance(obj, color.Color):
		# This has to be an f-string, otherwise the placeholders end up
		# verbatim in the Java source
		return f"new com.livinglogic.ul4.Color({obj[0]}, {obj[1]}, {obj[2]}, {obj[3]})"
	elif isinstance(obj, float):
		return repr(obj)
	elif isinstance(obj, int):
		if -0x80000000 <= obj <= 0x7fffffff:
			# Fits into a Java ``int``
			return repr(obj)
		elif -0x8000000000000000 <= obj <= 0x7fffffffffffffff:
			# Fits into a Java ``long``
			return repr(obj) + "L"
		else:
			return f'new java.math.BigInteger("{obj}")'
	elif isinstance(obj, abc.Sequence):
		items = ", ".join(javaexpr(item) for item in obj)
		return f"java.util.Arrays.asList({items})"
	elif isinstance(obj, abc.Mapping):
		items = ", ".join(f"{javaexpr(key)}, {javaexpr(value)}" for (key, value) in obj.items())
		return f"com.livinglogic.utils.MapUtils.makeMap({items})"
	elif isinstance(obj, abc.Set):
		items = ", ".join(javaexpr(item) for item in obj)
		return f"com.livinglogic.utils.SetUtils.makeSet({items})"
	elif isinstance(obj, ul4c.UndefinedKey):
		return f"new com.livinglogic.ul4.UndefinedKey({javaexpr(obj._key)})"
	elif isinstance(obj, ul4c.UndefinedVariable):
		return f"new com.livinglogic.ul4.UndefinedVariable({javaexpr(obj._name)})"
	elif isinstance(obj, ul4c.Template):
		return obj.javasource()
	else:
		raise TypeError(f"can't handle object of type {type(obj)}")
class SysInfo:
	"""
	A :class:`!SysInfo` object contains information about the host, user,
	python version and script. Available attributes are ``host_name``,
	``host_fqdn``, ``host_ip``, ``host_sysname``, ``host_nodename``,
	``host_release``, ``host_version``, ``host_machine``, ``user_name``,
	``user_uid``, ``user_gid``, ``user_gecos``, ``user_dir``, ``user_shell``,
	``python_executable``, ``python_version``, ``pid``, ``script_name``,
	``short_script_name`` and ``script_url``.

	Most values are determined on first access and cached afterwards.

	:class:`!SysInfo` objects also support a minimal dictionary interface
	(i.e. :meth:`__getitem__` and :meth:`__iter__`).

	One module global instance named ``sysinfo`` is created at module import
	time.
	"""

	ul4_attrs = {"host_name", "host_fqdn", "host_ip", "host_sysname", "host_nodename", "host_release", "host_version", "host_machine", "user_name", "user_uid", "user_gid", "user_gecos", "user_dir", "user_shell", "python_executable", "python_version", "pid", "script_name", "short_script_name", "script_url"}

	def __init__(self):
		# Use ``object`` as a marker for "not initialized"
		for name in (
			"host_name", "host_fqdn", "host_ip",
			"host_sysname", "host_nodename", "host_release", "host_version", "host_machine", "host_processor",
			"user_name", "user_uid", "user_gid", "user_gecos", "user_dir", "user_shell",
			"pid", "script_name", "short_script_name", "script_url",
		):
			setattr(self, f"_{name}", object)

	@property
	def host_name(self):
		if self._host_name is object:
			import socket
			self._host_name = socket.gethostname()
		return self._host_name

	@property
	def host_fqdn(self):
		# Currently the same as the host name
		return self.host_name

	@property
	def host_ip(self):
		if self._host_ip is object:
			import socket
			self._host_ip = socket.gethostbyname(self.host_name)
		return self._host_ip

	def _make_host_info(self):
		# Fill all ``uname`` based attributes in one go
		uname = platform.uname()
		self._host_sysname = uname.system
		self._host_nodename = uname.node
		self._host_release = uname.release
		self._host_version = uname.version
		self._host_machine = uname.machine
		self._host_processor = uname.processor

	@property
	def host_sysname(self):
		if self._host_sysname is object:
			self._make_host_info()
		return self._host_sysname

	@property
	def host_nodename(self):
		if self._host_nodename is object:
			self._make_host_info()
		return self._host_nodename

	@property
	def host_release(self):
		if self._host_release is object:
			self._make_host_info()
		return self._host_release

	@property
	def host_version(self):
		if self._host_version is object:
			self._make_host_info()
		return self._host_version

	@property
	def host_machine(self):
		if self._host_machine is object:
			self._make_host_info()
		return self._host_machine

	@property
	def host_processor(self):
		if self._host_processor is object:
			self._make_host_info()
		return self._host_processor

	def _make_user_info(self):
		# Fill all user related attributes in one go
		try:
			import pwd
		except ImportError:
			# We're probably on Windows
			import getpass
			self._user_name = getpass.getuser()
			self._user_uid = None
			self._user_gid = None
			self._user_gecos = None
			self._user_dir = os.path.expanduser("~")
			self._user_shell = None
		else:
			entry = pwd.getpwuid(os.getuid())
			self._user_name = entry.pw_name
			self._user_uid = entry.pw_uid
			self._user_gid = entry.pw_gid
			self._user_gecos = entry.pw_gecos
			self._user_dir = entry.pw_dir
			self._user_shell = entry.pw_shell

	@property
	def user_name(self):
		if self._user_name is object:
			self._make_user_info()
		return self._user_name

	@property
	def user_uid(self):
		if self._user_uid is object:
			self._make_user_info()
		return self._user_uid

	@property
	def user_gid(self):
		if self._user_gid is object:
			self._make_user_info()
		return self._user_gid

	@property
	def user_gecos(self):
		if self._user_gecos is object:
			self._make_user_info()
		return self._user_gecos

	@property
	def user_dir(self):
		if self._user_dir is object:
			self._make_user_info()
		return self._user_dir

	@property
	def user_shell(self):
		if self._user_shell is object:
			self._make_user_info()
		return self._user_shell

	@property
	def python_executable(self):
		return sys.executable

	@property
	def python_version(self):
		info = sys.version_info
		if info.releaselevel != 'final':
			return f"{info.major}.{info.minor}.{info.micro} {info.releaselevel}"
		if info.micro:
			return f"{info.major}.{info.minor}.{info.micro}"
		return f"{info.major}.{info.minor}"

	@property
	def pid(self):
		return os.getpid()

	@property
	def script_name(self):
		if self._script_name is object:
			main = sys.modules["__main__"]
			if hasattr(main, "__file__"):
				# Resolve relative to the directory that was current at import time
				self._script_name = os.path.join(_curdir, main.__file__)
			else:
				self._script_name = "<shell>"
		return self._script_name

	@property
	def short_script_name(self):
		if self._short_script_name is object:
			name = self.script_name
			if name != "<shell>":
				home = os.path.expanduser("~")
				# Abbreviate the home directory of the user to ``~``
				if name.startswith(home+"/"):
					name = "~" + name[len(home):]
			self._short_script_name = name
		return self._short_script_name

	@property
	def script_url(self):
		if self._script_url is object:
			from ll import url
			location = self.short_script_name
			if location != "<shell>":
				host = self.host_fqdn or self.host_name
				location = str(url.Ssh(self.user_name, host, location))
			self._script_url = location
		return self._script_url

	def __getitem__(self, key):
		if key not in self.ul4_attrs:
			raise KeyError(key)
		return getattr(self, key)

	def __iter__(self):
		return iter(self.ul4_attrs)


# Single instance
sysinfo = SysInfo()
class monthdelta:
	"""
	:class:`!monthdelta` objects can be used to add months/years to a
	:class:`datetime.datetime` or :class:`datetime.date` object. If the
	resulting day falls out of the range of valid days for the target month,
	the last day for the target month will be used instead::

		>>> import datetime
		>>> from ll import misc
		>>> datetime.date(2000, 1, 31) + misc.monthdelta(1)
		datetime.date(2000, 2, 29)

	:class:`!monthdelta` objects can be compared with each other, added to and
	subtracted from each other, negated, multiplied with and divided by
	integers and divided by each other.
	"""

	ul4_type = ul4c.InstantiableType(None, "monthdelta", "A number of months")
	ul4_attrs = {"months"}

	__slots__ = ("_months",)

	def __init__(self, months=0, /):
		self._months = months

	def __bool__(self):
		return self._months != 0

	def __hash__(self):
		return self._months

	def __eq__(self, other):
		if not isinstance(other, monthdelta):
			return False
		return self._months == other._months

	def __ne__(self, other):
		if not isinstance(other, monthdelta):
			return True
		return self._months != other._months

	def __lt__(self, other):
		if isinstance(other, monthdelta):
			return self._months < other._months
		raise TypeError(f"unorderable types: {format_class(self)}() < {format_class(other)}()")

	def __le__(self, other):
		if isinstance(other, monthdelta):
			return self._months <= other._months
		raise TypeError(f"unorderable types: {format_class(self)}() <= {format_class(other)}()")

	def __gt__(self, other):
		if isinstance(other, monthdelta):
			return self._months > other._months
		raise TypeError(f"unorderable types: {format_class(self)}() > {format_class(other)}()")

	def __ge__(self, other):
		if isinstance(other, monthdelta):
			return self._months >= other._months
		raise TypeError(f"unorderable types: {format_class(self)}() >= {format_class(other)}()")

	def __add__(self, other):
		if isinstance(other, monthdelta):
			return monthdelta(self._months+other._months)
		if not isinstance(other, (datetime.datetime, datetime.date)):
			return NotImplemented
		# Work with zero based months, so that ``divmod`` yields the number of
		# years to add and the (zero based) target month
		(years_add, month) = divmod(other.month + self._months - 1, 12)
		month += 1
		year = other.year + years_add
		day = other.day
		while True:
			try:
				return other.replace(year=year, month=month, day=day)
			except ValueError:
				# Probably the day is out of range for the target month: try
				# the previous day (and give up when arriving at the first)
				day -= 1
				if day == 1:
					raise

	def __radd__(self, other):
		return self.__add__(other)

	def __sub__(self, other):
		if not isinstance(other, monthdelta):
			return NotImplemented
		return monthdelta(self._months-other._months)

	def __rsub__(self, other):
		return other + (-self)

	def __neg__(self):
		return monthdelta(-self._months)

	def __abs__(self):
		return monthdelta(abs(self._months))

	def __mul__(self, other):
		if isinstance(other, int) and not isinstance(other, monthdelta):
			return monthdelta(self._months*other)
		return NotImplemented

	def __rmul__(self, other):
		return self.__mul__(other)

	def __floordiv__(self, other):
		if isinstance(other, int):
			return monthdelta(self._months//other)
		if isinstance(other, monthdelta):
			return self._months//other._months
		return NotImplemented

	def __truediv__(self, other):
		if not isinstance(other, monthdelta):
			return NotImplemented
		return self._months/other._months

	def __str__(self):
		m = self._months
		plural = "" if m == 1 or m == -1 else "s"
		return f"{m} month{plural}"

	def __repr__(self):
		m = self._months
		if not m:
			return "monthdelta()"
		return f"monthdelta({m!r})"

	def months(self):
		return self._months
class Timeout(Exception):
	"""
	Exception that is raised when a timeout in :func:`timeout` occurs.

	The attribute ``duration`` holds the duration that was exceeded.
	"""

	def __init__(self, duration):
		self.duration = duration

	def __str__(self):
		# A plain number is a number of seconds, anything else is expected to
		# format itself appropriately
		unit = " seconds" if isinstance(self.duration, (int, float)) else ""
		return f"timed out after {self.duration}{unit}"
@contextlib.contextmanager
def timeout(seconds):
	"""
	A context manager that limits the runtime of the wrapped code to
	``seconds`` seconds. If the code takes longer, a :exc:`Timeout` exception
	is raised.

	As this is implemented via :mod:`signal` (using ``SIGALRM``), it won't
	work with threads and is only available on UNIX.
	"""
	def _raise_timeout(signum, frame):
		raise Timeout(seconds)

	previous = signal.signal(signal.SIGALRM, _raise_timeout)
	signal.alarm(seconds)
	try:
		yield
	finally:
		# Cancel a pending alarm and reinstall the previous handler
		signal.alarm(0)
		signal.signal(signal.SIGALRM, previous)
def notifystart():
	"""
	Notify OS X of the start of a process by removing the previous
	notification.

	This calls the program configured in ``notifycmd`` with the name of the
	running script as the notification group to remove. The output of the
	program is discarded and its exit status is ignored.
	"""
	cmd = [notifycmd, "-remove", sysinfo.script_name]
	# ``subprocess.DEVNULL`` avoids opening a hard-coded ``/dev/null``
	subprocess.call(cmd, stdout=subprocess.DEVNULL)
def notifyfinish(title, subtitle, message):
	"""
	Notify OS X of the end of a process.

	This calls the program configured in ``notifycmd`` with ``title``,
	``subtitle`` and ``message`` as the content of the notification and the
	name of the running script as the notification group. The output of the
	program is discarded and its exit status is ignored.
	"""
	cmd = [notifycmd, "-title", title, "-subtitle", subtitle, "-message", message, "-group", sysinfo.script_name]
	# ``subprocess.DEVNULL`` avoids opening a hard-coded ``/dev/null``
	subprocess.call(cmd, stdout=subprocess.DEVNULL)
def prettycsv(rows, padding=" "):
	"""
	Format the table ``rows`` so that the columns line up.

	``rows`` must be a list of lists of strings (e.g. as produced by the
	:mod:`csv` module); it is iterated several times, so it can't be a
	one-shot iterator. Rows may have differing lengths. ``padding`` is the
	padding between columns. For an empty table nothing is generated.

	:func:`!prettycsv` is a generator producing the fragments of the output
	(fields, paddings and line feeds).
	"""
	def width(row, i):
		try:
			return len(row[i])
		except IndexError:
			# Short rows don't contribute to the width of this column
			return 0

	# ``default`` keeps an empty table from raising a :exc:`ValueError`
	maxlen = max((len(row) for row in rows), default=0)
	lengths = [max(width(row, i) for row in rows) for i in range(maxlen)]
	for row in rows:
		lasti = len(row)-1
		for (i, (w, f)) in enumerate(zip(lengths, row)):
			if i:
				yield padding
			if i == lasti:
				f = f.rstrip() # don't add padding to the last column
			else:
				f = f"{f:<{w}}"
			yield f
		yield "\n"
class JSMinUnterminatedComment(Exception):
	"""
	Raised by :func:`jsmin` when the input ends inside of a ``/* */`` comment.
	"""


class JSMinUnterminatedStringLiteral(Exception):
	"""
	Raised by :func:`jsmin` when a string literal isn't terminated.
	"""


class JSMinUnterminatedRegularExpression(Exception):
	"""
	Raised by :func:`jsmin` when a regular expression literal isn't terminated.
	"""
def jsmin(input):
	"""
	Minimizes the Javascript source ``input`` and returns the minimized source
	as a string.

	Raises :exc:`JSMinUnterminatedComment`,
	:exc:`JSMinUnterminatedStringLiteral` or
	:exc:`JSMinUnterminatedRegularExpression` if the input ends (or, for
	strings and regular expressions, the line ends) inside of such a construct.
	"""
	indata = iter(input.replace("\r", "\n"))

	# Copy the input to the output, deleting the characters which are
	# insignificant to JavaScript. Comments will be removed. Tabs will be
	# replaced with spaces. Carriage returns will be replaced with linefeeds.
	# Most spaces and linefeeds will be removed.

	# Mutable state shared by the nested functions: ``a`` and ``b`` are the
	# current pair of characters, ``lookahead`` is a character that has been
	# peeked at, but not consumed yet
	class var:
		a = "\n"
		b = None
		lookahead = None
	outdata = []

	def _get():
		# Return the next character from the input. Watch out for lookahead. If
		# the character is a control character, translate it to a space or linefeed.
		# At the end of the input an empty string is returned.
		c = var.lookahead
		var.lookahead = None
		if c is None:
			try:
				c = next(indata)
			except StopIteration:
				return "" # EOF
		if c >= " " or c == "\n":
			return c
		return " "

	def _peek():
		# Return the next character without consuming it
		var.lookahead = _get()
		return var.lookahead

	def isalphanum(c):
		# Return true if the character is a letter, digit, underscore, dollar sign, or non-ASCII character.
		# (The backslash is accepted too.)
		return ('a' <= c <= 'z') or ('0' <= c <= '9') or ('A' <= c <= 'Z') or c in "_$\\" or (c is not None and ord(c) > 126)

	def _next():
		# Get the next character, excluding comments. peek() is used to see if an unescaped '/' is followed by a '/' or '*'.
		c = _get()
		if c == "/" and var.a != "\\":
			p = _peek()
			if p == "/":
				# Line comment: skip everything up to the end of the line
				c = _get()
				while c > "\n":
					c = _get()
				return c
			if p == "*":
				# Block comment: skip up to the closing ``*/`` and treat the
				# whole comment as one space
				c = _get()
				while 1:
					c = _get()
					if c == "*":
						if _peek() == "/":
							_get()
							return " "
					if not c:
						raise JSMinUnterminatedComment()
		return c

	def _action(action):
		"""
		Do something! What you do is determined by the argument:

		1   Output A. Copy B to A. Get the next B.
		2   Copy B to A. Get the next B. (Delete A).
		3   Get the next B. (Delete B).

		action treats a string as a single character. Wow!
		action recognizes a regular expression if it is preceded by ( or , or =.
		"""
		if action <= 1:
			outdata.append(var.a)

		if action <= 2:
			var.a = var.b
			if var.a in "'\"":
				# Copy a string literal verbatim up to the closing quote
				while True:
					outdata.append(var.a)
					var.a = _get()
					if var.a == var.b:
						break
					if var.a <= "\n":
						raise JSMinUnterminatedStringLiteral()
					if var.a == "\\":
						# Copy the escaped character too
						outdata.append(var.a)
						var.a = _get()

		if action <= 3:
			var.b = _next()
			if var.b == "/" and var.a in "(,=:[?!&|;{}\n":
				# Copy a regular expression literal verbatim up to the closing ``/``
				outdata.append(var.a)
				outdata.append(var.b)
				while True:
					var.a = _get()
					if var.a == "/":
						break
					elif var.a == "\\":
						# Copy the escaped character too
						outdata.append(var.a)
						var.a = _get()
					elif var.a <= "\n":
						raise JSMinUnterminatedRegularExpression()
					outdata.append(var.a)
				var.b = _next()

	# Main loop: fetch the first ``b``, then decide for each pair ``a``/``b``
	# whether ``a`` is output, dropped, or ``b`` is dropped
	_action(3)

	while var.a:
		if var.a == " ":
			_action(1 if isalphanum(var.b) else 2)
		elif var.a == "\n":
			if var.b in "{[(+-":
				_action(1)
			elif var.b == " ":
				_action(3)
			else:
				_action(1 if isalphanum(var.b) else 2)
		else:
			if var.b == " ":
				_action(1 if isalphanum(var.a) else 3)
			elif var.b == "\n":
				_action(1 if isalphanum(var.a) or var.a in "}])+-\"'" else 3)
			else:
				_action(1)
	return "".join(outdata).lstrip()