import io
import xml.etree.ElementTree as ET
import xml.sax.saxutils
from collections import deque
from contextlib import contextmanager
from enum import Enum
from enum import auto
from typing import Deque
from typing import Union
from xml.sax import SAXException
from xml.sax.handler import ContentHandler
from xml.sax.saxutils import XMLGenerator
import cql
from lxml import etree
from lxml.sax import saxify
from ..constants import RESPONSE_ENCODING
from ..constants import SRURecordXmlEscaping
# ---------------------------------------------------------------------------
[docs]class SRUXMLStreamWriter(ContentHandler):
[docs] class IndentingState(Enum):
SEEN_NOTHING = auto()
SEEN_ELEMENT = auto()
SEEN_DATA = auto()
def __init__(
self,
output_stream: io.TextIOBase,
record_escaping: SRURecordXmlEscaping,
indent: int = -1,
encoding: str = RESPONSE_ENCODING,
short_empty_elements: bool = False,
) -> None:
super().__init__()
self.record_escaping = record_escaping
self.writing_record = False
# TODO: SRURecordXmlEscaping.STRING
# https://www.loc.gov/standards/sru/sru-1-1.html#packing
# if self.writing_record and self.record_escaping == SRURecordXmlEscaping.STRING:
# content = escape(content)
self.depth = 0
self.indent = indent
self.indent_state = SRUXMLStreamWriter.IndentingState.SEEN_NOTHING
self.indent_state_stack: Deque[SRUXMLStreamWriter.IndentingState] = deque()
self.output_stream = self.output_stream_raw = output_stream
if self.record_escaping == SRURecordXmlEscaping.STRING:
# output_stream.__class__ ?
class SRURecordXmlEscapingStream(io.TextIOBase):
def __init__(self, writer) -> None:
super().__init__()
self.writer = writer
def write(self, __s: str) -> int:
if self.writer.writing_record:
__s = xml.sax.saxutils.escape(__s)
return self.writer.output_stream.write(__s)
def flush(self) -> None:
return self.writer.output_stream.flush()
# FIXME: is that stable, even required?
# self.output_stream = xml.sax.saxutils._gettextwriter(
# self.output_stream, encoding
# )
xml_output_stream: Union[
SRURecordXmlEscapingStream, io.TextIOBase
] = SRURecordXmlEscapingStream(self)
else:
xml_output_stream = self.output_stream
self.xmlwriter = XMLGenerator(
xml_output_stream,
encoding=encoding,
short_empty_elements=short_empty_elements,
)
# ----------------------------------------------------
def _should_do_indent_stuff(self):
if not self.writing_record:
return True
if self.record_escaping != SRURecordXmlEscaping.STRING:
return True
return False
[docs] def onStartElement(self):
if self._should_do_indent_stuff():
self.indent_state_stack.append(
SRUXMLStreamWriter.IndentingState.SEEN_ELEMENT
)
self.indent_state = SRUXMLStreamWriter.IndentingState.SEEN_NOTHING
if self.depth > 0:
self.xmlwriter.characters("\n")
self.doIndent()
self.depth += 1
[docs] def onEndElement(self):
if self._should_do_indent_stuff():
self.depth -= 1
if self.indent_state == SRUXMLStreamWriter.IndentingState.SEEN_ELEMENT:
self.xmlwriter.characters("\n")
self.doIndent()
self.indent_state = self.indent_state_stack.pop()
[docs] def onEmptyElement(self):
if self._should_do_indent_stuff:
self.indent_state = SRUXMLStreamWriter.IndentingState.SEEN_ELEMENT
if self.depth > 0:
self.xmlwriter.characters("\n")
self.doIndent()
[docs] def doIndent(self):
if self.depth > 0:
self.xmlwriter.characters(" " * self.depth * self.indent)
# ----------------------------------------------------
[docs] def startRecord(self):
if self.writing_record:
raise ValueError("was already writing record")
self.xmlwriter._flush() # or call on my stream variable?
# force writer to close/finish any pending start or end elements
self.xmlwriter._finish_pending_start_element()
self.writing_record = True
[docs] def endRecord(self):
if not self.writing_record:
raise ValueError("was not writing record")
# force writer to close/finish any pending start or end elements
self.xmlwriter._finish_pending_start_element()
self.xmlwriter._flush() # or call on my stream variable?
self.writing_record = False
# ----------------------------------------------------
# ContentHandler methods
[docs] def setDocumentLocator(self, locator):
self.xmlwriter.setDocumentLocator(locator)
[docs] def startPrefixMapping(self, prefix, uri):
self.xmlwriter.startPrefixMapping(prefix, uri)
[docs] def endPrefixMapping(self, prefix):
self.xmlwriter.endPrefixMapping(prefix)
[docs] def processingInstruction(self, target, data):
self.xmlwriter.processingInstruction(target, data)
[docs] def startDocument(self):
self.xmlwriter.startDocument()
# if self.indent > 0:
# self.xmlwriter.characters("\n")
[docs] def endDocument(self):
self.xmlwriter.endDocument()
[docs] def startElement(self, name, attrs=None):
if self.indent > 0:
self.onStartElement()
if attrs is None:
attrs = dict()
self.xmlwriter.startElement(name, attrs)
[docs] def endElement(self, name):
if self.indent > 0:
self.onEndElement()
self.xmlwriter.endElement(name)
[docs] def startElementNS(self, name, qname=None, attrs=None):
if self.indent > 0:
self.onStartElement()
if attrs is None:
attrs = dict()
else:
# small helper to set None-namespace for attributes
# that did not supply them
if not all(isinstance(key, tuple) for key in attrs.keys()):
attrs_copy = dict()
for key, value in attrs.items():
if not isinstance(key, tuple):
key = (None, key)
attrs_copy[key] = value
attrs = attrs_copy
self.xmlwriter.startElementNS(name, qname, attrs)
[docs] def endElementNS(self, name, qname=None):
if self.indent > 0:
self.onEndElement()
self.xmlwriter.endElementNS(name, qname)
[docs] def characters(self, content):
if self.indent > 0:
self.indent_state = SRUXMLStreamWriter.IndentingState.SEEN_DATA
self.xmlwriter.characters(content)
[docs] def ignorableWhitespace(self, whitespace):
self.xmlwriter.ignorableWhitespace(whitespace)
[docs] def skippedEntity(self, name):
self.xmlwriter.skippedEntity(name)
# ----------------------------------------------------
[docs] def writeXCQL(self, query: cql.CQLQuery, search_retrieve_mode: bool):
# HACK: Parsing the XCQL to serialize is wasting resources.
# Alternative would be to serialize to XCQL from CQLNode, but
# I'm not yet enthusiastic on writing the serializer myself.
class XCQLHandler(ContentHandler):
def __init__(self, writer):
super().__init__()
self.writer = writer
def startElementNS(self, name, qname, attrs):
if not search_retrieve_mode and qname == "searchClause":
return
self.writer.startElementNS(name, qname, attrs)
def endElementNS(self, name, qname):
if not search_retrieve_mode and qname == "searchClause":
return
self.writer.endElementNS(name, qname)
def characters(self, content):
if not content or content.isspace():
return
self.writer.characters(content)
try:
# tree = query.toXCQL()
# content = query.toXCQLString()
tree = query.root.toXCQL()
content = ET.tostring(tree)
tree = etree.fromstring(content)
handler = XCQLHandler(self)
saxify(tree, handler)
except Exception as ex:
raise SAXException("serializing xcql failed") from ex
# ----------------------------------------------------
[docs] @contextmanager
def prefix(self, prefix, uri):
self.startPrefixMapping(prefix, uri)
yield
self.endPrefixMapping(prefix)
[docs] @contextmanager
def element(self, name, namespace=None, attrs=None):
self.startElementNS((namespace, name), attrs=attrs)
yield
self.endElementNS((namespace, name))
[docs] def elementcontent(self, name, content=None, namespace=None, attrs=None):
self.startElementNS((namespace, name), attrs=attrs)
self.characters(content)
self.endElementNS((namespace, name))
[docs] @contextmanager
def record(self):
self.startRecord()
yield
self.endRecord()
# ---------------------------------------------------------------------------
[docs]def copy_XML_into_writer(writer: ContentHandler, xml: Union[bytes, str]):
class XMLCopyHandler(ContentHandler):
def __init__(self, writer):
super().__init__()
self.writer = writer
def startPrefixMapping(self, prefix, uri):
self.writer.startPrefixMapping(prefix, uri)
def endPrefixMapping(self, prefix):
self.writer.endPrefixMapping(prefix)
def startElementNS(self, name, qname, attrs):
self.writer.startElementNS(name, qname, attrs)
def endElementNS(self, name, qname):
self.writer.endElementNS(name, qname)
def characters(self, content):
# if not content or content.isspace():
# return
self.writer.characters(content)
try:
tree = etree.fromstring(xml)
handler = XMLCopyHandler(writer)
saxify(tree, handler)
except Exception as ex:
raise SAXException("serializing xml failed") from ex
[docs]class XMLStreamWriterHelper(ContentHandler):
def __init__(self, xmlwriter: ContentHandler) -> None:
super().__init__()
self.xmlwriter = xmlwriter
# unwrap to avoid uneccessary call chains
if (
isinstance(self.xmlwriter, XMLStreamWriterHelper)
and self.xmlwriter.__class__ == XMLStreamWriterHelper
):
self.xmlwriter = self.xmlwriter.xmlwriter
# ----------------------------------------------------
# ContentHandler methods
[docs] def setDocumentLocator(self, locator):
self.xmlwriter.setDocumentLocator(locator)
[docs] def startPrefixMapping(self, prefix, uri):
self.xmlwriter.startPrefixMapping(prefix, uri)
[docs] def endPrefixMapping(self, prefix):
self.xmlwriter.endPrefixMapping(prefix)
[docs] def processingInstruction(self, target, data):
self.xmlwriter.processingInstruction(target, data)
[docs] def startDocument(self):
self.xmlwriter.startDocument()
[docs] def endDocument(self):
self.xmlwriter.endDocument()
[docs] def startElement(self, name, attrs=None):
if attrs is None:
attrs = dict()
self.xmlwriter.startElement(name, attrs)
[docs] def endElement(self, name):
self.xmlwriter.endElement(name)
[docs] def startElementNS(self, name, qname=None, attrs=None):
if attrs is None:
attrs = dict()
else:
# small helper to set None-namespace for attributes
# that did not supply them
if not all(isinstance(key, tuple) for key in attrs.keys()):
attrs_copy = dict()
for key, value in attrs.items():
if not isinstance(key, tuple):
key = (None, key)
attrs_copy[key] = value
attrs = attrs_copy
self.xmlwriter.startElementNS(name, qname, attrs)
[docs] def endElementNS(self, name, qname=None):
self.xmlwriter.endElementNS(name, qname)
[docs] def characters(self, content):
self.xmlwriter.characters(content)
[docs] def ignorableWhitespace(self, whitespace):
self.xmlwriter.ignorableWhitespace(whitespace)
[docs] def skippedEntity(self, name):
self.xmlwriter.skippedEntity(name)
# ----------------------------------------------------
[docs] def writeXML(self, xml: Union[bytes, str]):
copy_XML_into_writer(self, xml)
[docs] def writeXMLdocument(self, xmldoc: ET.Element):
try:
content = ET.tostring(xmldoc)
self.writeXML(content)
except SAXException:
raise
except Exception as ex:
raise SAXException("serializing xmldoc failed") from ex
# ----------------------------------------------------
# contextmanagers + SRUXMLStreamWriter methods
[docs] @contextmanager
def prefix(self, prefix, uri):
self.startPrefixMapping(prefix, uri)
yield
self.endPrefixMapping(prefix)
[docs] @contextmanager
def element(self, name, namespace=None, attrs=None):
self.startElementNS((namespace, name), attrs=attrs)
yield
self.endElementNS((namespace, name))
[docs] def elementcontent(self, name, content=None, namespace=None, attrs=None):
self.startElementNS((namespace, name), attrs=attrs)
self.characters(content)
self.endElementNS((namespace, name))
[docs] def startRecord(self):
if isinstance(self.xmlwriter, SRUXMLStreamWriter):
self.xmlwriter.startRecord()
[docs] def endRecord(self):
if isinstance(self.xmlwriter, SRUXMLStreamWriter):
self.xmlwriter.endRecord()
[docs] @contextmanager
def record(self):
self.startRecord()
yield
self.endRecord()
# ---------------------------------------------------------------------------