# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True
## Copyright 1999-2024 by LivingLogic AG, Bayreuth/Germany
## Copyright 1999-2024 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license
"""
:mod:`!ll.url` contains an :rfc:`2396` compliant implementation of URLs and
classes for accessing resource metadata as well as file like classes for
reading and writing resource data.
These three levels of functionality are implemented in three classes:
:class:`URL`
:class:`URL` objects are the names of resources and can be used and
modified, regardless of the fact whether these resources actually exist.
:class:`URL` objects never hits the hard drive or the net.
:class:`Connection`
:class:`Connection` objects contain functionality that accesses and
changes file metadata (like last modified date, permission bits,
directory structure etc.). A connection object can be created by calling
the :meth:`connect` method on a :class:`URL` object.
:class:`Resource`
:class:`Resource` objects are file like objects that work with the actual
bytes that make up the file data. This functionality lives in the
:class:`Resource` class and its subclasses. Creating a resource is done
by calling the :meth:`open` method on a :class:`Connection` or a
:class:`URL`.
Module documentation
--------------------
"""
import os, urllib.request, urllib.error, urllib.parse as urlparse, mimetypes, io, warnings
import datetime, re, fnmatch, pickle, errno, threading
import email
from email import utils
default_ssh_python = os.environ.get("LL_URL_SSH_PYTHON")
# don't fail when :mod:`pwd` or :mod:`grp` can't be imported, because if this
# doesn't work, we're probably on Windows and :func:`os.chown` won't work anyway.
try:
import pwd, grp
except ImportError:
pass
try:
import execnet
except ImportError:
pass
try:
from PIL import Image
except ImportError:
pass
from ll import misc
__docformat__ = "reStructuredText"
def mime2dt(s):
return datetime.datetime(*utils.parsedate(s)[:7])
weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
monthname = [None, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
[docs]
def httpdate(dt):
"""
Return a string suitable for a "Last-Modified" and "Expires" header.
``dt`` is a :class:`datetime.datetime` object in UTC.
"""
return f"{weekdayname[dt.weekday()]}, {dt.day:02d} {monthname[dt.month]:3} {dt.year:4} {dt.hour:02}:{dt.minute:02}:{dt.second:02} GMT"
def _normalizepath(path_segments):
"""
Internal helper function for normalizing a path list.
Should be equivalent to RFC2396, Section 5.2 (6) (c)-(f) with the exception
of removing empty path_segments.
"""
new_path_segments = []
l = len(path_segments)
for i in range(l):
segment = path_segments[i]
if not segment or segment == ".":
if i == l-1:
new_path_segments.append("")
elif segment == ".." and len(new_path_segments) and new_path_segments[-1] != "..":
new_path_segments.pop()
if i == l-1:
new_path_segments.append("")
else:
new_path_segments.append(segment)
return new_path_segments
def _escape(s, safe="".join(chr(c) for c in range(128))):
return urlparse.quote(s, safe)
_unescape = urlparse.unquote
alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
alphanum = alpha + "0123456789"
mark = "-_.!~*'()"
additionalsafe = "[]"
safe = alphanum + mark + additionalsafe
pathsafe = safe + ":@&=$,;+" + "|" # add "|" for Windows paths
querysafe = alphanum
fragsafe = alphanum
schemecharfirst = alpha
schemechar = alphanum + "+-."
def _urlencode(query_parts):
if query_parts is not None:
res = []
# generate a canonical order for the names
items = sorted(query_parts.items())
for (name, values) in items:
if not isinstance(values, (list, tuple)):
values = (values,)
else:
# generate a canonical order for the values
values.sort()
for value in values:
res.append(f"{_escape(name, querysafe)}={_escape(value, querysafe)}")
return "&".join(res)
else:
return None
def compilepattern(pattern, ignorecase=False):
if pattern is None:
return None
elif isinstance(pattern, str):
return (re.compile(fnmatch.translate(pattern), re.I if ignorecase else 0).match,)
else:
return tuple(re.compile(fnmatch.translate(p), re.I if ignorecase else 0).match for p in pattern)
def matchpatterns(name, include, exclude):
if include and not any(matcher(name) is not None for matcher in include):
return False
if exclude and any(matcher(name) is not None for matcher in exclude):
return False
return True
[docs]
class Context(object):
"""
Working with URLs (e.g. calling :meth:`URL.open` or :meth:`URL.connect`)
involves :class:`Connection` objects. To avoid constantly creating new
connections you can pass a :class:`!Context` object to those methods.
Connections will be stored in the :class:`!Context` object and will be
reused by those methods.
A :class:`!Context` object can also be used as a context manager. This context
object will be used for all :meth:`open` and :meth:`connect` calls inside the
:keyword:`with` block. (Note that after the end of the :keyword:`with` block
all connections will be closed.)
"""
def __init__(self):
self.schemes = {}
[docs]
def closeall(self):
"""
Close and drop all connections in this context.
"""
for scheme in self.schemes:
schemereg[scheme].closeall(self)
self.schemes = {}
def __enter__(self):
self.prev = threadlocalcontext.context
threadlocalcontext.context = self
def __exit__(self, *exc_info):
threadlocalcontext.context = self.prev
del self.prev
self.closeall()
class ThreadLocalContext(threading.local):
context = Context()
threadlocalcontext = ThreadLocalContext()
def getcontext(context):
if context is None:
return threadlocalcontext.context
return context
###
### Cursor for the :meth:`walk` method
###
[docs]
class Cursor(object):
"""
A :class:`!Cursor` object is used by the :meth:`walk` method during directory
traversal. It contains information about the state of the traversal and can
be used to influence which directories are traversed and in which order.
Information about the state of the traversal is provided in the following
attributes:
``rooturl``
The URL where traversal has been started (i.e. the object for which the
:meth:`walk` method has been called)
``url``
The current URL being traversed.
``event``
A string that specifies which event is currently handled. Possible values
are: ``"beforedir"``, ``"afterdir"`` and ``"file"``. A ``"beforedir"``
event is emitted before a directory is entered. ``"afterdir"``
is emitted after a directory has been entered. ``"file"`` is emitted when
a file is encountered.
``isdir``
True if ``url`` refers to a directory.
``isfile``
Tur if ``url`` refers to a regular file.
The following attributes specify which part of the tree should be traversed:
``beforedir``
Should the generator yield ``"beforedir"`` events?
``afterdir``
Should the generator yield ``"afterdir"`` events?
``file``
Should the generator yield ``"file"`` events?
``enterdir``
Should the directory be entered?
Note that if any of these attributes is changed by the code consuming the
generator, this new value will be used for the next traversal step once the
generator is resumed and will be reset to its initial value (specified in
the constructor) afterwards.
"""
[docs]
def __init__(self, url, beforedir=True, afterdir=False, file=True, enterdir=False):
"""
Create a new :class:`!Cursor` object for a tree traversal rooted at the
node ``node``.
The arguments ``beforedir``, ``afterdir``, ``file`` and
``enterdir`` are used as the initial values for the attributes of
the same name. (see the class docstring for info about their use).
"""
self.rooturl = self.url = url
self.event = None
self.beforedir = self._beforedir = beforedir
self.afterdir = self._afterdir = afterdir
self.file = self._file = file
self.enterdir = self._enterdir = enterdir
self.isdir = self.isfile = None
[docs]
def restore(self):
"""
Restore the attributes ``beforedir``, ``afterdir``, ``file`` and
``enterdir`` to their initial value.
"""
self.beforedir = self._beforedir
self.afterdir = self._afterdir
self.file = self._file
self.enterdir = self._enterdir
[docs]
class Connection(object):
"""
A :class:`!Connection` object is used for accessing and modifying the
metadata associated with a file. It is created by calling the
:meth:`~URL.connect` method on a :class:`URL` object.
"""
[docs]
@misc.notimplemented
def stat(self, url):
"""
Return the result of a :func:`stat` call on the file ``url``.
"""
[docs]
@misc.notimplemented
def lstat(self, url):
"""
Return the result of a :func:`stat` call on the file ``url``. Like
:meth:`stat`, but does not follow symbolic links.
"""
[docs]
@misc.notimplemented
def chmod(self, url, mode):
"""
Set the access mode of the file ``url`` to ``mode``.
"""
[docs]
@misc.notimplemented
def chown(self, url, owner=None, group=None):
"""
Change the owner and/or group of the file ``url``.
"""
[docs]
@misc.notimplemented
def lchown(self, url, owner=None, group=None):
"""
Change the owner and/or group of the file ``url``
(ignoring symbolic links).
"""
[docs]
@misc.notimplemented
def uid(self, url):
"""
Return the user id of the owner of the file ``url``.
"""
[docs]
@misc.notimplemented
def gid(self, url):
"""
Return the group id the file ``url`` belongs to.
"""
[docs]
@misc.notimplemented
def owner(self, url):
"""
Return the name of the owner of the file ``url``.
"""
[docs]
@misc.notimplemented
def group(self, url):
"""
Return the name of the group the file ``url`` belongs to.
"""
[docs]
def mimetype(self, url):
"""
Return the mimetype of the file ``url``.
"""
name = self._url2filename(url)
mimetype = mimetypes.guess_type(name)[0]
return mimetype or "application/octet-stream"
[docs]
@misc.notimplemented
def exists(self, url):
"""
Test whether the file ``url`` exists.
"""
[docs]
@misc.notimplemented
def isfile(self, url):
"""
Test whether the resource ``url`` is a file.
"""
[docs]
@misc.notimplemented
def isdir(self, url):
"""
Test whether the resource ``url`` is a directory.
"""
[docs]
@misc.notimplemented
def islink(self, url):
"""
Test whether the resource ``url`` is a link.
"""
[docs]
@misc.notimplemented
def ismount(self, url):
"""
Test whether the resource ``url`` is a mount point.
"""
[docs]
@misc.notimplemented
def access(self, url, mode):
"""
Test for access to the file/resource ``url``.
"""
[docs]
def size(self, url):
"""
Return the size of the file ``url``.
"""
return self.stat(url).st_size
[docs]
def imagesize(self, url):
"""
Return the size of the image ``url`` (if the resource is an image file)
as a ``(width, height)`` tuple. This requires the PIL__.
__ http://www.pythonware.com/products/pil/
"""
stream = self.open(url, mode="rb")
img = Image.open(stream) # Requires PIL
imagesize = img.size
stream.close()
return imagesize
[docs]
def cdate(self, url):
"""
Return the "metadate change" date of the file/resource ``url``
as a :class:`datetime.datetime` object in UTC.
"""
return datetime.datetime.utcfromtimestamp(self.stat(url).st_ctime)
[docs]
def adate(self, url):
"""
Return the last access date of the file/resource ``url`` as a
:class:`datetime.datetime` object in UTC.
"""
return datetime.datetime.utcfromtimestamp(self.stat(url).st_atime)
[docs]
def mdate(self, url):
"""
Return the last modification date of the file/resource ``url``
as a :class:`datetime.datetime` object in UTC.
"""
return datetime.datetime.utcfromtimestamp(self.stat(url).st_mtime)
[docs]
@misc.notimplemented
def remove(self, url):
"""
Remove the file ``url``.
"""
[docs]
@misc.notimplemented
def rmdir(self, url):
"""
Remove the directory ``url``.
"""
[docs]
@misc.notimplemented
def rename(self, url, target):
"""
Renames ``url`` to ``target``. This might not work if ``target``
has a different scheme than ``url`` (or is on a different server).
"""
[docs]
@misc.notimplemented
def link(self, url, target):
"""
Create a hard link from ``url`` to ``target``. This will not work
if ``target`` has a different scheme than ``url`` (or is on a
different server).
"""
[docs]
@misc.notimplemented
def symlink(self, url, target):
"""
Create a symbolic link from ``url`` to ``target``. This will not
work if ``target`` has a different scheme than ``url`` (or is on a
different server).
"""
[docs]
@misc.notimplemented
def chdir(self, url):
"""
Change the current directory to ``url``.
"""
os.chdir(self.name)
[docs]
@misc.notimplemented
def mkdir(self, url, mode=0o777):
"""
Create the directory ``url``.
"""
[docs]
@misc.notimplemented
def makedirs(self, url, mode=0o777):
"""
Create the directory ``url`` and all intermediate ones.
"""
[docs]
@misc.notimplemented
def walk(self, url, beforedir=True, afterdir=False, file=True, enterdir=True):
"""
Return an iterator for traversing the directory hierarchy rooted at
the directory ``url``.
Each item produced by the iterator is a :class:`Cursor` object.
It contains information about the state of the traversal and can be used
to influence which parts of the directory hierarchy are traversed and in
which order.
The arguments ``beforedir``, ``afterdir``,
``file`` and ``enterdir`` specify how the directory hierarchy should
be traversed. For more information see the :class:`Cursor` class.
Note that the :class:`Cursor` object is reused by :meth:`walk`, so you
can't rely on any attributes remaining the same across calls to
:func:`next`.
The following example shows how to traverse the current directory, print
all files except those in certain directories::
from ll import url
for cursor in url.here().walk(beforedir=True, afterdir=False, file=True):
if cursor.isdir:
if cursor.url.path[-2] in (".git", "build", "dist", "__pycache__"):
cursor.enterdir = False
else:
print(cursor.url)
"""
[docs]
def listdir(self, url, include=None, exclude=None, ignorecase=False):
"""
Iterates over items in the directory ``url``. The items produced are
:class:`URL` objects relative to ``url``.
With the optional ``include`` argument, this only lists items whose
names match the given pattern. Items matching the optional pattern
``exclude`` will not be listed. ``include`` and ``exclude`` can
be strings (which will be interpreted as :mod:`fnmatch` style filename
patterns) or lists of strings. If ``ignorecase`` is true
case-insensitive name matching will be performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
for cursor in self.walk(url, beforedir=True, afterdir=False, file=True, enterdir=False):
if matchpatterns(cursor.url.path[-1-cursor.isdir], include, exclude):
yield cursor.url
[docs]
def files(self, url, include=None, exclude=None, ignorecase=False):
"""
Iterates over files in the directory ``url``. The items produced
are :class:`URL` objects relative to ``url``.
With the optional ``include`` argument, this only lists files whose
names match the given pattern. Files matching the optional pattern
``exclude`` will not be listed. ``include`` and ``exclude`` can
be strings (which will be interpreted as :mod:`fnmatch` style filename
patterns) or lists of strings. If ``ignorecase`` is true
case-insensitive name matching will be performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
for cursor in self.walk(url, beforedir=False, afterdir=False, file=True, enterdir=False):
if cursor.isfile and matchpatterns(cursor.url.path[-1], include, exclude):
yield cursor.url
[docs]
def dirs(self, url, include=None, exclude=None, ignorecase=False):
"""
Iterates over directories in the directory ``url``. The items produced
are :class:`URL` objects relative to ``url``.
With the optional ``include`` argument, this only directories items
whose names match the given pattern. Directories matching the optional
pattern ``exclude`` will not be listed. ``include`` and
``exclude`` can be strings (which will be interpreted as :mod:`fnmatch`
style filename patterns) or lists of strings. If ``ignorecase`` is
true case-insensitive name matching will be performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
for cursor in self.walk(url, beforedir=True, afterdir=False, file=False, enterdir=False):
if cursor.isdir and matchpatterns(cursor.url.path[-2], include, exclude):
yield cursor.url
[docs]
def walkall(self, url, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False):
"""
Recursively iterate over files and subdirectories. The iterator
yields :class:`URL` objects naming each child URL of the directory
``url`` and its descendants relative to ``url``. This performs
a depth-first traversal, returning each directory before all its children.
With the optional ``include`` argument, only yield items whose
names match the given pattern. Items matching the optional pattern
``exclude`` will not be listed. Directories that don't match the
optional pattern ``enterdir`` or match the pattern ``skipdir``
will not be traversed. ``include``, ``exclude``, ``enterdir``
and ``skipdir`` can be strings (which will be interpreted as
:mod:`fnmatch` style filename patterns) or lists of strings.
If ``ignorecase`` is true case-insensitive name matching will be
performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
enterdir = compilepattern(enterdir, ignorecase)
skipdir = compilepattern(skipdir, ignorecase)
for cursor in self.walk(url, beforedir=True, afterdir=False, file=True, enterdir=True):
name = cursor.url.path[-1-cursor.isdir]
if matchpatterns(name, include, exclude):
yield cursor.url
if cursor.isdir:
cursor.enterdir = matchpatterns(name, enterdir, skipdir)
[docs]
def walkfiles(self, url, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False):
"""
Return a recursive iterator over files in the directory ``url``.
With the optional ``include`` argument, only yield files whose names
match the given pattern. Files matching the optional pattern
``exclude`` will not be listed. Directories that don't match the
optional pattern ``enterdir`` or match the pattern ``skipdir``
will not be traversed. ``include``, ``exclude``, ``enterdir``
and ``skipdir`` can be strings (which will be interpreted as
:mod:`fnmatch` style filename patterns) or lists of strings.
If ``ignorecase`` is true case-insensitive name matching will be
performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
enterdir = compilepattern(enterdir, ignorecase)
skipdir = compilepattern(skipdir, ignorecase)
for cursor in self.walk(url, beforedir=True, afterdir=False, file=True, enterdir=True):
if cursor.isfile:
if matchpatterns(cursor.url.path[-1], include, exclude):
yield cursor.url
else:
cursor.enterdir = matchpatterns(cursor.url.path[-2], enterdir, skipdir)
[docs]
def walkdirs(self, url, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False):
"""
Return a recursive iterator over subdirectories in the directory
``url``.
With the optional ``include`` argument, only yield directories whose
names match the given pattern. Items matching the optional pattern
``exclude`` will not be listed. Directories that don't match the
optional pattern ``enterdir`` or match the pattern ``skipdir``
will not be traversed. ``include``, ``exclude``, ``enterdir``
and ``skipdir`` can be strings (which will be interpreted as
:mod:`fnmatch` style filename patterns) or lists of strings.
If ``ignorecase`` is true case-insensitive name matching will be
performed.
"""
include = compilepattern(include, ignorecase)
exclude = compilepattern(exclude, ignorecase)
enterdir = compilepattern(enterdir, ignorecase)
skipdir = compilepattern(skipdir, ignorecase)
for cursor in self.walk(url, beforedir=True, afterdir=False, file=False, enterdir=True):
name = cursor.url.path[-2]
if matchpatterns(name, include, exclude):
yield cursor.url
cursor.enterdir = matchpatterns(name, enterdir, skipdir)
[docs]
@misc.notimplemented
def open(self, url, *args, **kwargs):
"""
Open ``url`` for reading or writing. :meth:`open` returns a
:class:`Resource` object.
Which additional parameters are supported depends on the actual
resource created. Some common parameters are:
``mode`` : :class:`str`
A string indicating how the file is to be opened (just like the
mode argument for the builtin :func:`open` (e.g. ``"rb"`` or
``"wb"``).
``headers`` : :class:`dict`
Additional headers to use for an HTTP request.
``data`` : :class:`bytes`
Request body to use for an HTTP POST request.
``python`` : :class:`str` or :const:`None`
Name of the Python interpreter to use on the remote side (used by
``ssh`` URLs)
``nice`` : :class:`int` or :const:`None`
Nice level for the remote python (used by ``ssh`` URLs)
``check`` : :class:`bool` or :const:`None`
Whether ``ssh`` host keys should be checked (used by ``ssh`` URLs
where it defaults to ``True`` and ``ssh-nocheck`` URLs where it
defaults to ``False``).
"""
[docs]
class LocalConnection(Connection):
"""
A :class:`!LocalConnection` object is used for accessing and modifying the
metadata associated with a file in the local filesystem. It is created by
calling the :meth:`~URL.connect` method on a :class:`URL` object with no
scheme or the ``file`` or ``root`` scheme.
"""
def _url2filename(self, url):
return os.path.expanduser(url.local())
def stat(self, url):
return os.stat(self._url2filename(url))
def lstat(self, url):
return os.lstat(self._url2filename(url))
def chmod(self, url, mode):
name = self._url2filename(url)
os.chmod(name, mode)
def _chown(self, func, url, owner, group):
name = self._url2filename(url)
if owner is not None or group is not None:
if owner is None or group is None:
stat = os.stat(name)
if owner is None:
owner = stat.st_uid
elif isinstance(owner, str):
owner = pwd.getpwnam(owner)[2]
if group is None:
group = stat.st_gid
elif isinstance(group, str):
group = grp.getgrnam(group)[2]
func(name, owner, group)
def chown(self, url, owner=None, group=None):
self._chown(os.chown, url, owner, group)
def lchown(self, url, owner=None, group=None):
self._chown(os.lchown, url, owner, group)
def chdir(self, url):
os.chdir(self._url2filename(url))
def mkdir(self, url, mode=0o777):
os.mkdir(self._url2filename(url), mode)
def makedirs(self, url, mode=0o777):
os.makedirs(self._url2filename(url), mode)
def uid(self, url):
return self.stat(url).st_uid
def gid(self, url):
return self.stat(url).st_gid
def owner(self, url):
uid = self.uid(url)
try:
return pwd.getpwuid(uid)[0]
except KeyError:
return uid
def group(self, url):
gid = self.gid(url)
try:
return grp.getgrgid(gid)[0]
except KeyError:
return gid
def exists(self, url):
return os.path.exists(self._url2filename(url))
def isfile(self, url):
return os.path.isfile(self._url2filename(url))
def isdir(self, url):
return os.path.isdir(self._url2filename(url))
def islink(self, url):
return os.path.islink(self._url2filename(url))
def ismount(self, url):
return os.path.ismount(self._url2filename(url))
def access(self, url, mode):
return os.access(self._url2filename(url), mode)
def remove(self, url):
return os.remove(self._url2filename(url))
def rmdir(self, url):
return os.rmdir(self._url2filename(url))
def rename(self, url, target):
name = self._url2filename(url)
if not isinstance(target, URL):
target = URL(target)
target = self._url2filename(target)
os.rename(name, target)
def link(self, url, target):
name = self._url2filename(url)
if not isinstance(target, URL):
target = URL(target)
target = self._url2filename(target)
os.link(name, target)
def symlink(self, url, target):
name = self._url2filename(url)
if not isinstance(target, URL):
target = URL(target)
target = self._url2filename(target)
os.symlink(name, target)
def _walk(self, cursor, base, name):
def _event(url, event):
cursor.url = url
cursor.event = event
cursor.isdir = event != "file"
cursor.isfile = not cursor.isdir
return cursor
if name:
fullname = os.path.join(base, name)
else:
fullname = base
for childname in sorted(os.listdir(fullname)):
fullchildname = os.path.join(fullname, childname)
isdir = os.path.isdir(fullchildname)
relchildname = os.path.join(name, childname) if name else childname
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
emitfile = cursor.file
enterdir = cursor.enterdir
if isdir:
if emitbeforedir or emitafterdir:
dirurl = Dir(relchildname, scheme=None)
if emitbeforedir:
yield _event(dirurl, "beforedir")
# The user may have altered ``cursor`` attributes outside the generator, so we refetch them
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
emitfile = cursor.file
enterdir = cursor.enterdir
cursor.restore()
if enterdir:
yield from self._walk(cursor, base, relchildname)
if emitafterdir:
yield _event(dirurl, "afterdir")
cursor.restore()
else:
if emitfile:
yield _event(File(relchildname, scheme=None), "file")
cursor.restore()
def walk(self, url, beforedir=True, afterdir=False, file=True, enterdir=True):
cursor = Cursor(url, beforedir=beforedir, afterdir=afterdir, file=file, enterdir=enterdir)
return self._walk(cursor, url.local(), "")
def open(self, url, *args, **kwargs):
return FileResource(url, *args, **kwargs)
[docs]
class SshConnection(Connection):
"""
A :class:`!SshConnection` object is used for accessing and modifying the
metadata associated with a file on a remote filesystem. Remote files will
be accessed via code executed remotely on the target host via :mod:`execnet`.
:class:`!SshConnection` objects are created by calling the :meth:`connect`
method on a :class:`URL` object with the ``ssh`` or ``ssh-nocheck`` scheme.
.. note::
Using the scheme ``ssh-nocheck`` disables checks of the host key, i.e. it
passes ``-o "StrictHostKeyChecking=no"`` to the underlying ``ssh``
command.
If you need to use further options (e.g. when your ``known_hosts`` file
isn't writable), you should configure that in your ``~/.ssh/config`` file,
for example::
Host foo
Hostname foo.example.org
StrictHostKeyChecking no
UserKnownHostsfile /dev/null
or for Windows::
Host foo
Hostname foo.example.org
StrictHostKeyChecking no
UserKnownHostsfile nul:
"""
remote_code = """
import sys, os, pickle, re, fnmatch
try:
from urllib import request
except ImportError:
import urllib as request
try:
next
except NameError:
def next(iter):
return iter.next()
try:
unicode
except NameError:
unicode = str
files = {}
def ownergroup(filename, owner=None, group=None):
if owner is not None or group is not None:
if owner is None or group is None:
if isinstance(filename, unicode):
stat = os.stat(filename)
else:
stat = os.fstat(files[filename].fileno())
if owner is None:
owner = stat.st_uid
elif isinstance(owner, unicode):
import pwd
owner = pwd.getpwnam(owner)[2]
if group is None:
group = stat.st_gid
elif isinstance(group, unicode):
import grp
group = grp.getgrnam(group)[2]
return (owner, group)
def compilepattern(pattern, ignorecase=False):
if pattern is None:
return None
elif isinstance(pattern, unicode):
return (re.compile(fnmatch.translate(pattern), re.I if ignorecase else 0).match,)
else:
return tuple(re.compile(fnmatch.translate(p), re.I if ignorecase else 0).match for p in pattern)
def matchpatterns(name, include, exclude):
if include and not any(matcher(name) is not None for matcher in include):
return False
if exclude and any(matcher(name) is not None for matcher in exclude):
return False
return True
def listdir(dirname):
result = []
for childname in sorted(os.listdir(dirname)):
fullchildname = os.path.join(dirname, childname)
isdir = os.path.isdir(fullchildname)
result.append((isdir, childname))
return result
while True:
(filename, cmdname, args, kwargs) = channel.receive()
if isinstance(filename, unicode):
filename = os.path.expanduser(request.url2pathname(filename))
data = None
try:
if cmdname == "open":
try:
stream = open(filename, *args, **kwargs)
except IOError:
exc = sys.exc_info()[1]
if args:
mode = args[0]
else:
mode = kwargs.get("mode", "rb")
if "w" not in mode or exc.errno != 2: # didn't work for some other reason than a non existing directory
raise
(splitpath, splitname) = os.path.split(filename)
if splitpath:
os.makedirs(splitpath)
stream = open(filename, *args, **kwargs)
else:
raise # we don't have a directory to make so pass the error on
data = id(stream)
files[data] = stream
elif cmdname == "stat":
if isinstance(filename, unicode):
data = tuple(os.stat(filename))
else:
data = tuple(os.fstat(files[filename].fileno()))
elif cmdname == "lstat":
data = os.lstat(filename)
elif cmdname == "close":
try:
stream = files[filename]
except KeyError:
pass
else:
stream.close()
del files[filename]
elif cmdname == "chmod":
data = os.chmod(filename, *args, **kwargs)
elif cmdname == "chown":
(owner, group) = ownergroup(filename, *args, **kwargs)
if owner is not None:
data = os.chown(filename, owner, group)
elif cmdname == "lchown":
(owner, group) = ownergroup(filename, *args, **kwargs)
if owner is not None:
data = os.lchown(filename, owner, group)
elif cmdname == "uid":
stat = os.stat(filename)
data = stat.st_uid
elif cmdname == "gid":
stat = os.stat(filename)
data = stat.st_gid
elif cmdname == "owner":
import pwd
stat = os.stat(filename)
try:
data = unicode(pwd.getpwuid(stat.st_uid)[0])
except KeyError:
data = stat.st_uid
elif cmdname == "group":
import grp
stat = os.stat(filename)
try:
data = unicode(grp.getgrgid(stat.st_gid)[0])
except KeyError:
data = stat.st_gid
elif cmdname == "exists":
data = os.path.exists(filename)
elif cmdname == "isfile":
data = os.path.isfile(filename)
elif cmdname == "isdir":
data = os.path.isdir(filename)
elif cmdname == "islink":
data = os.path.islink(filename)
elif cmdname == "ismount":
data = os.path.ismount(filename)
elif cmdname == "access":
data = os.access(filename, *args, **kwargs)
elif cmdname == "remove":
data = os.remove(filename)
elif cmdname == "rmdir":
data = os.rmdir(filename)
elif cmdname == "rename":
data = os.rename(filename, os.path.expanduser(args[0]))
elif cmdname == "link":
data = os.link(filename, os.path.expanduser(args[0]))
elif cmdname == "symlink":
data = os.symlink(filename, os.path.expanduser(args[0]))
elif cmdname == "chdir":
data = os.chdir(filename)
elif cmdname == "mkdir":
data = os.mkdir(filename)
elif cmdname == "makedirs":
data = os.makedirs(filename)
elif cmdname == "makefifo":
data = os.makefifo(filename)
elif cmdname == "listdir":
data = listdir(filename)
elif cmdname == "next":
data = next(files[filename])
else:
data = getattr(files[filename], cmdname)
data = data(*args, **kwargs)
except StopIteration:
exc = sys.exc_info()[1]
channel.send((True, pickle.dumps(exc)))
except Exception:
exc = sys.exc_info()[1]
channel.send((True, pickle.dumps(exc)))
else:
channel.send((False, data))
"""
def __init__(self, context, host, port, user, python=None, nice=None, check=None):
# We don't have to store the context (this avoids cycles)
self.host = host
self.port = port
self.user = user
self.python = python
self.nice = nice
self.check = check
self._channel = None
def close(self):
if self._channel is not None and not self._channel.isclosed():
self._channel.close()
self._channel.gateway.exit()
self._channel.gateway.join()
def _url2filename(self, url):
if url.scheme not in {"ssh", "ssh-nocheck"}:
raise ValueError(f"URL {url!r} is not an ssh URL")
filename = str(url.path)
if filename.startswith("/~"):
filename = filename[1:]
return filename
def _send(self, filename, cmd, *args, **kwargs):
if self._channel is None:
server = "ssh="
if self.port is not None:
server += f"-p{self.port} "
if self.check is not None and not self.check:
server += ' -oStrictHostKeyChecking=no '
server += f"{self.user}@{self.host}"
python = self.python
if python is None:
python = default_ssh_python
if python is not None:
server += f"//python={python}"
if self.nice is not None:
server += f"//nice={self.nice}"
gateway = execnet.makegateway(server) # This requires ``execnet`` (http://codespeak.net/execnet/)
gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
self._channel = gateway.remote_exec(self.remote_code)
self._channel.send((filename, cmd, args, kwargs))
(isexc, data) = self._channel.receive()
if isexc:
raise pickle.loads(data, fix_imports=True)
else:
return data
def stat(self, url):
filename = self._url2filename(url)
data = self._send(filename, "stat")
return os.stat_result(data) # channel returned a tuple => wrap it
def lstat(self, url):
filename = self._url2filename(url)
data = self._send(filename, "lstat")
return os.stat_result(data) # channel returned a tuple => wrap it
def chmod(self, url, mode):
return self._send(self._url2filename(url), "chmod", mode)
def chown(self, url, owner=None, group=None):
return self._send(self._url2filename(url), "chown", owner, group)
def lchown(self, url, owner=None, group=None):
return self._send(self._url2filename(url), "lchown", owner, group)
def chdir(self, url):
return self._send(self._url2filename(url), "chdir")
def mkdir(self, url, mode=0o777):
return self._send(self._url2filename(url), "mkdir", mode)
def makedirs(self, url, mode=0o777):
return self._send(self._url2filename(url), "makedirs", mode)
def uid(self, url):
return self._send(self._url2filename(url), "uid")
def gid(self, url):
return self._send(self._url2filename(url), "gid")
def owner(self, url):
return self._send(self._url2filename(url), "owner")
def group(self, url):
return self._send(self._url2filename(url), "group")
def exists(self, url):
return self._send(self._url2filename(url), "exists")
def isfile(self, url):
return self._send(self._url2filename(url), "isfile")
def isdir(self, url):
return self._send(self._url2filename(url), "isdir")
def islink(self, url):
return self._send(self._url2filename(url), "islink")
def ismount(self, url):
return self._send(self._url2filename(url), "ismount")
def access(self, url, mode):
return self._send(self._url2filename(url), "access", mode)
def remove(self, url):
return self._send(self._url2filename(url), "remove")
def rmdir(self, url):
return self._send(self._url2filename(url), "rmdir")
def _cmdwithtarget(self, cmdname, url, target):
filename = self._url2filename(url)
if not isinstance(target, URL):
target = URL(target)
targetname = self._url2filename(target)
if target.server != url.server:
raise OSError(errno.EXDEV, os.strerror(errno.EXDEV))
return self._send(filename, cmdname, targetname)
def rename(self, url, target):
return self._cmdwithtarget("rename", url, target)
def link(self, url, target):
return self._cmdwithtarget("link", url, target)
def symlink(self, url, target):
return self._cmdwithtarget("symlink", url, target)
def _walk(self, cursor, base, name):
def _event(url, event):
cursor.url = url
cursor.event = event
cursor.isdir = event != "file"
cursor.isfile = not cursor.isdir
return cursor
if name:
fullname = os.path.join(base, name)
else:
fullname = base
for (isdir, childname) in self._send(fullname, "listdir"):
fullchildname = os.path.join(fullname, childname)
relchildname = os.path.join(name, childname) if name else childname
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
emitfile = cursor.file
enterdir = cursor.enterdir
if isdir:
if emitbeforedir or emitafterdir:
dirurl = Dir(relchildname, scheme=None)
if emitbeforedir:
yield _event(dirurl, "beforedir")
# The user may have altered ``cursor`` attributes outside the generator, so we refetch them
emitbeforedir = cursor.beforedir
emitafterdir = cursor.afterdir
emitfile = cursor.file
enterdir = cursor.enterdir
cursor.restore()
if enterdir:
yield from self._walk(cursor, base, relchildname)
if emitafterdir:
yield _event(dirurl, "afterdir")
cursor.restore()
else:
if emitfile:
yield _event(File(relchildname, scheme=None), "file")
cursor.restore()
def walk(self, url, beforedir=True, afterdir=False, file=True, enterdir=True):
cursor = Cursor(url, beforedir=beforedir, afterdir=afterdir, file=file, enterdir=enterdir)
return self._walk(cursor, self._url2filename(url), "")
def open(self, url, *args, **kwargs):
return RemoteFileResource(self, url, *args, **kwargs)
def __repr__(self):
if self.port is None:
return f"<{self.__class__.__module__}.{self.__class__.__name__} to {self.user}@{self.host} at {id(self):#x}>"
else:
return f"<{self.__class__.__module__}.{self.__class__.__name__} to {self.user}@{self.host}:{self.port} at {id(self):#x}>"
[docs]
class URLConnection(Connection):
"""
A :class:`!URLConnection` object is used for accessing and modifying the
metadata associated any other resource specified by a URL (except those
handled by the other :class:`Connection` subclasses).
"""
def mimetype(self, url):
return url.open().mimetype()
def size(self, url):
return url.open().size()
def imagesize(self, url):
return url.open(mode="rb").imagesize()
def mdate(self, url):
return url.open(mode="rb").mdate()
def resheaders(self, url):
return url.open(mode="rb").resheaders()
def isdir(self, url):
# URLs never are directories (even if they might be (for URLs ending in ``/``), there's no way to call :meth:`listdir`)
return False
def open(self, url, mode="rb", headers=None, data=None):
if mode != "rb":
raise NotImplementedError(f"mode {mode!r} not supported")
return URLResource(url, headers=headers, data=data)
[docs]
def here(scheme="file"):
"""
Return the current directory as an :class:`URL` object.
"""
return Dir(os.getcwd(), scheme)
[docs]
def home(user="", scheme="file"):
"""
Return the home directory of the current user (or the user named ``user``,
if ``user`` is specified) as an :class:`URL` object::
>>> url.home()
URL('file:/home/walter/')
>>> url.home("andreas")
URL('file:/home/andreas/')
"""
return Dir(f"~{user}", scheme)
[docs]
def root():
"""
Return a blank ``root`` :class:`URL`, i.e. ``URL("root:")``.
"""
return URL("root:")
[docs]
def File(name, scheme="file"):
"""
Turn a filename into an :class:`URL` object::
>>> url.File("a#b")
URL('file:a%23b')
"""
name = urllib.request.pathname2url(os.path.expanduser(name))
if name.startswith("///"):
name = name[2:]
url = URL(name)
url.scheme = scheme
return url
[docs]
def Dir(name, scheme="file"):
"""
Turns a directory name into an :class:`URL` object, just like :func:`File`,
but ensures that the path is terminated with a ``/``::
>>> url.Dir("a#b")
URL('file:a%23b/')
"""
name = urllib.request.pathname2url(os.path.expanduser(name))
if name:
if not name.endswith("/"):
name += "/"
else:
name = "./"
if name.startswith("///"):
name = name[2:]
url = URL(name)
url.scheme = scheme
return url
[docs]
def Ssh(user, host, path="~/"):
"""
Return a ssh :class:`URL` for the user ``user`` on the host ``host``
with the path ``path``. ``path`` (defaulting to the users home
directory) must be a path in URL notation (i.e. use ``/`` as directory
separator)::
>>> url.Ssh("root", "www.example.com", "~joe/public_html/index.html")
URL('ssh://root@www.example.com/~joe/public_html/index.html')
If the path starts with ``~/`` it is relative to this users home directory,
if it starts with ``~user`` it's relative to the home directory of the user
``user``. In all other cases the path is considered to be absolute.
"""
url = URL()
url.scheme = "ssh"
url.userinfo = user
url.host = host
if path.startswith("~"):
path = "/" + path
url.path = path
return url
[docs]
def first(urls):
"""
Return the first URL from ``urls`` that exists as a real file or
directory. :const:`None` entries in ``urls`` will be skipped.
"""
for url in urls:
if url is not None:
if url.exists():
return url
[docs]
def firstdir(urls):
"""
Return the first URL from ``urls`` that exists as a real directory.
:const:`None` entries in ``urls`` will be skipped.
"""
for url in urls:
if url is not None:
if url.isdir():
return url
[docs]
def firstfile(urls):
"""
Return the first URL from ``urls`` that exists as a real file.
:const:`None` entries in ``urls`` will be skipped.
"""
for url in urls:
if url is not None:
if url.isfile():
return url
[docs]
class Resource(object):
"""
A :class:`!Resource` is a base class that provides a file-like interface
to local and remote files, URLs and other resources.
Each resource object has the following attributes:
:attr:`url`
The URL for which this resource has been opened (i.e.
``foo.open().url is foo`` if ``foo`` is a :class:`URL` object);
:attr:`name`
A string version of :attr:`url`;
:attr:`closed`
A :class:`bool` specifying whether the resource has been closed
(i.e. whether the :meth:`close` method has been called).
In addition to file methods (like :meth:`read`, :meth:`readlines`,
:meth:`write` and :meth:`close`) a resource object might provide the
following methods:
:meth:`finalurl`
Return the real URL of the resource (this might be different from the
:attr:`url` attribute in case of a redirect).
:meth:`size`
Return the size of the file/resource.
:meth:`mdate`
Return the last modification date of the file/resource as a
:class:`datetime.datetime` object in UTC.
:meth:`mimetype`
Return the mimetype of the file/resource.
:meth:`imagesize`
Return the size of the image (if the resource is an image file) as a
``(width, height)`` tuple. This requires the PIL__.
__ http://www.pythonware.com/products/pil/
"""
def finalurl(self):
return self.url
def imagesize(self):
pos = self.tell()
self.seek(0)
img = Image.open(self) # Requires PIL
imagesize = img.size
self.seek(pos)
return imagesize
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
def __repr__(self):
return f"<{'closed' if self.closed else 'open'} {self.__class__.__module__}.{self.__class__.__name__} {self.name}, mode {self.mode!r} at {id(self):#x}>"
[docs]
class FileResource(Resource):
"""
A subclass of :class:`Resource` that handles local files.
"""
def __init__(self, url, mode="rb", *args, **kwargs):
url = URL(url)
self.name = os.path.expanduser(url.local())
self.mode = mode
try:
file = open(self.name, mode, *args, **kwargs)
except IOError as exc:
if "w" not in mode or exc.errno != 2: # didn't work for some other reason than a non existing directory
raise
(splitpath, splitname) = os.path.split(self.name)
if splitpath:
os.makedirs(splitpath)
file = open(self.name, mode, *args, **kwargs)
else:
raise # we don't have a directory to make so pass the error on
self.file = file
self.url = url
def __getattr__(self, name):
return getattr(self.file, name)
def __iter__(self):
return iter(self.file)
def close(self):
if self.file is not None:
self.file.close()
self.file = None
@property
def closed(self):
return self.file is None
def size(self):
# Forward to the connection
return LocalSchemeDefinition._connection.size(self.url)
def mdate(self):
# Forward to the connection
return LocalSchemeDefinition._connection.mdate(self.url)
def mimetype(self):
# Forward to the connection
return LocalSchemeDefinition._connection.mimetype(self.url)
[docs]
class RemoteFileResource(Resource):
"""
A subclass of :class:`Resource` that handles remote files (i.e. those using
the ``ssh`` scheme).
"""
def __init__(self, connection, url, mode="rb", *args, **kwargs):
self.connection = connection
self.url = URL(url)
self.mode = mode
self.args = args
self.kwargs = kwargs
filename = self.connection._url2filename(url)
self.name = str(self.url)
self.remoteid = self._send(filename, "open", mode, *args, **kwargs)
def __repr__(self):
return f"<{'closed' if self.connection is None else 'open'} {self.__class__.__module__}.{self.__class__.__name__} {self.name}, mode {self.mode!r} at {id(self):#x}>"
def _send(self, filename, cmd, *args, **kwargs):
if self.connection is None:
raise ValueError("I/O operation on closed file")
return self.connection._send(filename, cmd, *args, **kwargs)
def close(self):
if self.connection is not None:
self._send(self.remoteid, "close")
self.connection = None # close the channel too as there are no longer any meaningful operations
@property
def closed(self):
return self.connection is None
def read(self, size=None):
return self._send(self.remoteid, "read", size) if size is not None else self._send(self.remoteid, "read")
def readline(self, size=-1):
return self._send(self.remoteid, "readline", size) if size is not None else self._send(self.remoteid, "readline")
def readlines(self, size=-1):
return self._send(self.remoteid, "readlines", size) if size is not None else self._send(self.remoteid, "readlines")
def __iter__(self):
return self
def __next__(self):
return self._send(self.remoteid, "next")
def seek(self, offset, whence=0):
return self._send(self.remoteid, "seek", offset, whence)
def tell(self):
return self._send(self.remoteid, "tell")
def truncate(self, size=None):
if size is None:
return self._send(self.remoteid, "truncate")
else:
return self._send(self.remoteid, "truncate", size)
def write(self, string):
return self._send(self.remoteid, "write", string)
def writelines(self, strings):
return self._send(self.remoteid, "writelines", strings)
def flush(self):
return self._send(self.remoteid, "flush")
def size(self):
# Forward to the connection
return self.connection.size(self.url)
def mdate(self):
# Forward to the connection
return self.connection.mdate(self.url)
def mimetype(self):
# Forward to the connection
return self.connection.mimetype(self.url)
[docs]
class URLResource(Resource):
"""
A subclass of :class:`Resource` that handles HTTP, FTP and other URLs
(i.e. those that are not handled by :class:`FileResource` or
:class:`RemoteFileResource`.
"""
def __init__(self, url, mode="rb", headers=None, data=None):
if "w" in mode:
raise ValueError(f"writing mode {mode!r} not supported")
self.url = URL(url)
self.name = str(self.url)
self.mode = mode
self.reqheaders = headers
self.reqdata = data
self._finalurl = None
if data is not None:
data = urlparse.urlencode(data)
if headers is None:
headers = {}
req = urllib.request.Request(url=self.name, data=data, headers=headers)
self._stream = urllib.request.urlopen(req)
self._finalurl = URL(self._stream.url) # Remember the final URL in case of a redirect
self._resheaders = self._stream.info()
self._mimetype = None
self._encoding = None
contenttype = self._resheaders.get("Content-Type")
if contenttype is not None:
(mimetype, options) = misc.parse_header(contenttype)
self._mimetype = mimetype
self._encoding = options.get("charset")
cl = self._resheaders.get("Content-Length")
if cl:
cl = int(cl)
self._size = cl
lm = self._resheaders.get("Last-Modified")
if lm is not None:
lm = mime2dt(lm)
self._mdate = lm
self._buffer = io.BytesIO()
def __getattr__(self, name):
function = getattr(self._stream, name)
def call(*args, **kwargs):
return function(*args, **kwargs)
return call
def close(self):
if self._stream is not None:
self._stream.close()
self._stream = None
@property
def closed(self):
return self._stream is None
def finalurl(self):
return self._finalurl
def mimetype(self):
return self._mimetype
def resheaders(self):
return self._resheaders
def encoding(self):
return self._encoding
def mdate(self):
return self._mdate
def size(self):
return self._size
def read(self, size=None):
data = self._stream.read(size) if size is not None else self._stream.read()
self._buffer.write(data)
return data
def readline(self, size=None):
data = self._stream.readline(size) if size is not None else self._stream.readline()
self._buffer.write(data)
return data
def resdata(self):
data = self._stream.read()
self._buffer.write(data)
return self._buffer.getvalue()
def imagesize(self):
img = Image.open(io.BytesIO(self.resdata())) # Requires PIL
return img.size
def __iter__(self):
while True:
data = self._stream.readline()
if not data:
break
self._buffer.write(data)
yield data
[docs]
class SchemeDefinition(object):
"""
A :class:`!SchemeDefinition` instance defines the properties of a particular
URL scheme.
"""
_connection = URLConnection()
[docs]
def __init__(self, scheme, usehierarchy, useserver, usefrag, islocal=False, isremote=False, defaultport=None):
"""
Create a new :class:`!SchemeDefinition` instance. Arguments are:
* ``scheme``: The name of the scheme;
* ``usehierarchy``: Specifies whether this scheme uses hierarchical
URLs or opaque URLs (i.e. whether ``hier_part`` or ``opaque_part``
from the BNF in :rfc:`2396` is used);
* ``useserver``: Specifies whether this scheme uses an Internet-based
server :attr:`authority` component or a registry of naming authorities
(only for hierarchical URLs);
* ``usefrag``: Specifies whether this scheme uses fragments
(according to the BNF in :rfc:`2396` every scheme does, but it doesn't
make sense for e.g. ``"javascript"``, ``"mailto"`` or ``"tel"``);
* ``islocal``: Specifies whether URLs with this scheme refer to
local files;
* ``isremote``: Specifies whether URLs with this scheme refer to
remote files (there may be schemes which are neither local nor remote,
e.g. ``"mailto"``);
* ``defaultport``: The default port for this scheme (only for schemes
using server based authority).
"""
self.scheme = scheme
self.usehierarchy = usehierarchy
self.useserver = useserver
self.usefrag = usefrag
self.islocal = islocal
self.isremote = isremote
self.defaultport = defaultport
[docs]
def connect(self, url, context=None, **kwargs):
"""
Create a :class:`Connection` for the :class:`URL` ``url`` (which must
have ``self`` as the scheme).
"""
return self._connect(url, context, **kwargs)[0]
def _connect(self, url, context=None, **kwargs):
# Returns a tuple ``(connect, kwargs)`` (some of the keyword arguments
# might have been consumed by the connect call, the rest can be passed
# on the whatever call will be made on the connection itself)
# We can always use the same connection here, because the connection for
# local files and real URLs doesn't use any resources.
# This will be overwritten by :class:`SshSchemeDefinition`
return (self._connection, kwargs)
def open(self, *args, **kwargs):
return URLConnection(*args, **kwargs)
[docs]
def closeall(self, context):
"""
Close all connections active for this scheme in the context ``context``.
"""
def __repr__(self):
return f"<{self.__class__.__name__} instance scheme={self.scheme!r} usehierarchy={self.usehierarchy!r} useserver={self.useserver!r} usefrag={self.usefrag!r} at {id(self):#x}>"
class LocalSchemeDefinition(SchemeDefinition):
# Use a different connection than the base class (but still one single connection for all URLs)
_connection = LocalConnection()
def open(self, *args, **kwargs):
return FileResource(*args, **kwargs)
class SshSchemeDefinition(SchemeDefinition):
def _connect(self, url, context=None, **kwargs):
if "python" in kwargs or "nice" in kwargs or "check" in kwargs:
kwargs = kwargs.copy()
python = kwargs.pop("python", None)
nice = kwargs.pop("nice", None)
check = kwargs.pop("check", None)
else:
python = None
nice = None
check = None
context = getcontext(context)
if context is threadlocalcontext.__class__.context:
raise ValueError("ssh URLs need a custom context")
# Use one :class:`SshConnection` for each host/port/user/python combination
host = url.host
port = url.port
user = url.userinfo
if check is None:
check = self.scheme == "ssh"
try:
connections = context.schemes["ssh"]
except KeyError:
connections = context.schemes["ssh"] = {}
try:
connection = connections[(host, port, user, python, nice, check)]
except KeyError:
connection = connections[(host, port, user, python, nice, check)] = SshConnection(context, host, port, user, python, nice, check)
return (connection, kwargs)
def open(self, url, mode="rb", context=None, python=None, nice=None):
(connection, kwargs) = self._connect(url, context=context, python=python, nice=nice)
return RemoteFileResource(connection, url, mode, **kwargs)
def closeall(self, context):
for connection in context.schemes["ssh"].values():
connection.close()
schemereg = {
"http": SchemeDefinition("http", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=80),
"https": SchemeDefinition("https", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=443),
"ftp": SchemeDefinition("ftp", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=21),
"file": LocalSchemeDefinition("file", usehierarchy=True, useserver=False, usefrag=True, islocal=True),
"root": LocalSchemeDefinition("root", usehierarchy=True, useserver=False, usefrag=True, islocal=True),
"javascript": SchemeDefinition("javascript", usehierarchy=False, useserver=False, usefrag=False),
"mailto": SchemeDefinition("mailto", usehierarchy=False, useserver=False, usefrag=False),
"tel": SchemeDefinition("tel", usehierarchy=False, useserver=False, usefrag=False),
"fax": SchemeDefinition("fax", usehierarchy=False, useserver=False, usefrag=False),
"ssh": SshSchemeDefinition("ssh", usehierarchy=True, useserver=True, usefrag=True, islocal=False, isremote=True),
"ssh-nocheck": SshSchemeDefinition("ssh-nocheck", usehierarchy=True, useserver=True, usefrag=True, islocal=False, isremote=True),
}
defaultreg = LocalSchemeDefinition("", usehierarchy=True, useserver=True, islocal=True, usefrag=True)
class Path(object):
__slots__ = ("_path", "_segments")
def __init__(self, path=None):
self._path = ""
self._segments = []
self.path = path
def _prefix(cls, path):
if path.startswith("/"):
return "/"
else:
return ""
def insert(self, index, *others):
segments = self.segments
segments[index:index] = map(_unescape, others)
self.segments = segments
def startswith(self, prefix):
"""
Return whether ``self`` starts with the path ``prefix``.
``prefix`` will be converted to a :class:`Path` if it isn't one.
"""
if not isinstance(prefix, Path):
prefix = Path(prefix)
segments = prefix.segments
if self.isabs != prefix.isabs:
return False
if segments and not segments[-1] and len(self.segments) > len(segments):
return self.segments[:len(segments)-1] == segments[:-1]
else:
return self.segments[:len(segments)] == segments
def endswith(self, suffix):
"""
Return whether ``self`` ends with the path ``suffix``. ``suffix``
will be converted to a :class:`Path` if it isn't one. If ``suffix`` is
absolute a normal comparison will be done.
"""
if not isinstance(suffix, Path):
suffix = Path(suffix)
if suffix.isabs:
return self == suffix
else:
segments = suffix.segments
return self.segments[-len(segments):] == segments
def clone(self):
return Path(self)
def __repr__(self):
return f"Path({self._path!r})"
def __str__(self):
return self.path
def __eq__(self, other):
if not isinstance(other, Path):
other = Path(other)
return self._path == other._path
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self._path)
def __len__(self):
return len(self.segments)
def __getitem__(self, index):
if isinstance(index, slice):
# Return of slice of the path. The resulting path will always be relative, i.e. the leading ``/`` will be dropped.
return Path(self.segments[index])
else:
return self.segments[index]
def __setitem__(self, index, value):
segments = self.segments
if isinstance(index, slice):
segments[index] = map(_unescape, value)
self._path = self._prefix(self._path) + self._segments2path(segments)
else:
segments[index] = _unescape(value)
self._path = self._prefix(self._path) + self._segments2path(segments)
self._segments = segments
def __delitem__(self, index):
if isinstance(index, slice):
del self.segments[index]
else:
segments = self.segments
del segments[index]
self._path = self._segments2path(segments)
self._segments = segments
def __contains__(self, item):
return _unescape(item) in self.segments
class isabs(misc.propclass):
"""
Is the path absolute?
"""
def __get__(self):
return self._path.startswith("/")
def __set__(self, isabs):
isabs = bool(isabs)
if isabs != self._path.startswith("/"):
if isabs:
self._path = "/" + self._path
else:
self._path = self._path[1:]
def __delete__(self):
if self._path.startswith("/"):
self._path = self._path[1:]
@classmethod
def _segments2path(cls, segments):
return "/".join(_escape(segment, pathsafe) for segment in segments)
@classmethod
def _path2segments(cls, path):
if path.startswith("/"):
path = path[1:]
return list(map(_unescape, path.split("/")))
def _setpathorsegments(self, path):
if path is None:
self._path = ""
self._segments = []
elif isinstance(path, Path):
self._path = path._path
self._segments = None
elif isinstance(path, (list, tuple)):
self._segments = list(map(_unescape, path))
self._path = self._prefix(self._path) + self._segments2path(self._segments)
else:
path = _escape(path)
prefix = self._prefix(path)
if prefix:
path = path[1:]
self._segments = self._path2segments(path)
self._path = prefix + self._segments2path(self._segments)
class path(misc.propclass):
"""
The complete path as a string.
"""
def __get__(self):
return self._path
def __set__(self, path):
self._setpathorsegments(path)
def __delete__(self):
self.clear()
class segments(misc.propclass):
"""
The path as a list of (name, param) tuples.
"""
def __get__(self):
if self._segments is None:
self._segments = self._path2segments(self._path)
return self._segments
def __set__(self, path):
self._setpathorsegments(path)
def __delete__(self):
self._path = self._prefix(self._path)
self._segments = []
class file(misc.propclass):
"""
The filename without the path, i.e. the name part of the last component
of :attr:`path`. The ``baz.html`` part of
``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
try:
return self[-1]
except IndexError:
return None
def __set__(self, file):
"""
Setting the filename preserves the parameter in the last segment.
"""
if file is None:
del self.file
segments = self.segments
if segments:
self[-1] = file
else:
self.segments = [file]
def __delete__(self):
"""
Deleting the filename preserves the parameter in the last segment.
"""
segments = self.segments
if segments:
self[-1] = ""
class ext(misc.propclass):
"""
The filename extension of the last segment of the path. The ``html`` part
of ``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
ext = None
segments = self.segments
if segments:
segment = segments[-1]
pos = segment.rfind(".")
if pos != -1:
ext = segment[pos+1:]
return ext
def __set__(self, ext):
if ext is None:
del self.ext
segments = self.segments
if segments:
segment = segments[-1]
pos = segment.rfind(".")
if pos != -1:
segment = segment[:pos+1] + ext
else:
segment = segment + "." + ext
self[-1] = segment
def __delete__(self):
segments = self.segments
if segments:
segment = segments[-1]
pos = segment.rfind(".")
if pos != -1:
segment = segment[:pos]
self[-1] = segment
def withext(self, ext):
"""
Return a new :class:`Path` where the filename extension has been replaced
with ``ext``.
"""
path = self.clone()
path.ext = ext
return path
def withoutext(self):
"""
Return a new :class:`Path` where the filename extension has been removed.
"""
if "/" not in self._path and self._path.rfind(".")==0:
return Path("./")
else:
path = self.clone()
del path.ext
return path
def withfile(self, file):
"""
Return a new :class:`Path` where the filename (i.e. the name of the last
component of :attr:`segments`) has been replaced with ``file``.
"""
path = self.clone()
path.file = file
return path
def withoutfile(self):
"""
Return a new :class:`Path` where the filename (i.e. the name of the last
component of :attr:`segments`) has been removed.
"""
if "/" not in self._path:
return Path("./")
else:
path = Path(self)
del path.file
return path
def clear(self):
self._path = ""
self._segments = []
def __truediv__(self, other):
"""
Join two paths.
"""
if isinstance(other, str):
other = Path(other)
if isinstance(other, Path):
newpath = Path()
# RFC2396, Section 5.2 (5)
if other.isabs:
newpath._path = other._path
newpath._segments = None
else:
# the following should be equivalent to RFC2396, Section 5.2 (6) (c)-(f)
newpath._path = self._prefix(self._path) + self._segments2path(
_normalizepath(
self.segments[:-1] + # RFC2396, Section 5.2 (6) (a)
other.segments # RFC2396, Section 5.2 (6) (b)
)
)
newpath._segments = None
return newpath
elif isinstance(other, (list, tuple)): # this makes path/list possible
return other.__class__(self/path for path in other)
else: # this makes path/generator possible
return (self/path for path in other)
def __rtruediv__(self, other):
"""
Right hand version of :meth:`__div__`. This supports list and generators
as the left hand side too.
"""
if isinstance(other, str):
other = Path(other)
if isinstance(other, Path):
return other/self
elif isinstance(other, (list, tuple)):
return other.__class__(path/self for path in other)
else:
return (path/self for path in other)
def relative(self, basepath):
"""
Return an relative :class:`Path` ``rel`` such that
``basepath/rel == self``, i.e. this is the inverse operation of
:meth:`__div__`.
If ``self`` is relative, an identical copy of ``self`` will be
returned.
"""
# if ``self`` is relative don't do anything
if not self.isabs:
pass # FIXME return self.clone()
basepath = Path(basepath) # clone/coerce
self_segments = _normalizepath(self.segments)
base_segments = _normalizepath(basepath.segments)
while len(self_segments) > 1 and len(base_segments) > 1 and self_segments[0] == base_segments[0]:
del self_segments[0]
del base_segments[0]
# build a path from one file to the other
self_segments[:0] = [".."]*(len(base_segments)-1)
if not len(self_segments) or self_segments == [""]:
self_segments = [".", ""]
return Path(self._segments2path(self_segments))
def reverse(self):
segments = self.segments
segments.reverse()
if segments and not segments[0]:
del segments[0]
segments.append("")
self.segments = segments
def normalize(self):
self.segments = _normalizepath(self.segments)
def normalized(self):
new = self.clone()
new.normalize()
return new
def local(self):
"""
Return ``self`` converted to a filename using the file naming
conventions of the OS. Parameters will be dropped in the resulting string.
"""
localpath = _unescape(self._path)
if self._path.endswith("/") and not (localpath.endswith(os.sep) or (os.altsep is not None and localpath.endswith(os.altsep))):
localpath += os.sep
return localpath
def abs(self):
"""
Return an absolute version of ``self``.
"""
path = os.path.abspath(self.local())
path = path.rstrip(os.sep)
if path.startswith("///"):
path = path[2:]
path = urllib.request.pathname2url(path)
if len(self) and not self.segments[-1]:
path += "/"
return Path(path)
def real(self):
"""
Return the canonical version of ``self``, eliminating all symbolic
links.
"""
path = os.path.realpath(self.local())
path = path.rstrip(os.sep)
path = urllib.request.pathname2url(path)
if path.startswith("///"):
path = path[2:]
if len(self) and not self.segments[-1]:
path += "/"
return Path(path)
class Query(dict):
__slots__ = ()
def __init__(self, arg=None, **kwargs):
if arg is not None:
if isinstance(arg, dict):
for (key, value) in arg.items():
self.add(key, value)
else:
for (key, value) in arg:
self.add(key, value)
for (key, value) in kwargs.items():
self.add(key, value)
def __setitem__(self, key, value):
dict.__setitem__(self, str(key), [str(value)])
def add(self, key, *values):
key = str(key)
values = map(str, values)
self.setdefault(key, []).extend(values)
[docs]
class URL(object):
"""
An :rfc:`2396` compliant URL.
"""
[docs]
def __init__(self, url=None):
"""
Create a new :class:`!URL` instance. ``url`` may be a :class:`str`
object, or an :class:`!URL` (in which case you'll get a copy of ``url``),
or :const:`None` (which will create an :class:`!URL` referring to the
"current document").
"""
self.url = url
def _clear(self):
# internal helper method that makes ``self`` empty.
self.reg = defaultreg
self._scheme = None
self._userinfo = None
self._host = None
self._port = None
self._path = Path()
self._reg_name = None
self._query = None
self._query_parts = None
self._opaque_part = None
self._frag = None
[docs]
def clone(self):
"""
Return an identical copy ``self``.
"""
return URL(self)
@staticmethod
def _checkscheme(scheme):
# Check whether ``scheme`` contains only legal characters.
if not scheme or scheme[0] not in schemecharfirst:
return False
for c in scheme[1:]:
if c not in schemechar:
return False
return True
class scheme(misc.propclass):
"""
The URL scheme (e.g. ``ftp``, ``ssh``, ``http`` or ``mailto``). The
scheme will be :const:`None` if the URL is a relative one.
"""
def __get__(self):
return self._scheme
def __set__(self, scheme):
"""
The scheme will be converted to lowercase on setting (if ``scheme``
is not :const:`None`, otherwise the scheme will be deleted).
"""
if scheme is None:
self._scheme = None
else:
scheme = scheme.lower()
# check if the scheme only has allowed characters
if not self._checkscheme(scheme):
raise ValueError(f"Illegal scheme char in scheme {scheme!r}")
self._scheme = scheme
self.reg = schemereg.get(scheme, defaultreg)
def __delete__(self):
"""
Deletes the scheme, i.e. makes the URL relative.
"""
self._scheme = None
self.reg = defaultreg
class userinfo(misc.propclass):
"""
The user info part of the :class:`URL`; i.e. the ``user`` part of
``http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self._userinfo
def __set__(self, userinfo):
self._userinfo = userinfo
def __delete__(self):
self._userinfo = None
class host(misc.propclass):
"""
The host part of the :class:`URL`; i.e. the ``www.example.com`` part of
``http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self._host
def __set__(self, host):
if host is not None:
host = host.lower()
self._host = host
def __delete__(self):
self._host = None
class port(misc.propclass):
"""
The port number of the :class:`URL` (as an :class:`int`) or :const:`None`
if the :class:`URL` has none. The ``8080`` in
``http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self._port
def __set__(self, port):
if port is not None:
port = int(port)
self._port = port
def __delete__(self):
self._port = None
class hostport(misc.propclass):
"""
The host and (if specified) the port number of the :class:`URL`, i.e. the
``www.example.com:8080`` in
``http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
if self.host is not None:
hostport = _escape(self.host, safe)
if self.port is not None:
hostport += f":{self.port}"
return hostport
else:
return None
def __set__(self, hostport):
# find the port number (RFC2396, Section 3.2.2)
if hostport is None:
del self.hostport
else:
del self.port
pos = hostport.rfind(":")
if pos != -1:
if pos != len(hostport)-1:
self.port = hostport[pos+1:]
hostport = hostport[:pos]
self.host = _unescape(hostport)
def __delete__(self):
del self.host
del self.port
class server(misc.propclass):
"""
The server part of the :class:`URL`; i.e. the ``user@www.example.com``
part of ``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
if self.hostport is not None:
userinfo = self.userinfo
if userinfo is not None:
return _escape(userinfo, safe) + "@" + self.hostport
else:
return self.hostport
else:
return None
def __set__(self, server):
"""
Setting the server always works even if the current :attr:`scheme`
does use :attr:`opaque_part` or :attr:`reg_name` but will be ignored
when reassembling the URL for the :attr:`url` property.
"""
if server is None:
del self.server
else:
# find the userinfo (RFC2396, Section 3.2.2)
pos = server.find("@")
if pos != -1:
self.userinfo = _unescape(server[:pos])
server = server[pos+1:]
else:
del self.userinfo
self.hostport = server
def __delete__(self):
del self.userinfo
del self.hostport
class reg_name(misc.propclass):
"""
The reg_name part of the :class:`URL` for hierarchical schemes that use
a name based :attr:`authority` instead of :attr:`server`.
"""
def __get__(self):
return self._reg_name
def __set__(self, reg_name):
if reg_name is None:
del self.reg_name
else:
self._reg_name = reg_name
def __delete__(self):
self._reg_name = None
class authority(misc.propclass):
"""
The authority part of the :class:`URL` for hierarchical schemes.
Depending on the scheme, this is either :attr:`server` or
:attr:`reg_name`.
"""
def __get__(self):
if self.reg.useserver:
return self.server
else:
return self.reg_name
def __set__(self, authority):
if self.reg.useserver:
self.server = authority
else:
self.reg_name = authority
def __delete__(self):
if self.reg.useserver:
del self.server
else:
del self.reg_name
class isabspath(misc.propclass):
"""
Specifies whether the path of a hierarchical :class:`URL` is absolute,
(i.e. it has a leading ``"/"``). Note that the path will always be
absolute if an :attr:`authority` is specified.
"""
def __get__(self):
return (self.authority is not None) or self.path.isabs
def __set__(self, isabspath):
self.path.isabs = isabspath
class path(misc.propclass):
"""
The path segments of a hierarchical :class:`URL` as a :class:`Path` object.
"""
def __get__(self):
return self._path
def __set__(self, path):
self._path = Path(path)
def __delete__(self):
self._path = Path()
class file(misc.propclass):
"""
The filename without the path, i.e. the name part of the last component
of :attr:`path`. The ``baz.html`` part of
``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self.path.file
def __set__(self, file):
"""
Setting the filename preserves the parameter in the last segment.
"""
self.path.file = file
def __delete__(self):
"""
Deleting the filename preserves the parameter in the last segment.
"""
del self.path.file
class ext(misc.propclass):
"""
The filename extension of the last segment of the path. The ``html`` part
of ``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self.path.ext
def __set__(self, ext):
"""
Setting the extension preserves the parameter in the last segment.
"""
self.path.ext = ext
def __delete__(self):
"""
Deleting the extension preserves the parameter in the last segment.
"""
del self.path.ext
class query_parts(misc.propclass):
"""
The query component as a dictionary, i.e. ``{u"spam": u"eggs"}`` from
``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
If the query component couldn't be parsed, ``query_parts`` will be
:const:`False`.
"""
def __get__(self):
return self._query_parts
def __set__(self, query_parts):
self._query = _urlencode(query_parts)
self._query_parts = query_parts
def __delete__(self):
self._query = None
self._query_parts = None
class query(misc.propclass):
"""
The query component, i.e. the ``spam=eggs`` part of
``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
if self._query_parts is False:
return self._query
else:
return _urlencode(self._query_parts)
def __set__(self, query):
self._query = query
if query is not None:
parts = {}
for part in query.split("&"):
namevalue = part.split("=", 1)
name = _unescape(namevalue[0].replace("+", " "))
if len(namevalue) == 2:
value = _unescape(namevalue[1].replace("+", " "))
parts.setdefault(name, []).append(value)
else:
parts = False
break
query = parts
self._query_parts = query
def __delete__(self):
self._query = None
self._query_parts = None
class opaque_part(misc.propclass):
"""
The opaque part (for schemes like ``mailto`` that are not hierarchical).
"""
def __get__(self):
return self._opaque_part
def __set__(self, opaque_part):
self._opaque_part = opaque_part
def __delete__(self):
self._opaque_part = None
class frag(misc.propclass):
"""
The fragment identifier, which references a part of the resource, i.e.
the ``frag`` part of
``http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag``.
"""
def __get__(self):
return self._frag
def __set__(self, frag):
self._frag = frag
def __delete__(self):
self._frag = None
class url(misc.propclass):
"""
The complete URL
"""
def __get__(self):
"""
Getting :attr:`url` reassembles the URL from the components.
"""
result = ""
if self.scheme is not None:
result += self.scheme + ":"
if self.reg.usehierarchy:
if self.authority is not None:
result += "//" + self.authority
if not self.path.isabs:
result += "/"
result += str(self.path)
if self.query is not None:
result += "?" + self.query
else:
result += self.opaque_part
if self.reg.usefrag and self.frag is not None:
result += "#" + _escape(self.frag, fragsafe)
return result
def __set__(self, url):
"""
Setting :attr:`url` parses ``url`` into the components. ``url``
may also be an :class:`URL` instance, in which case the URL will be
copied.
"""
self._clear()
if url is None:
return
elif isinstance(url, URL):
self.scheme = url.scheme
self.userinfo = url.userinfo
self.host = url.host
self.port = url.port
self.path = url.path.clone()
self.reg_name = url.reg_name
self.opaque_part = url.opaque_part
self.query = url.query
self.frag = url.frag
else:
url = _escape(url)
# find the scheme (RFC2396, Section 3.1)
pos = url.find(":")
if pos != -1:
scheme = url[:pos]
if self._checkscheme(scheme): # if the scheme is illegal assume there is none (e.g. "/foo.php?x=http://www.bar.com", will *not* have the scheme "/foo.php?x=http")
self.scheme = scheme # the info about what we have to expect in the rest of the URL can be found in self.reg now
url = url[pos+1:]
# find the fragment (RFC2396, Section 4.1)
if self.reg.usefrag:
# the fragment itself may not contain a "#", so find the last "#"
pos = url.rfind("#")
if pos != -1:
self.frag = _unescape(url[pos+1:])
url = url[:pos]
if self.reg.usehierarchy:
# find the query (RFC2396, Section 3.4)
pos = url.rfind("?")
if pos != -1:
self.query = url[pos+1:]
url = url[:pos]
if url.startswith("//"):
url = url[2:]
# find the authority part (RFC2396, Section 3.2)
pos = url.find("/")
if pos != -1:
authority = url[:pos]
url = url[pos:] # keep the "/"
else:
authority = url
url = "/"
self.authority = authority
self.path = Path(url)
else:
self.opaque_part = url
def __delete__(self):
"""
After deleting the URL the resulting object will refer to the
"current document".
"""
self._clear()
[docs]
def withext(self, ext):
"""
Return a new :class:`URL` where the filename extension has been replaced
with ``ext``.
"""
url = URL(self)
url.path = url.path.withext(ext)
return url
[docs]
def withoutext(self):
"""
Return a new :class:`URL` where the filename extension has been removed.
"""
url = URL(self)
url.path = url.path.withoutext()
return url
[docs]
def withfile(self, file):
"""
Return a new :class:`URL` where the filename (i.e. the name of last
component of :attr:`path_segments`) has been replaced with
``file``.
"""
url = URL(self)
url.path = url.path.withfile(file)
return url
def withoutfile(self):
url = URL(self)
url.path = url.path.withoutfile()
return url
[docs]
def withfrag(self, frag):
"""
Return a new :class:`URL` where the fragment has been replaced with
``frag``.
"""
url = URL(self)
url.frag = frag
return url
[docs]
def withoutfrag(self):
"""
Return a new :class:`URL` where the frag has been dropped.
"""
url = URL(self)
del url.frag
return url
[docs]
def __truediv__(self, other):
"""
Join ``self`` with another (possible relative) :class:`URL`
``other``, to form a new :class:`URL`.
``other`` may be a :class:`str` or :class:`URL` object. It may be
:const:`None` (referring to the "current document") in which case
``self`` will be returned. It may also be a list or other iterable.
For this case a list (or iterator) will be returned where
:meth:`__div__` will be applied to every item in the list/iterator. E.g.
the following expression returns all the files in the current directory
as absolute URLs (see the method :meth:`files` and the function
:func:`here` for further explanations)::
>>> here = url.here()
>>> for f in here/here.files():
... print(f)
"""
if isinstance(other, str):
other = URL(other)
if isinstance(other, URL):
newurl = URL()
# RFC2396, Section 5.2 (2)
if other.scheme is None and other.authority is None and not str(other.path) and other.query is None:
newurl = URL(self)
newurl.frag = other.frag
return newurl
if not self.reg.usehierarchy: # e.g. "mailto:x@y"/"file:foo"
return other
# In violation of RFC2396 we treat file URLs as relative ones (if the base is a local URL)
if other.scheme == "file" and self.islocal():
other = URL(other)
del other.scheme
del other.authority
# RFC2396, Section 5.2 (3)
if other.scheme is not None:
return other
newurl.scheme = self.scheme
newurl.query = other.query
newurl.frag = other.frag
# RFC2396, Section 5.2 (4)
if other.authority is None:
newurl.authority = self.authority
# RFC2396, Section 5.2 (5) & (6) (a) (b)
newurl._path = self._path/other._path
else:
newurl.authority = other.authority
newurl._path = other._path.clone()
return newurl
elif isinstance(other, (list, tuple)): # this makes path/list possible
return other.__class__(self/path for path in other)
else: # this makes path/generator possible
return (self/path for path in other)
[docs]
def __rtruediv__(self, other):
"""
Right hand version of :meth:`__div__`. This supports lists and iterables
as the left hand side too.
"""
if isinstance(other, str):
other = URL(other)
if isinstance(other, URL):
return other/self
elif isinstance(other, (list, tuple)):
return other.__class__(item/self for item in other)
else:
return (item/self for item in other)
[docs]
def relative(self, baseurl, allowschemerel=False):
"""
Return an relative :class:`URL` ``rel`` such that
``baseurl/rel == self``, i.e. this is the inverse operation of
:meth:`__div__`.
If ``self`` is relative, has a different :attr:`scheme` or
:attr:`authority` than ``baseurl`` or a non-hierarchical scheme, an
identical copy of ``self`` will be returned.
If ``allowschemerel`` is true, scheme relative URLs are allowed, i.e.
if both ``self`` and ``baseurl`` use the same hierarchical scheme,
but a different authority (i.e. server), a scheme relative url
(``//server/path/file.html``) will be returned.
"""
# if ``self`` is relative don't do anything
if self.scheme is None:
return URL(self)
# javascript etc.
if not self.reg.usehierarchy:
return URL(self)
baseurl = URL(baseurl) # clone/coerce
newurl = URL(self) # clone
# only calculate a new URL if to the same scheme/server, else use the original (or a scheme relative one)
if self.authority != baseurl.authority:
if self.scheme == baseurl.scheme and allowschemerel:
del newurl.scheme
return newurl
elif self.scheme != baseurl.scheme:
return newurl
del newurl.scheme
del newurl.authority
selfpath_segments = _normalizepath(self._path.segments)
basepath_segments = _normalizepath(baseurl._path.segments)
while len(selfpath_segments) > 1 and len(basepath_segments) > 1 and selfpath_segments[0] == basepath_segments[0]:
del selfpath_segments[0]
del basepath_segments[0]
# does the URL go to the same file?
if selfpath_segments == basepath_segments and self.query == baseurl.query:
# only return the frag
del newurl.path
del newurl.query
else:
# build a path from one file to the other
selfpath_segments[:0] = [".."]*(len(basepath_segments)-1)
if not len(selfpath_segments) or selfpath_segments == [""]:
selfpath_segments = [".", ""]
newurl._path.segments = selfpath_segments
newurl._path = self.path.relative(baseurl.path)
newurl._path.isabs = False
return newurl
def __str__(self):
return self.url
def __repr__(self):
return f"URL({self.url!r})"
[docs]
def __bool__(self):
"""
Return whether the :class:`URL` is not empty, i.e. whether it is not the
:class:`URL` referring to the start of the current document.
"""
return self.url != ""
[docs]
def __eq__(self, other):
"""
Return whether two :class:`URL` objects are equal. Note that only
properties relevant for the current scheme will be compared.
"""
if self.__class__ != other.__class__:
return False
if self.scheme != other.scheme:
return False
if self.reg.usehierarchy:
if self.reg.useserver:
selfport = self.port or self.reg.defaultport
otherport = other.port or other.reg.defaultport
if self.userinfo != other.userinfo or self.host != other.host or selfport != otherport:
return False
else:
if self.reg_name != other.reg_name:
return False
if self._path != other._path:
return False
else:
if self.opaque_part != other.opaque_part:
return False
# Use canonical version of (i.e. sorted names and values)
if self.query != other.query:
return False
if self.frag != other.frag:
return False
return True
[docs]
def __ne__(self, other):
"""
Return whether two :class:`URL` objects are *not* equal.
"""
return not self == other
[docs]
def __hash__(self):
"""
Return a hash value for ``self``, to be able to use :class:`URL`
objects as dictionary keys. You must be careful not to modify an
:class:`URL` as soon as you use it as a dictionary key.
"""
res = hash(self.scheme)
if self.reg.usehierarchy:
if self.reg.useserver:
res ^= hash(self.userinfo)
res ^= hash(self.host)
res ^= hash(self.port or self.reg.defaultport)
else:
res ^= hash(self.reg_name)
res ^= hash(self._path)
else:
res ^= hash(self.opaque_part)
res ^= hash(self.query)
res ^= hash(self.frag)
return res
[docs]
def abs(self, scheme=-1):
"""
Return an absolute version of ``self`` (works only for local URLs).
If the argument ``scheme`` is specified, it will be used for the
resulting URL otherwise the result will have the same scheme as
``self``.
"""
self._checklocal()
new = self.clone()
new.path = self.path.abs()
if scheme != -1:
new.scheme = scheme
return new
[docs]
def real(self, scheme=-1):
"""
Return the canonical version of ``self``, eliminating all symbolic
links (works only for local URLs).
If the argument ``scheme`` is specified, it will be used for the
resulting URL otherwise the result will have the same scheme as
``self``.
"""
self._checklocal()
new = self.clone()
new.path = self.path.real()
if scheme != -1:
new.scheme = scheme
return new
[docs]
def islocal(self):
"""
Return whether ``self`` refers to a local file, i.e. whether
``self`` is a relative :class:`URL` or the scheme is ``root`` or
``file``).
"""
return self.reg.islocal
def _checklocal(self):
if not self.islocal():
raise ValueError(f"URL {self!r} is not local")
[docs]
def local(self):
"""
Return ``self`` as a local filename (which will only works if
``self`` is local (see :meth:`islocal`).
"""
self._checklocal()
return self.path.local()
def _connect(self, context=None, **kwargs):
return self.reg._connect(self, context=context, **kwargs)
[docs]
def connect(self, context=None, **kwargs):
"""
Return a :class:`Connection` object for accessing and modifying the
metadata of ``self``.
Whether you get a new connection object, or an existing one depends on
the scheme, the URL itself, and the context passed in (as the
``context`` argument).
"""
return self._connect(context, **kwargs)[0]
[docs]
def open(self, *args, **kwargs):
"""
Open ``self`` for reading or writing. :meth:`open` returns a
:class:`Resource` object.
Which additional parameters are supported depends on the actual resource
created. Some common parameters are:
``mode`` (supported by all resources)
A string indicating how the file is to be opened (just like the
mode argument for the builtin :func:`open`; e.g. ``"rb"`` or
``"wb"``).
``context`` (supported by all resources)
:meth:`open` needs a :class:`Connection` for this URL which it gets
from a :class:`Context` object.
``headers``
Additional headers to use for an HTTP request.
``data``
Request body to use for an HTTP POST request.
``python``
Name of the Python interpreter to use on the remote side
(used by ``ssh`` URLs)
``nice``
Nice level for the remove python (used by ``ssh`` URLs)
"""
(connection, kwargs) = self._connect(**kwargs)
if "context" in kwargs:
kwargs = kwargs.copy()
del kwargs["context"]
return connection.open(self, *args, **kwargs)
def openread(self, *args, **kwargs):
return self.open(mode="rb", *args, **kwargs)
def openwrite(self, *args, **kwargs):
return self.open(mode="wb", *args, **kwargs)
[docs]
def import_(self, name=None):
"""
Import the content of the URL ``self`` as a Python module.
``name`` can be used the specify the module name (i.e. the ``__name__``
attribute of the module). The default determines it from the URL.
"""
if self.islocal():
filename = self.real().local()
else:
filename = f"/{self.scheme}/{self.server}{self.path}"
return misc.module(self.openread().read(), filename, name)
def __iter__(self):
try:
isdir = self.isdir()
except AttributeError:
isdir = False
if isdir:
return iter(self/self.listdir())
else:
return iter(self.open())
# All the following methods need a connection and simply forward the operation to the connection
def stat(self, **kwargs):
return self.connect(**kwargs).stat(self)
def lstat(self, **kwargs):
return self.connect(**kwargs).lstat(self)
def chmod(self, mode, **kwargs):
return self.connect(**kwargs).chmod(self, mode)
def chown(self, owner=None, group=None, **kwargs):
return self.connect(**kwargs).chown(self, owner=owner, group=group)
def lchown(self, owner=None, group=None, **kwargs):
return self.connect(**kwargs).lchown(self, owner=owner, group=group)
def uid(self, **kwargs):
return self.connect(**kwargs).uid(self)
def gid(self, **kwargs):
return self.connect(**kwargs).gid(self)
def owner(self, **kwargs):
return self.connect(**kwargs).owner(self)
def group(self, **kwargs):
return self.connect(**kwargs).group(self)
def mimetype(self, **kwargs):
return self.connect(**kwargs).mimetype(self)
def exists(self, **kwargs):
return self.connect(**kwargs).exists(self)
def isfile(self, **kwargs):
return self.connect(**kwargs).isfile(self)
def isdir(self, **kwargs):
return self.connect(**kwargs).isdir(self)
def islink(self, **kwargs):
return self.connect(**kwargs).islink(self)
def ismount(self, **kwargs):
return self.connect(**kwargs).ismount(self)
def access(self, mode, **kwargs):
return self.connect(**kwargs).access(self, mode)
def size(self, **kwargs):
return self.connect(**kwargs).size(self)
def imagesize(self, **kwargs):
return self.connect(**kwargs).imagesize(self)
def cdate(self, **kwargs):
return self.connect(**kwargs).cdate(self)
def adate(self, **kwargs):
return self.connect(**kwargs).adate(self)
def mdate(self, **kwargs):
return self.connect(**kwargs).mdate(self)
def resheaders(self, **kwargs):
return self.connect(**kwargs).resheaders(self)
def remove(self, **kwargs):
return self.connect(**kwargs).remove(self)
def rmdir(self, **kwargs):
return self.connect(**kwargs).rmdir(self)
def rename(self, target, **kwargs):
return self.connect(**kwargs).rename(self, target)
def link(self, target, **kwargs):
return self.connect(**kwargs).link(self, target)
def symlink(self, target, **kwargs):
return self.connect(**kwargs).symlink(self, target)
def chdir(self, **kwargs):
return self.connect(**kwargs).chdir(self)
def mkdir(self, mode=0o777, **kwargs):
return self.connect(**kwargs).mkdir(self, mode=mode)
def makedirs(self, mode=0o777, **kwargs):
return self.connect(**kwargs).makedirs(self, mode=mode)
def walk(self, beforedir=True, afterdir=False, file=True, enterdir=True, **kwargs):
return self.connect(**kwargs).walk(self, beforedir=beforedir, afterdir=afterdir, file=file, enterdir=enterdir)
def listdir(self, include=None, exclude=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).listdir(self, include=include, exclude=exclude, ignorecase=ignorecase)
def files(self, include=None, exclude=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).files(self, include=include, exclude=exclude, ignorecase=ignorecase)
def dirs(self, include=None, exclude=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).dirs(self, include=include, exclude=exclude, ignorecase=ignorecase)
def walkall(self, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).walkall(self, include=include, exclude=exclude, enterdir=enterdir, skipdir=skipdir, ignorecase=ignorecase)
def walkfiles(self, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).walkfiles(self, include=include, exclude=exclude, enterdir=enterdir, skipdir=skipdir, ignorecase=ignorecase)
def walkdirs(self, include=None, exclude=None, enterdir=None, skipdir=None, ignorecase=False, **kwargs):
return self.connect(**kwargs).walkdirs(self, include=include, exclude=exclude, enterdir=enterdir, skipdir=skipdir, ignorecase=ignorecase)
warnings.filterwarnings("always", module="url")