Source code for ll.xist.parse

# -*- coding: utf-8 -*-
# cython: language_level=3, always_allow_keywords=True

## Copyright 1999-2024 by LivingLogic AG, Bayreuth/Germany
## Copyright 1999-2024 by Walter Dörwald
##
## All Rights Reserved
##
## See ll/xist/__init__.py for the license


"""
This module contains everything you need to create XIST objects by parsing
files, strings, URLs etc.

Parsing XML is done with a pipelined approach. The first step in the pipeline
is a source object that provides the input for the rest of the pipeline.
The next step is the XML parser. It turns the input source into an iterator over
parsing events (an "event stream"). Further steps in the pipeline might resolve
namespace prefixes (:class:`NS`), and instantiate XIST classes
(:class:`Node`). The final step in the pipeline is either building an
XML tree via :func:`tree` or an iterative parsing step (similar to ElementTree's
:func:`iterparse` function) via :func:`itertree`.

Parsing a simple HTML string might e.g. look like this::

	>>> from ll.xist import xsc, parse
	>>> from ll.xist.ns import html
	>>> source = b"<a href='http://www.python.org/'>Python</a>"
	>>> doc = parse.tree(
	... 	parse.String(source),
	... 	parse.Expat(),
	... 	parse.NS(html),
	... 	parse.Node(pool=xsc.Pool(html)),
	... )
	>>> doc.string()
	'<a href="http://www.python.org/">Python</a>'

A source object is an iterable object that produces the input byte string for
the parser (possibly in multiple chunks), together with information about the
URL of the input::

	>>> from ll.xist import parse
	>>> list(parse.String(b"<a href='http://www.python.org/'>Python</a>"))
	[('url', URL('STRING')),
	 ('bytes', b"<a href='http://www.python.org/'>Python</a>")]

All subsequent objects in the pipeline are callables that take the input
iterator as an argument and return an iterator over events themselves. The
following code shows an example of an event stream::

	>>> from ll.xist import parse
	>>> source = b"<a href='http://www.python.org/'>Python</a>"
	>>> list(parse.events(parse.String(source), parse.Expat()))
	[('url', URL('STRING')),
	 ('position', (0, 0)),
	 ('enterstarttag', 'a'),
	 ('enterattr', 'href'),
	 ('text', 'http://www.python.org/'),
	 ('leaveattr', 'href'),
	 ('leavestarttag', 'a'),
	 ('position', (0, 39)),
	 ('text', 'Python'),
	 ('endtag', 'a')]
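
The chaining described above can be sketched without XIST itself. The following
is a minimal illustration, not the implementation of :func:`events`: the names
``source``, ``describe`` and ``run`` are hypothetical and only show how a source
iterable is fed through a sequence of callables that each map one event iterator
to another.

```python
from functools import reduce

def source():
	# Stand-in source object: one "url" event followed by one "bytes" event
	yield ("url", "STRING")
	yield ("bytes", b"<a>Python</a>")

def describe(input):
	# Stand-in pipeline stage (hypothetical): replaces "bytes" events with
	# a "length" event and passes every other event through unchanged
	for (evtype, data) in input:
		if evtype == "bytes":
			yield ("length", len(data))
		else:
			yield (evtype, data)

def run(*pipeline):
	# The first object is the source, every following object is a callable
	# that takes the previous iterator and returns a new one
	(first, *stages) = pipeline
	return reduce(lambda stream, stage: stage(stream), stages, iter(first))

result = list(run(source(), describe))
```

Because every stage is a generator, nothing is read from the source until the
final iterator is consumed.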

An event is a tuple consisting of the event type and the event data. Different
stages in the pipeline produce different event types. The following event types
can be produced by source objects:

	``"url"``
		The event data is the URL of the source. Usually such an event is produced
		only once at the start of the event stream. For sources that have no
		natural URL (like strings or streams) the URL can be specified when
		creating the source object.

	``"bytes"``
		This event is produced by source objects (and :class:`Transcoder` objects).
		The event data is a byte string.

	``"str"``
		The event data is a string. This event is produced by :class:`Decoder`
		or source objects. Note that the only predefined pipeline objects that can
		handle ``"str"`` events are :class:`Encoder` objects, i.e. normally a
		parser handles ``"bytes"`` events, but not ``"str"`` events.

The following types of events are produced by parsers (in addition to the
``"url"`` event from above):

	``"position"``
		The event data is a tuple containing the line and column number in the
		source (both starting with 0). All the following events should use this
		position information until the next position event.

	``"xmldecl"``
		The XML declaration. The event data is a dictionary containing the keys
		``"version"``, ``"encoding"`` and ``"standalone"``. Parsers may omit this
		event.

	``"begindoctype"``
		The beginning of the doctype. The event data is a dictionary containing the
		keys ``"name"``, ``"publicid"`` and ``"systemid"``. Parsers may omit this
		event.

	``"enddoctype"``
		The end of the doctype. The event data is :const:`None`. (If there is no
		internal subset, the ``"enddoctype"`` event immediately follows the
		``"begindoctype"`` event). Parsers may omit this event.

	``"comment"``
		A comment. The event data is the content of the comment.

	``"text"``
		Text data. The event data is the text content. Parsers should try to avoid
		outputting multiple text events in sequence.

	``"cdata"``
		A CDATA section. The event data is the content of the CDATA section.
		Parsers may report CDATA sections as ``"text"`` events instead of
		``"cdata"`` events.

	``"enterstarttag"``
		The beginning of an element start tag. The event data is the element name.

	``"leavestarttag"``
		The end of an element start tag. The event data is the element name.
		The parser will output events for the attributes between the
		``"enterstarttag"`` and the ``"leavestarttag"`` event.

	``"enterattr"``
		The beginning of an attribute. The event data is the attribute name.

	``"leaveattr"``
		The end of an attribute. The event data is the attribute name.
		The parser will output events for the attribute value between the
		``"enterattr"`` and the ``"leaveattr"`` event. (In almost all cases
		this is one text event).

	``"endtag"``
		An element end tag. The event data is the element name.

	``"procinst"``
		A processing instruction. The event data is a tuple consisting of the
		processing instruction target and the data.

	``"entity"``
		An entity reference. The event data is the entity name.
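
To make the shape of these events concrete, here is a minimal sketch that
produces a subset of them with the standard library's :mod:`xml.parsers.expat`
module. The function ``simple_events`` is a hypothetical helper and not part of
this module; it assumes the whole input is available as one byte string and
leaves out ``"position"``, ``"xmldecl"``, doctype, ``"cdata"`` and ``"entity"``
events.

```python
from xml.parsers import expat

def simple_events(data):
	# Collect (event type, event data) tuples for one complete byte string
	events = []
	parser = expat.ParserCreate()
	parser.buffer_text = True # merge adjacent character data into one callback
	parser.ordered_attributes = True # attributes arrive as [name, value, name, value, ...]

	def start(name, attrs):
		events.append(("enterstarttag", name))
		for i in range(0, len(attrs), 2):
			# The attribute value is reported as a text event between enter and leave
			events.append(("enterattr", attrs[i]))
			events.append(("text", attrs[i+1]))
			events.append(("leaveattr", attrs[i]))
		events.append(("leavestarttag", name))

	parser.StartElementHandler = start
	parser.EndElementHandler = lambda name: events.append(("endtag", name))
	parser.CharacterDataHandler = lambda text: events.append(("text", text))
	parser.CommentHandler = lambda text: events.append(("comment", text))
	parser.ProcessingInstructionHandler = lambda target, text: events.append(("procinst", (target, text)))
	parser.Parse(data, True)
	return events

events = simple_events(b"<a href='gurk'>Python<!--hi--></a>")
```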

The following events are produced for elements and attributes in namespace mode
(instead of those without the ``ns`` suffix). They are produced by :class:`NS`
objects or by :class:`Expat` objects when the ``ns`` argument is true (i.e.
the expat parser performs the namespace resolution):

	``"enterstarttagns"``
		The beginning of an element start tag in namespace mode.
		The event data is a (namespace name, element name) tuple.

	``"leavestarttagns"``
		The end of an element start tag in namespace mode. The event data is a
		(namespace name, element name) tuple.

	``"enterattrns"``
		The beginning of an attribute in namespace mode. The event data is a
		(namespace name, attribute name) tuple.

	``"leaveattrns"``
		The end of an attribute in namespace mode. The event data is a
		(namespace name, attribute name) tuple.

	``"endtagns"``
		An element end tag in namespace mode. The event data is a
		(namespace name, element name) tuple.
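
How a prefixed name turns into such a tuple can be sketched as follows. This is
an illustration only: ``resolve`` and its ``isattr`` parameter are hypothetical
names, and the sketch assumes that the prefix mapping is already known and uses
:const:`None` as the key for the default namespace.

```python
def resolve(name, prefixes, isattr=False):
	# Split "prefix:local" into its parts; names without a colon have no prefix
	(prefix, sep, local) = name.rpartition(":")
	prefix = prefix or None
	if isattr and prefix is None:
		# Attributes without a prefix are not in any namespace
		return (None, local)
	# Raises KeyError for a prefix that has not been declared
	return (prefixes[prefix], local)

prefixes = {
	None: "http://www.w3.org/1999/xhtml",
	"svg": "http://www.w3.org/2000/svg",
}
element = resolve("a", prefixes)
attribute = resolve("href", prefixes, isattr=True)
prefixed = resolve("svg:rect", prefixes)
```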

Once XIST nodes have been instantiated (by :class:`Node` objects) the
following events are used:

	``"xmldeclnode"``
		The XML declaration. The event data is an instance of
		:class:`ll.xist.ns.xml.XML`.

	``"doctypenode"``
		The doctype. The event data is an instance of :class:`ll.xist.xsc.DocType`.

	``"commentnode"``
		A comment. The event data is an instance of :class:`ll.xist.xsc.Comment`.

	``"textnode"``
		Text data. The event data is an instance of :class:`ll.xist.xsc.Text`.

	``"enterelementnode"``
		The beginning of an element. The event data is an instance of
		:class:`ll.xist.xsc.Element` (or one of its subclasses). The attributes
		of the element object are set, but the element has no content yet.

	``"leaveelementnode"``
		The end of an element. The event data is an instance of
		:class:`ll.xist.xsc.Element`.

	``"procinstnode"``
		A processing instruction. The event data is an instance of
		:class:`ll.xist.xsc.ProcInst`.

	``"entitynode"``
		An entity reference. The event data is an instance of
		:class:`ll.xist.xsc.Entity`.

For consuming event streams there are three functions:

	:func:`events`
		This generator simply outputs the events.

	:func:`tree`
		This function builds an XML tree from the events and returns it.

	:func:`itertree`
		This generator builds a tree like :func:`tree`, but returns events
		during certain steps in the parsing process.


Example
-------

The following example shows a custom generator in the pipeline that lowercases
all element and attribute names::

	from ll.xist import xsc, parse
	from ll.xist.ns import html

	def lowertag(input):
		for (event, data) in input:
			if event in {"enterstarttag", "leavestarttag", "endtag", "enterattr", "leaveattr"}:
				data = data.lower()
			yield (event, data)

	e = parse.tree(
		parse.String(b"<A HREF='gurk'><B>gurk</B></A>"),
		parse.Expat(),
		lowertag,
		parse.NS(html),
		parse.Node(pool=xsc.Pool(html))
	)

	print(e.string())

This script outputs:

.. sourcecode:: xml

	<a href="gurk"><b>gurk</b></a>
"""


import os, os.path, warnings, io, codecs, contextlib

from xml.parsers import expat

from ll import url as url_, xml_codec
from ll.xist import xsc, xfind
try:
	from ll.xist import sgmlop
except ImportError:
	pass
from ll.xist.ns import xml


__docformat__ = "reStructuredText"


html_xmlns = "http://www.w3.org/1999/xhtml"


###
### exceptions
###

class UnknownEventError(TypeError):
	"""
	This exception is raised when a pipeline object doesn't know how to handle
	an event.
	"""

	def __init__(self, pipe, event):
		self.pipe = pipe
		self.event = event

	def __str__(self):
		return f"{self.pipe!r} can't handle event type {self.event[0]!r}"


###
### Sources: Classes that create an event stream
###

class String:
	"""
	Provides parser input from a string.
	"""

	def __init__(self, data, url=None):
		"""
		Create a :class:`String` object. ``data`` must be a :class:`bytes` or
		:class:`str` object. ``url`` specifies the URL for the source
		(defaulting to ``"STRING"``).
		"""
		self.url = url_.URL(url if url is not None else "STRING")
		self.data = data

	def __iter__(self):
		"""
		Produces an event stream of one ``"url"`` event and one ``"bytes"`` or
		``"str"`` event for the data.
		"""
		yield ("url", self.url)
		if isinstance(self.data, bytes):
			yield ("bytes", self.data)
		elif isinstance(self.data, str):
			yield ("str", self.data)
		else:
			raise TypeError("data must be str or bytes")


class Iter:
	"""
	Provides parser input from an iterator over strings.
	"""

	def __init__(self, iterable, url=None):
		"""
		Create an :class:`Iter` object. ``iterable`` must be an iterable object
		producing :class:`bytes` or :class:`str` objects. ``url`` specifies the
		URL for the source (defaulting to ``"ITER"``).
		"""
		self.url = url_.URL(url if url is not None else "ITER")
		self.iterable = iterable

	def __iter__(self):
		"""
		Produces an event stream of one ``"url"`` event followed by the
		``"bytes"``/``"str"`` events for the data from the iterable.
		"""
		yield ("url", self.url)
		for data in self.iterable:
			if isinstance(data, bytes):
				yield ("bytes", data)
			elif isinstance(data, int): # From iterating over a ``bytes`` object
				yield ("bytes", bytes([data]))
			elif isinstance(data, str):
				yield ("str", data)
			else:
				raise TypeError("data must be str or bytes")


class Stream:
	"""
	Provides parser input from a stream (i.e. an object that provides a
	:meth:`read` method).
	"""

	def __init__(self, stream, url=None, bufsize=8192):
		"""
		Create a :class:`Stream` object. ``stream`` must have a :meth:`read`
		method (with a ``size`` argument). ``url`` specifies the URL for the
		source (defaulting to ``"STREAM"``). ``bufsize`` specifies the
		chunksize for reads from the stream.
		"""
		self.url = url_.URL(url if url is not None else "STREAM")
		self.stream = stream
		self.bufsize = bufsize

	def __iter__(self):
		"""
		Produces an event stream of one ``"url"`` event followed by the
		``"bytes"``/``"str"`` events for the data from the stream.
		"""
		yield ("url", self.url)
		while True:
			data = self.stream.read(self.bufsize)
			if data:
				if isinstance(data, bytes):
					yield ("bytes", data)
				elif isinstance(data, str):
					yield ("str", data)
				else:
					raise TypeError("data must be str or bytes")
			else:
				break


class File:
	"""
	Provides parser input from a file.
	"""

	def __init__(self, filename, bufsize=8192):
		"""
		Create a :class:`File` object. ``filename`` is the name of the file
		and may start with ``~`` or ``~user`` for the home directory of the
		current or the specified user. ``bufsize`` specifies the chunksize for
		reads from the file.
		"""
		self.url = url_.File(filename)
		self._filename = os.path.expanduser(filename)
		self.bufsize = bufsize

	def __iter__(self):
		"""
		Produces an event stream of one ``"url"`` event followed by the
		``"bytes"`` events for the data from the file.
		"""
		yield ("url", self.url)
		with open(self._filename, "rb") as stream:
			while True:
				data = stream.read(self.bufsize)
				if data:
					yield ("bytes", data)
				else:
					break


class URL:
	"""
	Provides parser input from a URL.
	"""

	def __init__(self, name, bufsize=8192, *args, **kwargs):
		"""
		Create a :class:`URL` object. ``name`` is the URL. ``bufsize`` specifies
		the chunksize for reads from the URL. ``args`` and ``kwargs`` will be
		passed on to the :meth:`open` method of the URL object.

		The URL for the input will be the final URL for the resource (i.e. it will
		include redirects).
		"""
		self.url = url_.URL(name)
		self.bufsize = bufsize
		self.args = args
		self.kwargs = kwargs

	def __iter__(self):
		"""
		Produces an event stream of one ``"url"`` event followed by the
		``"bytes"`` events for the data from the URL.
		"""
		stream = self.url.open("rb", *self.args, **self.kwargs)
		yield ("url", stream.finalurl())
		with contextlib.closing(stream) as stream:
			while True:
				data = stream.read(self.bufsize)
				if data:
					yield ("bytes", data)
				else:
					break


class ETree:
	"""
	Produces a (namespaced) event stream from an object that supports the
	ElementTree__ API.

	__ http://effbot.org/zone/element-index.htm
	"""

	def __init__(self, data, url=None, defaultxmlns=None):
		"""
		Create an :class:`ETree` object. Arguments have the following meaning:

		``data``
			An object that supports the ElementTree API.

		``url``
			The URL of the source. Defaults to ``"ETREE"``.

		``defaultxmlns``
			The namespace name (or a namespace module containing a namespace name)
			that will be used for all elements that don't have a namespace.
		"""
		self.url = url_.URL(url if url is not None else "ETREE")
		self.data = data
		self.defaultxmlns = xsc.nsname(defaultxmlns)

	def _asxist(self, node):
		name = type(node).__name__
		if "Element" in name:
			elementname = node.tag
			if elementname.startswith("{"):
				(elementxmlns, sep, elementname) = elementname[1:].partition("}")
			else:
				elementxmlns = self.defaultxmlns
			yield ("enterstarttagns", (elementxmlns, elementname))
			for (attrname, attrvalue) in node.items():
				if attrname.startswith("{"):
					(attrxmlns, sep, attrname) = attrname[1:].partition("}")
				else:
					attrxmlns = None
				yield ("enterattrns", (attrxmlns, attrname))
				yield ("text", attrvalue)
				yield ("leaveattrns", (attrxmlns, attrname))
			yield ("leavestarttagns", (elementxmlns, elementname))
			if node.text:
				yield ("text", node.text)
			for child in node:
				yield from self._asxist(child)
				if hasattr(child, "tail") and child.tail:
					yield ("text", child.tail)
			yield ("endtagns", (elementxmlns, elementname))
		elif "ProcessingInstruction" in name:
			yield ("procinst", (node.target, node.text))
		elif "Comment" in name:
			yield ("comment", node.text)

	def __iter__(self):
		"""
		Produces an event stream of namespaced parsing events for the ElementTree
		object passed as ``data`` to the constructor.
		"""
		yield ("url", self.url)
		yield from self._asxist(self.data)


###
### Transformers: Classes that transform the event stream.
###

class Decoder:
	"""
	Decode the :class:`bytes` objects produced by the previous object in the
	pipeline to :class:`str` objects.

	This input object can be a source object or any other pipeline object that
	produces :class:`bytes` objects.
	"""

	def __init__(self, encoding=None):
		"""
		Create a :class:`Decoder` object. ``encoding`` is the encoding of the
		input. If ``encoding`` is :const:`None` it will be automatically detected
		from the XML data.
		"""
		self.encoding = encoding

	def __call__(self, input):
		decoder = codecs.getincrementaldecoder("xml")(encoding=self.encoding)
		for (evtype, data) in input:
			if evtype == "bytes":
				data = decoder.decode(data, False)
				if data:
					yield ("str", data)
			elif evtype == "str":
				if data:
					yield ("str", data)
			elif evtype == "url":
				yield ("url", data)
			else:
				raise UnknownEventError(self, (evtype, data))
		data = decoder.decode(b"", True)
		if data:
			yield ("str", data)

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object encoding={self.encoding!r} at {id(self):#x}>"


class Encoder:
	"""
	Encode the :class:`str` objects produced by the previous object in the
	pipeline to :class:`bytes` objects.

	This input object must be a pipeline object that produces string output
	(e.g. a :class:`Decoder` object).

	This can e.g. be used to parse a :class:`str` object instead of a
	:class:`bytes` object like this::

		>>> from ll.xist import xsc, parse
		>>> from ll.xist.ns import html
		>>> source = "<a href='http://www.python.org/'>Python</a>"
		>>> doc = parse.tree(
		... 	parse.String(source),
		... 	parse.Encoder(encoding="utf-8"),
		... 	parse.Expat(encoding="utf-8"),
		... 	parse.NS(html),
		... 	parse.Node(pool=xsc.Pool(html)),
		... )
		>>> doc.string()
		'<a href="http://www.python.org/">Python</a>'
	"""

	def __init__(self, encoding=None):
		"""
		Create an :class:`Encoder` object. ``encoding`` will be the encoding of
		the output. If ``encoding`` is :const:`None` it will be automatically
		detected from the XML declaration in the data.
		"""
		self.encoding = encoding

	def __call__(self, input):
		encoder = codecs.getincrementalencoder("xml")(encoding=self.encoding)
		for (evtype, data) in input:
			if evtype == "str":
				data = encoder.encode(data, False)
				if data:
					yield ("bytes", data)
			elif evtype == "bytes":
				if data:
					yield ("bytes", data)
			elif evtype == "url":
				yield ("url", data)
			else:
				raise UnknownEventError(self, (evtype, data))
		data = encoder.encode("", True)
		if data:
			yield ("bytes", data)

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object encoding={self.encoding!r} at {id(self):#x}>"


class Transcoder:
	"""
	Transcode the :class:`bytes` objects of the input object into another
	encoding.

	This input object can be a source object or any other pipeline object that
	produces :class:`bytes` events.
	"""

	def __init__(self, fromencoding=None, toencoding=None):
		"""
		Create a :class:`Transcoder` object. ``fromencoding`` is the encoding
		of the input. ``toencoding`` is the encoding of the output. If either of
		them is :const:`None` the encoding will be detected from the data.
		"""
		self.fromencoding = fromencoding
		self.toencoding = toencoding

	def __call__(self, input):
		decoder = codecs.getincrementaldecoder("xml")(encoding=self.fromencoding)
		encoder = codecs.getincrementalencoder("xml")(encoding=self.toencoding)
		for (evtype, data) in input:
			if evtype == "bytes":
				data = encoder.encode(decoder.decode(data, False), False)
				if data:
					yield ("bytes", data)
			elif evtype == "url":
				yield ("url", data)
			else:
				raise UnknownEventError(self, (evtype, data))
		data = encoder.encode(decoder.decode(b"", True), True)
		if data:
			yield ("bytes", data)

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object fromencoding={self.fromencoding!r} toencoding={self.toencoding!r} at {id(self):#x}>"


###
### Parsers
###

class Parser:
	"""
	Basic parser interface.
	"""
	evxmldecl = "xmldecl"
	evbegindoctype = "begindoctype"
	evenddoctype = "enddoctype"
	evcomment = "comment"
	evtext = "text"
	evcdata = "cdata"
	eventerstarttag = "enterstarttag"
	eventerstarttagns = "enterstarttagns"
	eventerattr = "enterattr"
	eventerattrns = "enterattrns"
	evleaveattr = "leaveattr"
	evleaveattrns = "leaveattrns"
	evleavestarttag = "leavestarttag"
	evleavestarttagns = "leavestarttagns"
	evendtag = "endtag"
	evendtagns = "endtagns"
	evprocinst = "procinst"
	eventity = "entity"
	evposition = "position"
	evurl = "url"


class Expat(Parser):
	"""
	A parser using Python's builtin :mod:`expat` parser.
	"""

	def __init__(self, encoding=None, xmldecl=False, doctype=False, loc=True, cdata=False, ns=False):
		"""
		Create an :class:`Expat` parser. Arguments have the following meaning:

		``encoding`` : string or :const:`None`
			Forces the parser to use the specified encoding. The default
			:const:`None` results in the encoding being detected from the XML
			itself.

		``xmldecl`` : bool
			Should the parser produce events for the XML declaration?

		``doctype`` : bool
			Should the parser produce events for the document type?

		``loc`` : bool
			Should the parser produce ``"position"`` events?

		``cdata`` : bool
			Should the parser output CDATA sections as ``"cdata"`` events? (If
			``cdata`` is false ``"text"`` events are output instead.)

		``ns`` : bool
			If ``ns`` is true, the parser performs namespace processing itself,
			i.e. it will emit ``"enterstarttagns"``, ``"leavestarttagns"``,
			``"endtagns"``, ``"enterattrns"`` and ``"leaveattrns"`` events
			instead of ``"enterstarttag"``, ``"leavestarttag"``, ``"endtag"``,
			``"enterattr"`` and ``"leaveattr"`` events.
		"""
		self.encoding = encoding
		self.xmldecl = xmldecl
		self.doctype = doctype
		self.loc = loc
		self.cdata = cdata
		self.ns = ns

	def __repr__(self):
		v = []
		if self.encoding is not None:
			v.append(f" encoding={self.encoding!r}")
		if self.xmldecl is not None:
			v.append(f" xmldecl={self.xmldecl!r}")
		if self.doctype is not None:
			v.append(f" doctype={self.doctype!r}")
		if self.loc is not None:
			v.append(f" loc={self.loc!r}")
		if self.cdata is not None:
			v.append(f" cdata={self.cdata!r}")
		if self.ns is not None:
			v.append(f" ns={self.ns!r}")
		attrs = "".join(v)
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object{attrs} at {id(self):#x}>"

	def __call__(self, input):
		"""
		Return an iterator over the events produced by ``input``.
		"""
		self._parser = expat.ParserCreate(self.encoding, "\x01" if self.ns else None)
		self._parser.buffer_text = True
		self._parser.ordered_attributes = True
		self._parser.UseForeignDTD(True)
		self._parser.CharacterDataHandler = self._handle_text
		self._parser.StartElementHandler = self._handle_startelement
		self._parser.EndElementHandler = self._handle_endelement
		self._parser.ProcessingInstructionHandler = self._handle_procinst
		self._parser.CommentHandler = self._handle_comment
		self._parser.DefaultHandler = self._handle_default

		if self.cdata:
			self._parser.StartCdataSectionHandler = self._handle_startcdata
			self._parser.EndCdataSectionHandler = self._handle_endcdata

		if self.xmldecl:
			self._parser.XmlDeclHandler = self._handle_xmldecl

		# Always required, as we want to recognize whether a comment or PI is in the internal DTD subset
		self._parser.StartDoctypeDeclHandler = self._handle_begindoctype
		self._parser.EndDoctypeDeclHandler = self._handle_enddoctype

		self._indoctype = False
		self._incdata = False
		self._currentloc = None # Remember the last reported position

		# Buffers the events generated during one call to ``Parse``
		self._buffer = []

		try:
			for (evtype, data) in input:
				if evtype == "bytes":
					try:
						self._parser.Parse(data, False)
					except Exception as exc:
						# In case of an exception we want to output the events we have gathered so far, before reraising the exception
						yield from self._flush(True)
						raise exc
					else:
						yield from self._flush(False)
				elif evtype == "url":
					yield (self.evurl, data)
				else:
					raise UnknownEventError(self, (evtype, data))
			try:
				self._parser.Parse(b"", True)
			except Exception as exc:
				yield from self._flush(True)
				raise exc
			else:
				yield from self._flush(True)
		finally:
			del self._buffer
			del self._currentloc
			del self._incdata
			del self._indoctype
			del self._parser

	def _event(self, evtype, evdata):
		loc = None
		if self.loc:
			loc = (self._parser.CurrentLineNumber-1, self._parser.CurrentColumnNumber)
			if loc == self._currentloc:
				loc = None
		if self._buffer and evtype == self._buffer[-1][0] == self.evtext:
			self._buffer[-1] = (evtype, self._buffer[-1][1] + evdata)
		else:
			if loc:
				self._buffer.append((self.evposition, loc))
			self._buffer.append((evtype, evdata))
			if loc:
				self._currentloc = loc

	def _flush(self, force):
		# Flush ``self._buffer`` as far as possible
		if force or not self._buffer or self._buffer[-1][0] != self.evtext:
			yield from self._buffer
			del self._buffer[:]
		else:
			# hold back the last text event, because there might be more
			yield from self._buffer[:-1]
			del self._buffer[:-1]

	def _getname(self, name):
		if self.ns:
			if "\x01" in name:
				return tuple(name.split("\x01"))
			return (None, name)
		return name

	def _handle_startcdata(self):
		self._incdata = True

	def _handle_endcdata(self):
		self._incdata = False

	def _handle_xmldecl(self, version, encoding, standalone):
		standalone = (bool(standalone) if standalone != -1 else None)
		self._event(self.evxmldecl, {"version": version, "encoding": encoding, "standalone": standalone})

	def _handle_begindoctype(self, doctypename, systemid, publicid, has_internal_subset):
		if self.doctype:
			self._event(self.evbegindoctype, {"name": doctypename, "publicid": publicid, "systemid": systemid})

	def _handle_enddoctype(self):
		if self.doctype:
			self._event(self.evenddoctype, None)

	def _handle_default(self, data):
		if data.startswith("&") and data.endswith(";"):
			self._event(self.eventity, data[1:-1])

	def _handle_comment(self, data):
		if not self._indoctype:
			self._event(self.evcomment, data)

	def _handle_text(self, data):
		self._event(self.evcdata if self._incdata else self.evtext, data)

	def _handle_startelement(self, name, attrs):
		name = self._getname(name)
		self._event(self.eventerstarttagns if self.ns else self.eventerstarttag, name)
		for i in range(0, len(attrs), 2):
			key = self._getname(attrs[i])
			self._event(self.eventerattrns if self.ns else self.eventerattr, key)
			self._event(self.evtext, attrs[i+1])
			self._event(self.evleaveattrns if self.ns else self.evleaveattr, key)
		self._event(self.evleavestarttagns if self.ns else self.evleavestarttag, name)

	def _handle_endelement(self, name):
		name = self._getname(name)
		self._event(self.evendtagns if self.ns else self.evendtag, name)

	def _handle_procinst(self, target, data):
		if not self._indoctype:
			self._event(self.evprocinst, (target, data))


class SGMLOP(Parser):
	"""
	A parser based on :mod:`sgmlop`.
	"""

	def __init__(self, encoding=None, cdata=False):
		"""
		Create an :class:`SGMLOP` parser. Arguments have the following meaning:

		``encoding`` : string or :const:`None`
			Forces the parser to use the specified encoding. The default
			:const:`None` results in the encoding being detected from the XML
			itself.

		``cdata`` : bool
			Should the parser output CDATA sections as ``"cdata"`` events? (If
			``cdata`` is false ``"text"`` events are output instead.)
		"""
		self.encoding = encoding
		self.cdata = cdata

	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object encoding={self.encoding!r} at {id(self):#x}>"

	def __call__(self, input):
		"""
		Return an iterator over the events produced by ``input``.
		"""
		decoder = codecs.getincrementaldecoder("xml")(encoding=self.encoding)
		parser = sgmlop.XMLParser()
		parser.register(self)
		self._buffer = []
		try:
			for (evtype, data) in input:
				if evtype == "bytes":
					data = decoder.decode(data, False)
					evtype = "str"
				if evtype == "str":
					try:
						parser.feed(data)
					except Exception as exc:
						# In case of an exception we want to output the events we have gathered so far, before reraising the exception
						yield from self._flush(True)
						parser.close()
						raise exc
					else:
						yield from self._flush(False)
				elif evtype == "url":
					yield (self.evurl, data)
				else:
					raise UnknownEventError(self, (evtype, data))
			parser.close()
			yield from self._flush(True)
		finally:
			del self._buffer
			parser.register(None)

	def _event(self, evtype, evdata):
		if self._buffer and evtype == self._buffer[-1][0] == self.evtext:
			self._buffer[-1] = (evtype, self._buffer[-1][1] + evdata)
		else:
			self._buffer.append((evtype, evdata))

	def _flush(self, force):
		# Flush ``self._buffer`` as far as possible
		if force or not self._buffer or self._buffer[-1][0] != self.evtext:
			yield from self._buffer
			del self._buffer[:]
		else:
			# hold back the last text event, because there might be more
			yield from self._buffer[:-1]
			del self._buffer[:-1]

	def handle_comment(self, data):
		self._event(self.evcomment, data)

	def handle_text(self, data):
		self._event(self.evtext, data)

	def handle_cdata(self, data):
		self._event(self.evcdata if self.cdata else self.evtext, data)

	def handle_proc(self, target, data):
		if target.lower() != "xml":
			self._event(self.evprocinst, (target, data))

	def handle_entityref(self, name):
		self._event(self.eventity, name)

	def handle_enterstarttag(self, name):
		self._event(self.eventerstarttag, name)

	def handle_leavestarttag(self, name):
		self._event(self.evleavestarttag, name)

	def handle_enterattr(self, name):
		self._event(self.eventerattr, name)

	def handle_leaveattr(self, name):
		self._event(self.evleaveattr, name)

	def handle_endtag(self, name):
		self._event(self.evendtag, name)


[docs] class NS: """ An :class:`NS` object is used in a parsing pipeline to add support for XML namespaces. It replaces the ``"enterstarttag"``, ``"leavestarttag"``, ``"endtag"``, ``"enterattr"`` and ``"leaveattr"`` events with the appropriate namespace version of the events (i.e. ``"enterstarttagns"`` etc.) where the event data is a ``(namespace, name)`` tuple. The output of an :class:`NS` object in the stream looks like this:: >>> from ll.xist import parse >>> from ll.xist.ns import html >>> list(parse.events( ... parse.String(b"<a href='http://www.python.org/'>Python</a>"), ... parse.Expat(), ... parse.NS(html) ... )) [('url', URL('STRING')), ('position', (0, 0)), ('enterstarttagns', ('http://www.w3.org/1999/xhtml', 'a')), ('enterattrns', (None, 'href')), ('text', 'http://www.python.org/'), ('leaveattrns', (None, 'href')), ('leavestarttagns', ('http://www.w3.org/1999/xhtml', 'a')), ('position', (0, 39)), ('text', 'Python'), ('endtagns', ('http://www.w3.org/1999/xhtml', 'a'))] """
[docs] def __init__(self, prefixes=None, **kwargs): """ Create an :class:`NS` object. ``prefixes`` (if not :const:`None`) can be a namespace name (or module), which will be used for the empty prefix, or a dictionary that maps prefixes to namespace names (or modules). ``kwargs`` maps prefixes to namespaces names too. If a prefix is in both ``prefixes`` and ``kwargs``, ``kwargs`` wins. """ # the currently active prefix mapping (will be replaced once xmlns attributes are encountered) newprefixes = {} def make(prefix, xmlns): if prefix is not None and not isinstance(prefix, str): raise TypeError(f"prefix must be None or string, not {type(prefix)!r}") xmlns = xsc.nsname(xmlns) if not isinstance(xmlns, str): raise TypeError(f"xmlns must be string, not {type(xmlns)!r}") newprefixes[prefix] = xmlns if prefixes is not None: if isinstance(prefixes, dict): for (prefix, xmlns) in prefixes.items(): make(prefix, xmlns) else: make(None, prefixes) for (prefix, xmlns) in kwargs.items(): make(prefix, xmlns) self._newprefixes = self._attrs = self._attr = None # A stack entry is an ``((namespacename, elementname), prefixdict)`` tuple self._prefixstack = [(None, newprefixes)]
	def __call__(self, input):
		for (evtype, data) in input:
			try:
				handler = getattr(self, evtype)
			except AttributeError:
				raise UnknownEventError(self, (evtype, data))
			yield from handler(data)

	def url(self, data):
		yield ("url", data)

	def xmldecl(self, data):
		data = ("xmldecl", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def begindoctype(self, data):
		data = ("begindoctype", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def enddoctype(self, data):
		data = ("enddoctype", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def comment(self, data):
		data = ("comment", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def text(self, data):
		data = ("text", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def cdata(self, data):
		data = ("cdata", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def procinst(self, data):
		data = ("procinst", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def entity(self, data):
		data = ("entity", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def position(self, data):
		data = ("position", data)
		if self._attr is not None:
			self._attr.append(data)
		else:
			yield data

	def enterstarttag(self, data):
		self._newprefixes = {}
		self._attrs = {}
		self._attr = None
		if 0:
			yield False

	def enterattr(self, data):
		if data == "xmlns" or data.startswith("xmlns:"):
			prefix = data[6:] or None
			self._newprefixes[prefix] = self._attr = []
		else:
			self._attrs[data] = self._attr = []
		if 0:
			yield False

	def leaveattr(self, data):
		self._attr = None
		if 0:
			yield False

	def leavestarttag(self, data):
		oldprefixes = self._prefixstack[-1][1]

		if self._newprefixes:
			prefixes = oldprefixes.copy()
			newprefixes = {key: "".join(d for (t, d) in value if t == "text") or None for (key, value) in self._newprefixes.items()}
			prefixes.update(newprefixes)
		else:
			prefixes = oldprefixes

		(prefix, sep, name) = data.rpartition(":")
		prefix = prefix or None

		try:
			data = (prefixes[prefix], name)
		except KeyError:
			raise xsc.IllegalPrefixError(prefix)

		self._prefixstack.append((data, prefixes))

		yield ("enterstarttagns", data)
		for (attrname, attrvalue) in self._attrs.items():
			if ":" in attrname:
				(attrprefix, attrname) = attrname.split(":", 1)
				if attrprefix == "xml":
					xmlns = xsc.xml_xmlns
				else:
					try:
						xmlns = prefixes[attrprefix]
					except KeyError:
						raise xsc.IllegalPrefixError(attrprefix)
			else:
				xmlns = None
			yield ("enterattrns", (xmlns, attrname))
			yield from attrvalue
			yield ("leaveattrns", (xmlns, attrname))
		yield ("leavestarttagns", data)
		self._newprefixes = self._attrs = self._attr = None

	def endtag(self, data):
		(data, prefixes) = self._prefixstack.pop()
		yield ("endtagns", data)
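# The prefix handling in the class above amounts to a stack of prefix
# dictionaries: each start tag copies the enclosing mapping, applies its own
# ``xmlns`` declarations, and the matching end tag pops the entry again.
# The following is a minimal stand-alone sketch of that idea, not the code
# used by this module. It assumes a simplified, hypothetical event format
# (``("starttag", (qname, attrs))`` and ``("endtag", qname)``), and the name
# ``resolve_prefixes`` is made up for illustration.

```python
def resolve_prefixes(events, default_xmlns=None):
    """Resolve element prefixes to namespace names (illustrative sketch)."""
    # A stack entry is a ((namespacename, elementname), prefixdict) tuple
    stack = [(None, {None: default_xmlns})]
    for (evtype, data) in events:
        if evtype == "starttag":
            (qname, attrs) = data
            # Start from the enclosing scope and apply new declarations
            scope = dict(stack[-1][1])
            for (attrname, value) in attrs.items():
                if attrname == "xmlns":
                    scope[None] = value or None
                elif attrname.startswith("xmlns:"):
                    scope[attrname[6:]] = value or None
            (prefix, _, name) = qname.rpartition(":")
            prefix = prefix or None
            if prefix not in scope:
                raise KeyError(prefix)
            resolved = (scope[prefix], name)
            stack.append((resolved, scope))
            yield ("starttagns", resolved)
        elif evtype == "endtag":
            # The end tag reuses the name resolved for its start tag
            (resolved, _) = stack.pop()
            yield ("endtagns", resolved)
        else:
            yield (evtype, data)


demo_events = [
    ("starttag", ("a", {"xmlns:x": "urn:x"})),
    ("starttag", ("x:b", {})),
    ("text", "hi"),
    ("endtag", "x:b"),
    ("endtag", "a"),
]
demo_resolved = list(resolve_prefixes(demo_events, "urn:default"))
```

# Popping the stack on ``endtag`` is what restricts a declaration to the
# element that introduced it and to that element's descendants.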
class Node:
	"""
	A :class:`Node` object is used in a parsing pipeline to instantiate XIST
	nodes. It consumes a namespaced event stream::

		>>> from ll.xist import xsc, parse
		>>> from ll.xist.ns import html
		>>> list(parse.events(
		... 	parse.String(b"<a href='http://www.python.org/'>Python</a>"),
		... 	parse.Expat(),
		... 	parse.NS(html),
		... 	parse.Node(pool=xsc.Pool(html))
		... ))
		[('enterelementnode',
		  <element ll.xist.ns.html.a xmlns='http://www.w3.org/1999/xhtml' (no children/1 attr) location='STRING:0:0' at 0x10a683550>),
		 ('textnode',
		  <ll.xist.xsc.Text content='Python' location='STRING:0:39' at 0x10a5e1170>),
		 ('leaveelementnode',
		  <element ll.xist.ns.html.a xmlns='http://www.w3.org/1999/xhtml' (no children/1 attr) location='STRING:0:0' at 0x10a683550>)
		]

	The event data of all events are XIST nodes. The element node from the
	``"enterelementnode"`` event already has all attributes set. There will be
	no events for attributes.
	"""
	def __init__(self, pool=None, base=None, loc=True):
		"""
		Create a :class:`Node` object.

		``pool`` may be :const:`None` or a :class:`xsc.Pool` object and
		specifies which classes are used for creating element, entity and
		processing instruction instances.

		``base`` specifies the base URL for interpreting relative links in
		the input.

		``loc`` specifies whether location information should be attached to
		the nodes that get generated (the ``startloc`` attribute (and
		``endloc`` attribute for elements)).
		"""
		self.pool = (pool if pool is not None else xsc.threadlocalpool.pool)
		if base is not None:
			base = url_.URL(base)
		self._base = base
		self._url = url_.URL()
		self.loc = loc
		self._position = (None, None)
		self._stack = []
		self._inattr = False
		self._indoctype = False
	@property
	def base(self):
		if self._base is None:
			return self._url
		else:
			return self._base

	def __call__(self, input):
		for (evtype, data) in input:
			try:
				handler = getattr(self, evtype)
			except AttributeError:
				raise UnknownEventError(self, (evtype, data))
			event = handler(data)
			if event:
				yield event

	def url(self, data):
		self._url = data

	def xmldecl(self, data):
		node = xml.XML(version=data["version"], encoding=data["encoding"], standalone=data["standalone"])
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		return ("xmldeclnode", node)

	def begindoctype(self, data):
		if data["publicid"]:
			content = f'{data["name"]} PUBLIC "{data["publicid"]}" "{data["systemid"]}"'
		elif data["systemid"]:
			content = f'{data["name"]} SYSTEM "{data["systemid"]}"'
		else:
			content = data["name"]
		node = xsc.DocType(content)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		self.doctype = node
		self._indoctype = True

	def enddoctype(self, data):
		result = ("doctypenode", self.doctype)
		del self.doctype
		self._indoctype = False
		return result

	def entity(self, data):
		node = self.pool.entity(data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "entity")
		if self._inattr:
			self._stack[-1].append(node)
		elif not self._indoctype:
			return ("entitynode", node)

	def comment(self, data):
		node = xsc.Comment(data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "comment")
		if self._inattr:
			self._stack[-1].append(node)
		elif not self._indoctype:
			return ("commentnode", node)

	def cdata(self, data):
		node = xsc.Text(data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "cdata")
		if self._inattr:
			self._stack[-1].append(node)
		elif not self._indoctype:
			return ("textnode", node)

	def text(self, data):
		node = xsc.Text(data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "text")
		if self._inattr:
			self._stack[-1].append(node)
		elif not self._indoctype:
			return ("textnode", node)

	def enterstarttagns(self, data):
		node = self.pool.element(*data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		self._stack.append(node)
		node.parsed(self, "starttagns")

	def enterattrns(self, data):
		attrkey = self.pool.attrkey(*data)
		self._stack[-1].attrs[attrkey] = ()
		node = self._stack[-1].attrs[attrkey]
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		self._stack.append(node)
		self._inattr = True
		node.parsed(self, "enterattrns")

	def leaveattrns(self, data):
		node = self._stack.pop()
		self._inattr = False
		node.parsed(self, "leaveattrns")

	def leavestarttagns(self, data):
		node = self._stack[-1]
		node.parsed(self, "leavestarttagns")
		return ("enterelementnode", node)

	def endtagns(self, data):
		node = self._stack.pop()
		if self.loc:
			node.endloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "endtagns")
		return ("leaveelementnode", node)

	def procinst(self, data):
		node = self.pool.procinst(*data)
		if self.loc:
			node.startloc = xsc.Location(self._url, *self._position)
		node.parsed(self, "procinst")
		if self._inattr:
			self._stack[-1].append(node)
		elif not self._indoctype:
			return ("procinstnode", node)

	def position(self, data):
		self._position = data
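# The ``__call__`` method above looks up a handler method named after the
# event type. Handlers that only update internal state return ``None`` and
# therefore produce no output event. The following is a minimal stand-alone
# sketch of that dispatch pattern. ``UpperText`` and ``UnknownEvent`` are
# hypothetical names that are not part of this module.

```python
class UnknownEvent(Exception):
    """Raised when a pipeline step has no handler for an event type."""


class UpperText:
    """Toy pipeline step: uppercases text and remembers the last position."""

    def __init__(self):
        self._position = (None, None)

    def __call__(self, input):
        for (evtype, data) in input:
            try:
                handler = getattr(self, evtype)
            except AttributeError:
                raise UnknownEvent((evtype, data))
            event = handler(data)
            # Handlers returning None only changed internal state
            if event:
                yield event

    def position(self, data):
        # State-only handler: nothing is passed on to the next step
        self._position = data

    def text(self, data):
        return ("textnode", data.upper())


step = UpperText()
demo_output = list(step([("position", (0, 5)), ("text", "hi")]))
```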
class Tidy:
	"""
	A :class:`Tidy` object parses (potentially ill-formed) HTML from a source
	into a (non-namespaced) event stream by using lxml__'s HTML parser::

		>>> from ll.xist import parse
		>>> list(parse.events(parse.URL("http://www.yahoo.com/"), parse.Tidy()))
		[('url', URL('http://de.yahoo.com/?p=us')),
		 ('enterstarttag', 'html'),
		 ('enterattr', 'class'),
		 ('text', 'y-fp-bg y-fp-pg-grad bkt708'),
		 ('leaveattr', 'class'),
		 ('enterattr', 'lang'),
		 ('text', 'de-DE'),
		 ('leaveattr', 'lang'),
		 ('enterattr', 'style'),
		 ('leaveattr', 'style'),
		 ('leavestarttag', 'html'),
		 ...

	__ http://lxml.de/
	"""
	def __init__(self, encoding=None, xmldecl=False, doctype=False):
		"""
		Create a new :class:`Tidy` object. Parameters have the following
		meaning:

		``encoding`` : string or :const:`None`
			The encoding of the input. If ``encoding`` is :const:`None` it will
			be automatically detected by the HTML parser.

		``xmldecl`` : bool
			Should the parser produce events for the XML declaration?

		``doctype`` : bool
			Should the parser produce events for the document type?
		"""
		self.encoding = encoding
		self.xmldecl = xmldecl
		self.doctype = doctype
	def __repr__(self):
		return f"<{self.__class__.__module__}.{self.__class__.__qualname__} object encoding={self.encoding!r} at {id(self):#x}>"

	def _asxist(self, node):
		name = type(node).__name__
		if "ElementTree" in name:
			if self.xmldecl:
				yield ("xmldecl", {"version": node.docinfo.xml_version or "1.0", "encoding": node.docinfo.encoding, "standalone": node.docinfo.standalone})
			if self.doctype:
				yield ("begindoctype", {"name": node.docinfo.root_name, "publicid": node.docinfo.public_id, "systemid": node.docinfo.system_url})
				yield ("enddoctype", None)
			yield from self._asxist(node.getroot())
		elif "Element" in name:
			elementname = node.tag
			yield ("enterstarttag", elementname)
			for (attrname, attrvalue) in sorted(node.items()):
				yield ("enterattr", attrname)
				if attrvalue:
					yield ("text", attrvalue)
				yield ("leaveattr", attrname)
			yield ("leavestarttag", elementname)
			if node.text:
				yield ("text", node.text)
			for child in node:
				yield from self._asxist(child)
				if hasattr(child, "tail") and child.tail:
					yield ("text", child.tail)
			yield ("endtag", elementname)
		elif "ProcessingInstruction" in name:
			yield ("procinst", (node.target, node.text))
		elif "Comment" in name:
			yield ("comment", node.text)
		# ignore all other types

	def __call__(self, input):
		from lxml import etree # This requires lxml (see http://lxml.de/)

		url = None
		collectdata = []
		for (evtype, data) in input:
			if evtype == "url":
				if url is None:
					url = data
				else:
					raise ValueError("got multiple url events")
			elif evtype == "bytes":
				collectdata.append(data)
			else:
				raise UnknownEventError(self, (evtype, data))
		data = b"".join(collectdata)
		if url is not None:
			yield ("url", url)
		if data:
			parser = etree.HTMLParser(encoding=self.encoding)
			doc = etree.parse(io.BytesIO(data), parser)
			yield from self._asxist(doc)
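# The element branch of ``_asxist`` above walks an lxml tree and flattens it
# into start tag, attribute, text and end tag events, emitting each child's
# ``tail`` text after the child itself. The following is a minimal sketch of
# the same traversal. It swaps in the standard library's
# ``xml.etree.ElementTree`` for lxml as an assumption, so it only handles
# well-formed XML and ignores comments and processing instructions.
# ``etree_events`` is a made-up name.

```python
import xml.etree.ElementTree as ET


def etree_events(element):
    """Flatten an ElementTree element into a non-namespaced event stream."""
    yield ("enterstarttag", element.tag)
    # Sorting makes the attribute order deterministic
    for (attrname, attrvalue) in sorted(element.items()):
        yield ("enterattr", attrname)
        if attrvalue:
            yield ("text", attrvalue)
        yield ("leaveattr", attrname)
    yield ("leavestarttag", element.tag)
    if element.text:
        yield ("text", element.text)
    for child in element:
        yield from etree_events(child)
        # Text following a child element is stored on the child as ``tail``
        if child.tail:
            yield ("text", child.tail)
    yield ("endtag", element.tag)


demo_root = ET.fromstring("<p class='x'>Hi <b>there</b>!</p>")
demo_stream = list(etree_events(demo_root))
```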
###
### Consumers: Functions that consume an event stream
###
def events(*pipeline):
	"""
	Return an iterator over the events produced by the pipeline objects in
	``pipeline``.
	"""
	source = pipeline[0]

	# Propagate first pipeline object to a source object (if unambiguous, else use it as it is)
	if isinstance(source, (bytes, str)):
		source = String(source)
	elif isinstance(source, url_.URL):
		source = URL(source)

	# Execute the pipeline, propagating pipeline objects in the process
	output = iter(source)
	for pipe in pipeline[1:]:
		if isinstance(pipe, xsc.Pool):
			pipe = Node(pool=pipe)
		output = pipe(output)
	return output
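# :func:`events` chains the pipeline lazily: the source is turned into an
# iterator and each further step is called with the iterator produced by the
# previous one. The following is a minimal stand-alone sketch of that
# composition. ``run_pipeline``, ``drop_empty_text`` and ``uppercase_text``
# are hypothetical names, and the source is assumed to be a plain list of
# event tuples.

```python
def run_pipeline(source, *stages):
    """Feed ``source`` through ``stages``; nothing runs until iteration."""
    output = iter(source)
    for stage in stages:
        output = stage(output)
    return output


def drop_empty_text(events):
    # Pass everything through except text events without content
    for (evtype, data) in events:
        if evtype != "text" or data:
            yield (evtype, data)


def uppercase_text(events):
    # Transform text events and leave all other events untouched
    for (evtype, data) in events:
        yield (evtype, data.upper() if evtype == "text" else data)


demo_source = [("url", "STRING"), ("text", "a"), ("text", "")]
demo_piped = list(run_pipeline(demo_source, drop_empty_text, uppercase_text))
```

# Because every stage is a generator, events flow through all stages one at a
# time and no intermediate list is built.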
def tree(*pipeline, validate=False):
	"""
	Return a tree of XIST nodes from the event stream ``pipeline``.

	``pipeline`` must output only events that contain XIST nodes, i.e. the
	event types ``"xmldeclnode"``, ``"doctypenode"``, ``"commentnode"``,
	``"textnode"``, ``"enterelementnode"``, ``"leaveelementnode"``,
	``"procinstnode"`` and ``"entitynode"``.

	If ``validate`` is true, the tree is validated, i.e. it is checked if
	the structure of the tree is valid (according to the ``model`` attribute
	of each element node), if no undeclared elements or attributes have been
	encountered, all required attributes are specified and all
	attributes have allowed values.

	The node returned from :func:`tree` will always be a :class:`Frag` object.

	Example::

		>>> from ll.xist import xsc, parse
		>>> from ll.xist.ns import xml, html, chars
		>>> doc = parse.tree(
		... 	parse.URL("http://www.python.org/"),
		... 	parse.Tidy(),
		... 	parse.NS(html),
		... 	parse.Node(pool=xsc.Pool(xml, html, chars))
		... )
		>>> doc[0]
		<element ll.xist.ns.html.html xmlns='http://www.w3.org/1999/xhtml' (7 children/3 attrs) location='https://www.python.org/:?:?' at 0x110a4ecd0>
	"""
	path = [xsc.Frag()]
	for (evtype, node) in events(*pipeline):
		if evtype == "enterelementnode":
			path[-1].append(node)
			path.append(node)
		elif evtype == "leaveelementnode":
			if validate:
				for warning in node.validate(False, path):
					warnings.warn(warning)
			path.pop()
		else:
			path[-1].append(node)
			if validate:
				for warning in node.validate(False, path):
					warnings.warn(warning)
	return path[0]
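# :func:`tree` keeps a ``path`` list whose last item is the node currently
# being filled: entering an element appends it to its parent and pushes it,
# leaving an element pops it, and every other node is appended to the
# current parent. The following is a minimal stand-alone sketch of that
# loop. It assumes plain dictionaries instead of XIST nodes and hypothetical
# ``"enter"``/``"leave"`` event names.

```python
def build_tree(events):
    """Build a nested dict tree from enter/leave/leaf events (sketch)."""
    root = {"name": None, "children": []}
    path = [root]
    for (evtype, data) in events:
        if evtype == "enter":
            node = {"name": data, "children": []}
            path[-1]["children"].append(node)
            path.append(node)
        elif evtype == "leave":
            path.pop()
        else:
            # Leaf events are stored as they are under the current parent
            path[-1]["children"].append((evtype, data))
    return root


demo_tree = build_tree([
    ("enter", "a"),
    ("text", "x"),
    ("enter", "b"),
    ("leave", "b"),
    ("leave", "a"),
])
```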
def itertree(*pipeline, entercontent=True, enterattrs=False, enterattr=False, enterelementnode=False, leaveelementnode=True, enterattrnode=True, leaveattrnode=False, selector=None, validate=False):
	"""
	Parse the event stream ``pipeline`` iteratively.

	:func:`itertree` still builds a tree, but it returns an iterator of
	:class:`xsc.Cursor` objects that tracks changes to the tree as it is built.

	``validate`` specifies whether each node should be validated after it has
	been fully parsed.

	The rest of the arguments can be used to control when :func:`itertree`
	returns to the calling code. For an explanation of their meaning see the
	class :class:`ll.xist.xsc.Cursor`.

	Example::

		>>> from ll.xist import xsc, parse
		>>> from ll.xist.ns import xml, html, chars
		>>> for c in parse.itertree(
		... 	parse.URL("http://www.python.org/"),
		... 	parse.Tidy(),
		... 	parse.NS(html),
		... 	parse.Node(pool=xsc.Pool(xml, html, chars)),
		... 	selector=html.a/html.img
		... ):
		... 	print(c.path[-1].attrs.src, "-->", c.path[-2].attrs.href)
		https://www.python.org/static/img/python-logo.png --> https://www.python.org/
	"""
	selector = xfind.selector(selector)
	cursor = xsc.Cursor(xsc.Frag(), entercontent=entercontent, enterattrs=enterattrs, enterattr=enterattr, enterelementnode=enterelementnode, leaveelementnode=leaveelementnode, enterattrnode=enterattrnode, leaveattrnode=leaveattrnode)
	cursor.index.append(0)
	skipcontent = None # If this is not :const:`None`, we're currently skipping past the content of this element
	for (evtype, node) in events(*pipeline):
		cursor.event = evtype
		if evtype == "enterelementnode":
			cursor.path[-1].append(node)
			cursor.path.append(node)
			cursor.node = node
			enterattrs = cursor.enterattrs
			entercontent = cursor.entercontent
			if cursor.enterelementnode and cursor.path in selector and skipcontent is None:
				yield cursor
				enterattrs = cursor.enterattrs
				entercontent = cursor.entercontent
				cursor.restore()
			if enterattrs:
				yield from node.attrs._walk(cursor)
			cursor.index.append(0)
			if not entercontent and skipcontent is None:
				# Skip all events until we leave this element
				skipcontent = cursor.node
		elif evtype == "leaveelementnode":
			if validate:
				for warning in node.validate(False, cursor.path):
					warnings.warn(warning)
			cursor.index.pop()
			if skipcontent is cursor.node:
				skipcontent = None
			if cursor.leaveelementnode and cursor.path in selector and skipcontent is None:
				yield cursor
				cursor.restore()
			cursor.path.pop()
			cursor.node = cursor.path[-1]
			cursor.index[-1] += 1
		else:
			cursor.path[-1].append(node)
			cursor.path.append(node)
			cursor.node = node
			if validate:
				for warning in node.validate(False, cursor.path):
					warnings.warn(warning)
			if cursor.path in selector and skipcontent is None:
				yield cursor
				cursor.restore()
			cursor.path.pop()
			cursor.node = cursor.path[-1]
			cursor.index[-1] += 1
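# The docstring example above selects ``img`` elements that are direct
# children of ``a`` elements while the tree is still being built. The same
# idea can be sketched with the standard library's
# ``xml.etree.ElementTree.iterparse`` as a stand-in. This is an analogy
# under that assumption and does not use :class:`xsc.Cursor` or XIST
# selectors. ``iter_linked_images`` is a made-up name.

```python
import io
import xml.etree.ElementTree as ET


def iter_linked_images(data):
    """Yield ``(src, href)`` for each ``img`` directly inside an ``a``."""
    path = []
    stream = ET.iterparse(io.BytesIO(data), events=("start", "end"))
    for (event, elem) in stream:
        if event == "start":
            path.append(elem)
        else:
            # On "end" the element is complete; check it against its parent
            if elem.tag == "img" and len(path) >= 2 and path[-2].tag == "a":
                yield (elem.get("src"), path[-2].get("href"))
            path.pop()


demo_xml = b"<div><a href='/'><img src='logo.png'/></a><img src='x.png'/></div>"
demo_links = list(iter_linked_images(demo_xml))
```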