# Source code for clarin.sru.server.server

# from: https://pythonbasics.org/webserver/
# from http.server import BaseHTTPRequestHandler, HTTPServer

import io
import logging
from abc import ABCMeta
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from xml.sax import SAXException

import cql
from werkzeug.wrappers import Request
from werkzeug.wrappers import Response

from ..constants import RESPONSE_CONTENT_TYPE
from ..constants import RESPONSE_ENCODING
from ..constants import SRU_DIAGNOSTIC_RECORD_SCHEMA
from ..constants import SRUDiagnostics
from ..constants import SRUOperation
from ..constants import SRUQueryType
from ..constants import SRURecordPacking
from ..constants import SRURecordXmlEscaping
from ..constants import SRUVersion
from ..diagnostic import SRUDiagnostic
from ..diagnostic import SRUDiagnosticList
from ..exception import SRUException
from ..queryparser import SRUQuery
from ..queryparser import SRUQueryParserRegistry
from ..xml.writer import SRUXMLStreamWriter
from .auth import SRUAuthenticationInfoProvider
from .config import DatabaseInfo
from .config import IndexInfo
from .config import LegacyNamespaceMode
from .config import LocalizedString
from .config import SchemaInfo
from .config import SRUServerConfig
from .request import SRURequest
from .request import SRURequestImpl
from .result import SRUExplainResult
from .result import SRUScanResultSet
from .result import SRUSearchResultSet

# ---------------------------------------------------------------------------


# Module-level logger; handlers and levels are configured by the application.
LOGGER = logging.getLogger(__name__)


# Namespace URIs and prefixes used when serializing SRU responses.

# Legacy (Library of Congress) SRU namespace.
SRU_NS = "http://www.loc.gov/zing/srw/"
SRU_PREFIX = "sru"
# OASIS namespaces for explain/searchRetrieve responses, scan responses and
# diagnostics.
SRU_RESPONSE_NS = "http://docs.oasis-open.org/ns/search-ws/sruResponse"
SRU_RESPONSE_PREFIX = "sruResponse"
SRU_SCAN_NS = "http://docs.oasis-open.org/ns/search-ws/scan"
SRU_SCAN_PREFIX = "scan"
SRU_DIAGNOSTIC_NS = "http://docs.oasis-open.org/ns/search-ws/diagnostic"
# Misspelled historical name of ``SRU_DIAGNOSTIC_NS``; kept as an alias so
# existing references to it keep working.
SRU_DIAGNOSIC_NS = SRU_DIAGNOSTIC_NS
SRU_DIAGNOSTIC_PREFIX = "diag"
# Namespace of the explain record payload.
SRU_EXPLAIN_NS = "http://explain.z3950.org/dtd/2.0/"
SRU_EXPLAIN_PREFIX = "zr"
# Namespace for XCQL fragments.
SRU_XCQL_NS = "http://docs.oasis-open.org/ns/search-ws/xcql"


@dataclass(frozen=True)
class SRUNamespaces:
    """Value object bundling the XML namespaces used to serialize responses.

    Keeping the namespace URIs and prefixes in one place decouples the
    serialization code from the concrete namespaces, which allows the same
    code to support SRU 1.1/1.2 and SRU 2.0.
    """

    response_NS: str
    """The namespace URI for encoding **explain** and **searchRetrieve**
    operation responses."""

    response_prefix: str
    """The namespace prefix for encoding **explain** and **searchRetrieve**
    operation responses."""

    scan_NS: str
    """The namespace URI for encoding **scan** operation responses."""

    scan_prefix: str
    """The namespace prefix for encoding **scan** operation responses."""

    diagnostic_NS: str
    """The namespace URI for encoding SRU diagnostics."""

    XCQL_NS: str
    """The namespace URI for encoding XCQL fragments."""

    diagnostic_prefix: str = SRU_DIAGNOSTIC_PREFIX
    """The namespace prefix for encoding SRU diagnostics."""

    explain_NS: str = SRU_EXPLAIN_NS
    """The namespace URI for encoding explain record data fragments."""

    explain_prefix: str = SRU_EXPLAIN_PREFIX
    """The namespace prefix for encoding explain record data fragments."""

    # ----------------------------------------------------

    @staticmethod
    def _for_OASIS() -> "SRUNamespaces":
        """Build the OASIS namespace set shared by SRU 1.2 (OASIS) and 2.0."""
        oasis = {
            "response_NS": SRU_RESPONSE_NS,
            "response_prefix": SRU_RESPONSE_PREFIX,
            "scan_NS": SRU_SCAN_NS,
            "scan_prefix": SRU_SCAN_PREFIX,
            "diagnostic_NS": SRU_DIAGNOSIC_NS,
            "XCQL_NS": SRU_XCQL_NS,
        }
        return SRUNamespaces(**oasis)

    @staticmethod
    def for_legacy_LOC() -> "SRUNamespaces":
        """Return the legacy Library of Congress namespaces."""
        loc = {
            "response_NS": SRU_NS,
            "response_prefix": SRU_PREFIX,
            "scan_NS": SRU_NS,
            "scan_prefix": SRU_PREFIX,
            "diagnostic_NS": "http://www.loc.gov/zing/srw/diagnostic/",
            "XCQL_NS": "http://www.loc.gov/zing/cql/xcql/",
        }
        return SRUNamespaces(**loc)

    @staticmethod
    def for_1_2_OASIS() -> "SRUNamespaces":
        """Return the OASIS namespaces for SRU 1.1/1.2."""
        return SRUNamespaces._for_OASIS()

    @staticmethod
    def for_2_0() -> "SRUNamespaces":
        """Return the namespaces for SRU 2.0."""
        return SRUNamespaces._for_OASIS()

    @staticmethod
    def get_namespaces(
        version: SRUVersion, legacy_ns_mode: LegacyNamespaceMode
    ) -> "SRUNamespaces":
        """Select the namespace set for an SRU version.

        Args:
            version: the SRU version of the request
            legacy_ns_mode: which namespaces to use for SRU 1.1/1.2

        Returns:
            SRUNamespaces: the matching namespace set

        Raises:
            TypeError: if ``version`` is ``None``
            ValueError: if ``version`` or ``legacy_ns_mode`` is not a
                supported value
        """
        if version is None:
            raise TypeError("version is None")

        legacy_versions = (SRUVersion.VERSION_1_1, SRUVersion.VERSION_1_2)
        if version not in legacy_versions:
            if version == SRUVersion.VERSION_2_0:
                return SRUNamespaces.for_2_0()
            raise ValueError(f"invalid version: {version}")

        # SRU 1.1 / 1.2: the namespace set depends on the configured mode
        if legacy_ns_mode == LegacyNamespaceMode.LOC:
            return SRUNamespaces.for_legacy_LOC()
        if legacy_ns_mode == LegacyNamespaceMode.OASIS:
            return SRUNamespaces.for_1_2_OASIS()
        raise ValueError(f"invalid legacy mode: {legacy_ns_mode}")
# --------------------------------------------------------------------------- # TODO: update docstring when SRUServerApp (WSGI) is ready # SRUSearchEngine + SRUSearchEngineBase
class SRUSearchEngine(metaclass=ABCMeta):
    """Interface for connecting the SRU protocol implementation to an actual
    search engine.

    Base class required for an `SRUSearchEngine` implementation to be used
    with the `SRUServerApp`.

    `explain`, `search` and `scan` are all declared abstract, so a concrete
    subclass has to define each of them. Supporting the **explain** and
    **scan** operations is nevertheless optional: those two methods may
    simply return ``None``. `search` must return a result set.

    The implementation of these methods **must** be thread-safe.
    """

    @abstractmethod
    def explain(
        self,
        config: SRUServerConfig,
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> Optional[SRUExplainResult]:
        """Handle an **explain** operation.

        Returning a result is optional; it is only needed if the
        **extraResponseData** block of the SRU response should be filled.
        Return ``None`` otherwise.

        The arguments for this operation are provided by the `SRURequest`
        object.

        The implementation of this method **must** be thread-safe.

        Args:
            config: the `SRUServerConfig` object that contains the endpoint
                configuration
            request: the `SRURequest` object that contains the request made
                to the endpoint
            diagnostics: the `SRUDiagnosticList` object for storing
                non-fatal diagnostics

        Returns:
            SRUExplainResult: a `SRUExplainResult` object or ``None`` if the
            search engine does not want to provide extra response data

        Raises:
            `SRUException`: if a fatal error occurred
        """

    @abstractmethod
    def search(
        self,
        config: SRUServerConfig,
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> SRUSearchResultSet:
        """Handle a **searchRetrieve** operation.

        Implementing this method is mandatory.

        The arguments for this operation are provided by the `SRURequest`
        object.

        The implementation of this method **must** be thread-safe.

        Args:
            config: the `SRUServerConfig` object that contains the endpoint
                configuration
            request: the `SRURequest` object that contains the request made
                to the endpoint
            diagnostics: the `SRUDiagnosticList` object for storing
                non-fatal diagnostics

        Returns:
            SRUSearchResultSet: a `SRUSearchResultSet` object

        Raises:
            `SRUException`: if a fatal error occurred
        """

    @abstractmethod
    def scan(
        self,
        config: SRUServerConfig,
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> Optional[SRUScanResultSet]:
        """Handle a **scan** operation.

        Supporting this operation is optional. If you don't need to handle
        the **scan** operation, just return ``None`` and the SRU server will
        return the appropriate diagnostic to the client.

        The arguments for this operation are provided by the `SRURequest`
        object.

        The implementation of this method **must** be thread-safe.

        Args:
            config: the `SRUServerConfig` object that contains the endpoint
                configuration
            request: the `SRURequest` object that contains the request made
                to the endpoint
            diagnostics: the `SRUDiagnosticList` object for storing
                non-fatal diagnostics

        Returns:
            SRUScanResultSet: a `SRUScanResultSet` object or ``None`` if this
            operation is not supported by this search engine

        Raises:
            `SRUException`: if a fatal error occurred
        """

    # ----------------------------------------------------

    def init(
        self,
        config: SRUServerConfig,
        query_parser_registry_builder: SRUQueryParserRegistry.Builder,
        params: Dict[str, str],
    ) -> None:
        """Initialize the search engine.

        The default implementation does nothing.

        Args:
            config: the `SRUServerConfig` object for this search engine
            query_parser_registry_builder: the
                `SRUQueryParserRegistry.Builder` object to be used for this
                search engine. Use it to register additional query parsers
                with the `SRUServer`
            params: additional parameters from the server

        Raises:
            SRUConfigException: an error occurred during initialization of
                the search engine
        """

    def destroy(self) -> None:
        """Destroy the search engine.

        Use this method for any cleanup the search engine needs to perform
        upon termination. The default implementation does nothing.
        """
# ---------------------------------------------------------------------------
class SRUServer:
    """SRU/CQL protocol implementation for the server-side (SRU/S).

    This class implements SRU/CQL version 1.1 and 1.2. The serialization
    code additionally branches on `SRUVersion.VERSION_2_0` to emit the
    SRU 2.0 specific response elements.

    See also:
        SRU/CQL protocol 1.2: http://www.loc.gov/standards/sru/
    """

    def __init__(
        self,
        config: SRUServerConfig,
        query_parsers: SRUQueryParserRegistry,
        search_engine: SRUSearchEngine,
        authentication_info_provider: Optional[SRUAuthenticationInfoProvider] = None,
    ):
        """Create a server bound to a configuration and a search engine.

        Args:
            config: the endpoint configuration
            query_parsers: registry of the available query parsers
            search_engine: the search engine that executes the operations
            authentication_info_provider: optional provider handed on to
                every `SRURequestImpl`

        Raises:
            TypeError: if ``config``, ``query_parsers`` or ``search_engine``
                is ``None``
        """
        if config is None:
            raise TypeError("config is None")
        if query_parsers is None:
            raise TypeError("query_parsers is None")
        if search_engine is None:
            raise TypeError("search_engine is None")

        self.config = config
        self.query_parsers = query_parsers
        self.search_engine = search_engine
        self.authentication_info_provider = authentication_info_provider

    # ----------------------------------------------------

    def handle_request(self, request: Request, response: Response):
        """Handle a SRU request.

        Parses the HTTP request, dispatches to `explain`, `scan` or
        `search` and stores the serialized XML in ``response``. If the
        request parameters are malformed, or an operation raises an
        `SRUException`, a fatal diagnostic response is written instead.

        Args:
            request: the incoming HTTP request
            response: the HTTP response object to fill
        """
        req = SRURequestImpl(
            self.config,
            self.query_parsers,
            request,
            authentication_info_provider=self.authentication_info_provider,
        )

        try:
            # set response properties
            response.content_type = RESPONSE_CONTENT_TYPE
            response.content_encoding = RESPONSE_ENCODING
            response.status = 200  # type: ignore
            # TODO: buffer size? stream
            # self.config.response_buffer_size

            try:
                if req.check_parameters():
                    operation = req.get_operation()
                    if operation == SRUOperation.EXPLAIN:
                        self.explain(req, response)
                    elif operation == SRUOperation.SCAN:
                        self.scan(req, response)
                    elif operation == SRUOperation.SEARCH_RETRIEVE:
                        self.search(req, response)
                else:
                    # (some) parameters are malformed, send error
                    self._send_fatal_error(req, response, req.diagnostics)
            except SRUException:
                # Must reach the outer handler, which turns it into a fatal
                # diagnostic; the generic handler below would swallow it.
                raise
            except SAXException as ex:
                LOGGER.error(
                    "An error occurred while serializing response", exc_info=True
                )
                raise SRUException(
                    SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                    message="An error occurred while serializing response.",
                ) from ex
            except Exception:
                # Well, can't really do anything useful here ...
                LOGGER.error("An unexpected exception occurred", exc_info=True)
        except SRUException as ex:
            # check if response.isCommitted does not exist for werkzeug
            if LOGGER.isEnabledFor(logging.INFO):
                message = ex.get_diagnostic().message
                if message:
                    LOGGER.info(
                        "Sending fatal diagnostic '%s' with message '%s'",
                        ex.get_diagnostic().uri,
                        message,
                    )
                else:
                    LOGGER.info(
                        "Sending fatal diagnostic '%s'", ex.get_diagnostic().uri
                    )
            LOGGER.debug("Fatal diagnostic was caused by this exception: %s", ex)

            # reset response buffer
            try:
                diagnostics = req.diagnostics or []
                diagnostics.append(ex.get_diagnostic())
                self._send_fatal_error(req, response, diagnostics)
            except Exception:
                LOGGER.error(
                    "An exception occurred while in error state", exc_info=True
                )

    def _send_fatal_error(
        self,
        req: SRURequestImpl,
        response: Response,
        diagnostics: Optional[List[SRUDiagnostic]],
    ):
        """Serialize a fatal error response and store it in ``response``."""
        buf = io.StringIO()
        out = self._create_XML_builder(
            buf, SRURecordXmlEscaping.XML, False, req.get_indent_response()
        )
        ns = SRUNamespaces.get_namespaces(
            req.get_version(), self.config.legacy_namespace_mode
        )
        self._write_fatal_error(out, ns, req, diagnostics)
        response.set_data(buf.getvalue())

    # TODO: temporary skip output buffering
    TEMP_OUTPUT_BUFFERING = False

    def explain(self, request: SRURequestImpl, response: Response):
        """Run the **explain** operation and serialize its response.

        Args:
            request: the parsed SRU request (also used as diagnostics list)
            response: the HTTP response object to fill
        """
        LOGGER.info("explain")

        # commence explain ...
        result = self.search_engine.explain(self.config, request, request)

        try:
            ns: SRUNamespaces = SRUNamespaces.get_namespaces(
                request.get_version(), self.config.legacy_namespace_mode
            )

            # send results
            buf = io.StringIO()
            out = self._create_XML_builder(
                buf,
                request.get_record_xml_escaping(),
                SRUServer.TEMP_OUTPUT_BUFFERING,
                request.get_indent_response(),
            )

            self._begin_response_with_request(out, ns, request)

            # write the explain record
            self._write_explain_record(out, ns, request)

            if self.config.echo_requests:
                self._write_echoed_explain_request(out, ns, request)

            # diagnostics
            self._write_diagnostics(out, ns, ns.response_NS, request.diagnostics)

            # extraResponseData (the search engine may return no result)
            if result is not None and result.has_extra_response_data:
                with out.element("extraResponseData", ns.response_NS):
                    result.write_extra_response_data(out)

            self._end_response_with_request(out, ns, request)
            response.set_data(buf.getvalue())
        finally:
            if result is not None:
                result.close()

    def scan(self, request: SRURequestImpl, response: Response):
        """Run the **scan** operation and serialize its response.

        Args:
            request: the parsed SRU request (also used as diagnostics list)
            response: the HTTP response object to fill

        Raises:
            SRUException: if the search engine does not support scan
                (returned ``None``) or serializing the terms failed
        """
        LOGGER.info("scan: scanClause = '%s'", request.get_scan_clause_raw())

        # commence scan ...
        result = self.search_engine.scan(self.config, request, request)
        if result is None:
            raise SRUException(
                SRUDiagnostics.UNSUPPORTED_OPERATION,
                message="The 'scan' operation is not supported by this endpoint.",
            )

        try:
            ns: SRUNamespaces = SRUNamespaces.get_namespaces(
                request.get_version(), self.config.legacy_namespace_mode
            )

            # FIXME: re-check, if while scan response needs to be put
            # in scan namespace for SRU 2.0!

            # send results
            buf = io.StringIO()
            out = self._create_XML_builder(
                buf,
                request.get_record_xml_escaping(),
                SRUServer.TEMP_OUTPUT_BUFFERING,
                request.get_indent_response(),
            )

            self._begin_response_with_request(out, ns, request)

            try:
                # a scan result without a list of terms is a valid response;
                # make sure, to produce the correct output and omit in that
                # case the <terms> ...
                wrote_terms = False
                while result.next_term():
                    if not wrote_terms:
                        if ns.response_NS != ns.scan_NS:
                            out.startPrefixMapping(ns.scan_prefix, ns.scan_NS)
                        out.startElementNS((ns.scan_NS, "terms"))
                        wrote_terms = True

                    with out.element("term", ns.scan_NS):
                        with out.element("value", ns.scan_NS):
                            out.characters(result.get_value())

                        if result.get_number_of_records() > -1:
                            with out.element("numberOfRecords", ns.scan_NS):
                                out.characters(str(result.get_number_of_records()))

                        if result.get_display_term():
                            with out.element("displayTerm", ns.scan_NS):
                                out.characters(str(result.get_display_term()))

                        where_in_list = result.get_WhereInList()
                        if where_in_list:
                            with out.element("whereInList", ns.scan_NS):
                                out.characters(where_in_list.lower())

                        if result.has_extra_term_data():
                            with out.element("extraTermData", ns.scan_NS):
                                result.write_extra_term_data(out)

                if wrote_terms:
                    out.endElementNS((ns.scan_NS, "terms"))
            except StopIteration as ex:
                raise SRUException(
                    SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                    message="An internal error occurred while serializing scan results.",
                ) from ex

            # echoedScanRequest
            if self.config.echo_requests:
                self._write_echoed_scan_request(
                    out, ns, request, request.get_scan_clause()
                )

            # diagnostics
            self._write_diagnostics(out, ns, ns.scan_NS, request.diagnostics)

            # extraResponseData
            if result.has_extra_response_data:
                with out.element("extraResponseData", ns.response_NS):
                    result.write_extra_response_data(out)

            self._end_response_with_request(out, ns, request)
            response.set_data(buf.getvalue())
        finally:
            result.close()

    def search(self, request: SRURequestImpl, response: Response):
        """Run the **searchRetrieve** operation and serialize its response.

        Args:
            request: the parsed SRU request (also used as diagnostics list)
            response: the HTTP response object to fill

        Raises:
            SRUException: if the search engine returned ``None``, the
                requested start record lies beyond the result set, or
                serializing the records failed
        """
        LOGGER.info(
            "searchRetrieve: query = '%s', startRecord = %s, "
            "maximumRecords = %s, recordSchema = %s, resultSetTTL = %s",
            request.get_query_raw(),
            request.get_start_record(),
            request.get_maximum_records(),
            request.get_record_schema_identifier(),
            request.get_resultSet_TTL(),
        )

        # commence search ...
        result = self.search_engine.search(self.config, request, request)
        if result is None:
            raise SRUException(
                SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                message="SRUSearchEngine implementation returned invalid result (null).",
            )

        try:
            # check, if startRecord position is greater than total record set
            # (done inside the try block so the result set is always closed)
            if (
                result.get_total_record_count() >= 0
                and request.get_start_record() > 1
                and request.get_start_record() > result.get_total_record_count()
            ):
                raise SRUException(SRUDiagnostics.FIRST_RECORD_POSITION_OUT_OF_RANGE)

            ns: SRUNamespaces = SRUNamespaces.get_namespaces(
                request.get_version(), self.config.legacy_namespace_mode
            )

            # send results
            buf = io.StringIO()
            out = self._create_XML_builder(
                buf,
                request.get_record_xml_escaping(),
                SRUServer.TEMP_OUTPUT_BUFFERING,
                request.get_indent_response(),
            )

            self._begin_response_with_request(out, ns, request)

            # numberOfRecords
            with out.element("numberOfRecords", ns.response_NS):
                out.characters(str(result.get_total_record_count()))

            # resultSetId
            if result.get_resultSet_id():
                with out.element("resultSetId", ns.response_NS):
                    out.characters(result.get_resultSet_id())

            # resultSetIdleTime (SRU 1.1 and SRU 1.2)
            if (
                not request.is_version(SRUVersion.VERSION_2_0)
                and result.get_resultSet_TTL() >= 0
            ):
                with out.element("resultSetIdleTime", ns.response_NS):
                    out.characters(str(result.get_resultSet_TTL()))

            position = (
                request.get_start_record() if request.get_start_record() > 0 else 1
            )
            if result.get_record_count() > 0:
                max_position_offset = (
                    (position + request.get_maximum_records() - 1)
                    if request.get_maximum_records() != -1
                    else -1
                )
                try:
                    out.startElementNS((ns.response_NS, "records"))
                    while result.next_record():
                        # Sanity check: do not return more then the maximum
                        # requested records. If the search engine
                        # implementation does not honor limit truncate the
                        # result set.
                        if max_position_offset != -1 and position > max_position_offset:
                            LOGGER.error(
                                "SRUSearchEngine implementation did not honor limit "
                                "for the amount of requsted records. Result set truncated!"
                            )
                            break

                        out.startElementNS((ns.response_NS, "record"))

                        # We need to output either the record or a surrogate
                        # diagnostic. In case of the latter, we need to output
                        # the appropriate record schema ...
                        diagnostic = result.get_surrogate_diagnostic()

                        with out.element("recordSchema", ns.response_NS):
                            if diagnostic is None:
                                out.characters(result.get_record_schema_identifier())
                            else:
                                out.characters(SRU_DIAGNOSTIC_RECORD_SCHEMA)

                        # recordPacking (SRU 2.0). Only serialize, if it was in
                        # request.
                        # XXX: not sure, how to support 'unpacked' record
                        # packing anyways :/
                        if (
                            request.is_version(SRUVersion.VERSION_2_0)
                            and request.get_record_packing_raw()
                        ):
                            self._write_record_packing(
                                out, ns, request.get_record_packing()
                            )

                        # recordXMLEscaping (SRU 2.0) or
                        # recordPacking (SRU 1.1 and 1.2)
                        self._write_record_xml_escaping(out, ns, request)

                        # Output either record data or surrogate diagnostic ...
                        with out.element("recordData", ns.response_NS), out.record():
                            if diagnostic is None:
                                result.write_record(out)
                            else:
                                # write a surrogate diagnostic
                                self._write_diagnostic(out, ns, diagnostic, True)

                        # recordIdentifier is version 1.2+ only
                        if request.is_version_between(
                            SRUVersion.VERSION_1_2, SRUVersion.VERSION_2_0
                        ):
                            identifier = result.get_record_identifier()
                            if identifier:
                                with out.element("recordIdentifier", ns.response_NS):
                                    out.characters(identifier)

                        with out.element("recordPosition", ns.response_NS):
                            out.characters(str(position))

                        if result.has_extra_record_data:
                            with out.element("extraRecordData", ns.response_NS):
                                result.write_extra_record_data(out)

                        out.endElementNS((ns.response_NS, "record"))

                        position += 1

                    out.endElementNS((ns.response_NS, "records"))
                except StopIteration as ex:
                    raise SRUException(
                        SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                        message="An internal error occurred while serializing search result set.",
                    ) from ex

                # nextRecordPosition
                if position <= result.get_total_record_count():
                    with out.element("nextRecordPosition", ns.response_NS):
                        out.characters(str(position))

            # echoedSearchRetrieveRequest
            if self.config.echo_requests:
                # TODO: need to check, query should not be None
                self._write_echoed_searchRetrieve_request(
                    out, ns, request, request.get_query()  # type: ignore
                )

            # diagnostics
            self._write_diagnostics(out, ns, ns.response_NS, request.diagnostics)

            # extraResponseData
            if result.has_extra_response_data:
                with out.element("extraResponseData", ns.response_NS):
                    result.write_extra_response_data(out)

            # SRU 2.0 stuff ...
            if request.is_version(SRUVersion.VERSION_2_0):
                # resultSetTTL
                if result.get_resultSet_TTL() >= 0:
                    with out.element("resultSetTTL", ns.response_NS):
                        out.characters(str(result.get_resultSet_TTL()))

                # resultCountPrecision
                precision = result.get_result_count_precision()
                if precision:
                    with out.element("resultCountPrecision", ns.response_NS):
                        prefix = "info:srw/vocabulary/resultCountPrecision/1/"
                        out.characters(f"{prefix}{precision.lower()}")

                # facetedResults
                # NOTE: NOT YET SUPPORTED

                # searchResultAnalysis
                # NOTE: NOT YET SUPPORTED

            self._end_response_with_request(out, ns, request)
            response.set_data(buf.getvalue())
        finally:
            result.close()

    # ----------------------------------------------------

    def _begin_response(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        operation: SRUOperation,
        version: SRUVersion,
        stylesheet: Optional[str],
    ):
        """Start the document and open the operation's response element.

        Writes an optional ``xml-stylesheet`` processing instruction, the
        root element for ``operation`` and its ``version`` child.
        """
        out.startDocument()

        if stylesheet:
            out.processingInstruction(
                "xml-stylesheet", f'type="text/xsl" href="{stylesheet}"'
            )

        if operation == SRUOperation.EXPLAIN:
            out.startPrefixMapping(ns.response_prefix, ns.response_NS)
            out.startElementNS((ns.response_NS, "explainResponse"))
            self._write_version(out, ns.response_NS, version)
        elif operation == SRUOperation.SCAN:
            out.startPrefixMapping(ns.scan_prefix, ns.scan_NS)
            out.startElementNS((ns.scan_NS, "scanResponse"))
            self._write_version(out, ns.scan_NS, version)
        elif operation == SRUOperation.SEARCH_RETRIEVE:
            out.startPrefixMapping(ns.response_prefix, ns.response_NS)
            out.startElementNS((ns.response_NS, "searchRetrieveResponse"))
            self._write_version(out, ns.response_NS, version)

    def _begin_response_with_request(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, request: SRURequest
    ):
        """Start a response using operation, version and stylesheet of ``request``."""
        self._begin_response(
            out,
            ns,
            request.get_operation(),
            request.get_version(),
            request.get_stylesheet(),
        )

    def _end_response(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, operation: SRUOperation
    ):
        """Close the operation's response element and finish the document.

        Afterwards the writer's streams are flushed and, unless they are
        in-memory `io.StringIO` buffers, closed. Failures while doing so
        are logged and otherwise ignored.
        """
        if operation == SRUOperation.EXPLAIN:
            out.endElementNS((ns.response_NS, "explainResponse"))
        elif operation == SRUOperation.SCAN:
            out.endElementNS((ns.scan_NS, "scanResponse"))
        elif operation == SRUOperation.SEARCH_RETRIEVE:
            out.endElementNS((ns.response_NS, "searchRetrieveResponse"))

        out.endDocument()

        try:
            out.output_stream.flush()
            out.output_stream_raw.flush()

            # if we use buffers internally, then don't close them
            # otherwise we can't access the content anymore ...
            if not isinstance(out.output_stream, io.StringIO):
                out.output_stream.close()
            if not isinstance(out.output_stream_raw, io.StringIO):
                out.output_stream_raw.close()
        except Exception:
            # best effort only, but leave a trace for debugging
            LOGGER.debug(
                "Ignoring error while flushing/closing output streams",
                exc_info=True,
            )

    def _end_response_with_request(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, request: SRURequest
    ):
        """Finish a response using the operation of ``request``."""
        self._end_response(out, ns, request.get_operation())

    # ----------------------------------------------------

    def _write_fatal_error(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        request: SRURequestImpl,
        diagnostics: Optional[List[SRUDiagnostic]],
    ):
        """Write a complete response that only carries diagnostics.

        Falls back to the **explain** operation and the configured default
        version if the request does not provide them.
        """
        # if operation is unknown, default to 'explain'
        operation = request.get_operation()
        if operation is None:
            operation = SRUOperation.EXPLAIN

        version = request.get_version()
        if version is None:
            version = self.config.default_version

        # write a response which conforms to the schema
        self._begin_response(out, ns, operation, version, None)

        if operation == SRUOperation.EXPLAIN:
            # 'explain' requires a complete explain record ...
            self._write_explain_record(out, ns, request)
            self._write_diagnostics(out, ns, ns.response_NS, diagnostics)
        elif operation == SRUOperation.SCAN:
            # 'scan' fortunately does not need any elements ...
            self._write_diagnostics(out, ns, ns.scan_NS, diagnostics)
        elif operation == SRUOperation.SEARCH_RETRIEVE:
            # 'searchRetrieve' needs numberOfRecords ..
            with out.element("numberOfRecords", ns.response_NS):
                out.characters("0")
            self._write_diagnostics(out, ns, ns.response_NS, diagnostics)

        self._end_response(out, ns, operation)

    def _write_diagnostics(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        envelope_NS: str,
        diagnostics: Optional[List[SRUDiagnostic]],
    ):
        """Write a ``diagnostics`` element; nothing if there are none."""
        if not diagnostics:
            return

        out.startPrefixMapping(ns.diagnostic_prefix, ns.diagnostic_NS)
        with out.element("diagnostics", envelope_NS):
            for diagnostic in diagnostics:
                self._write_diagnostic(out, ns, diagnostic, False)

    def _write_diagnostic(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        diagnostic: SRUDiagnostic,
        write_NS_decl: bool,
    ):
        """Write a single ``diagnostic`` element.

        Args:
            out: the XML writer
            ns: the namespaces in use
            diagnostic: the diagnostic to serialize
            write_NS_decl: whether to declare the diagnostic namespace
                prefix before writing the element
        """
        if write_NS_decl:
            out.startPrefixMapping(ns.diagnostic_prefix, ns.diagnostic_NS)

        with out.element("diagnostic", ns.diagnostic_NS):
            with out.element("uri", ns.diagnostic_NS):
                out.characters(diagnostic.uri)
            if diagnostic.details:
                with out.element("details", ns.diagnostic_NS):
                    out.characters(diagnostic.details)
            if diagnostic.message:
                with out.element("message", ns.diagnostic_NS):
                    out.characters(diagnostic.message)

    # ----------------------------------------------------

    def _write_explain_record(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, request: SRURequestImpl
    ):
        """Write the explain ``record`` built from the server configuration."""

        def _write_DatabaseInfo(info: Optional[DatabaseInfo]):
            if not info:
                return

            with out.element("databaseInfo", ns.explain_NS):
                self._write_LocalizedString(out, ns, "title", info.title)
                self._write_LocalizedString(out, ns, "description", info.description)
                self._write_LocalizedString(out, ns, "author", info.author)
                self._write_LocalizedString(out, ns, "extent", info.extent)
                self._write_LocalizedString(out, ns, "history", info.history)
                self._write_LocalizedString(out, ns, "langUsage", info.langUsage)
                self._write_LocalizedString(out, ns, "restrictions", info.restrictions)
                self._write_LocalizedString(out, ns, "subjects", info.subjects)
                self._write_LocalizedString(out, ns, "links", info.links)
                self._write_LocalizedString(
                    out, ns, "implementation", info.implementation
                )

        def _write_IndexInfo(info: Optional[IndexInfo]):
            if not info:
                return

            with out.element("indexInfo", ns.explain_NS):
                _write_IndexInfo_Sets(info.sets)
                _write_IndexInfo_Indexes(info.indexes)

        def _write_IndexInfo_Sets(sets: Optional[List[IndexInfo.Set]]):
            if not sets:
                return

            for index_set in sets:
                with out.element(
                    "set",
                    ns.explain_NS,
                    attrs={"identifier": index_set.identifier, "name": index_set.name},
                ):
                    self._write_LocalizedString(out, ns, "title", index_set.title)

        def _write_IndexInfo_Indexes(indexes: Optional[List[IndexInfo.Index]]):
            if not indexes:
                return

            for index in indexes:
                with out.element(
                    "index",
                    ns.explain_NS,
                    attrs={
                        "search": "true" if index.can_search else "false",
                        "scan": "true" if index.can_scan else "false",
                        "sort": "true" if index.can_sort else "false",
                    },
                ):
                    self._write_LocalizedString(out, ns, "title", index.title)
                    _write_IndexInfo_Index_Maps(index.maps)

        def _write_IndexInfo_Index_Maps(maps: Optional[List[IndexInfo.Index.Map]]):
            if not maps:
                return

            for index_map in maps:
                attrs: Dict[str, str] = dict()
                if index_map.primary:
                    attrs.update(primary="true")
                with out.element("map", ns.explain_NS, attrs=attrs):
                    with out.element(
                        "name",
                        ns.explain_NS,
                        attrs={"set": index_map.set},
                    ):
                        out.characters(index_map.name)

        def _write_SchemaInfos(infos: Optional[List[SchemaInfo]]):
            if not infos:
                return

            with out.element("schemaInfo", ns.explain_NS):
                for schema in infos:
                    attrs = {
                        "identifier": schema.identifier,
                        "name": schema.name,
                    }
                    # default is "false", so only add attribute if set to true
                    if schema.sort:
                        attrs.update({"sort": "true"})
                    # default is "true", so only add attribute if set to false
                    if not schema.retrieve:
                        attrs.update({"retrieve": "false"})
                    with out.element("schema", ns.explain_NS, attrs=attrs):
                        self._write_LocalizedString(out, ns, "title", schema.title)

        # ----------------------------

        with out.element("record", ns.response_NS):
            with out.element("recordSchema", ns.response_NS):
                out.characters(ns.explain_NS)

            # recordPacking (SRU 2.0)
            # Only serialize, if it was in request.
            # XXX: not sure, if this makes sense for explain
            if (
                request.is_version(SRUVersion.VERSION_2_0)
                and request.get_record_packing_raw()
            ):
                self._write_record_packing(out, ns, request.get_record_packing())

            # recordXMLEscaping (SRU 2.0) or recordPacking (SRU 1.1 and 1.2)
            self._write_record_xml_escaping(out, ns, request)

            with out.element("recordData", ns.response_NS), out.record():
                # explain ...
                out.startPrefixMapping(ns.explain_prefix, ns.explain_NS)
                with out.element("explain", ns.explain_NS):
                    # explain/serverInfo
                    with out.element(
                        "serverInfo",
                        ns.explain_NS,
                        attrs={
                            "protocol": "SRU",
                            "version": self.config.default_version.version_string,
                            "transport": self.config.transport,
                        },
                    ):
                        with out.element("host", ns.explain_NS):
                            out.characters(self.config.host)
                        with out.element("port", ns.explain_NS):
                            out.characters(str(self.config.port))
                        with out.element("database", ns.explain_NS):
                            out.characters(self.config.database)

                    # explain/databaseInfo
                    _write_DatabaseInfo(self.config.database_info)

                    # explain/indexInfo
                    _write_IndexInfo(self.config.index_info)

                    # explain/schemaInfo
                    _write_SchemaInfos(self.config.schema_info)

                    # explain/configInfo
                    with out.element("configInfo", ns.explain_NS):
                        # numberOfRecords (default)
                        with out.element(
                            "default", ns.explain_NS, attrs={"type": "numberOfRecords"}
                        ):
                            out.characters(str(self.config.number_of_records))

                        # maximumRecords (setting)
                        with out.element(
                            "setting", ns.explain_NS, attrs={"type": "maximumRecords"}
                        ):
                            out.characters(str(self.config.maximum_records))

    def _write_echoed_explain_request(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, request: SRURequestImpl
    ):
        """Write the ``echoedExplainRequest`` element for ``request``."""
        # echoedSearchRetrieveRequest ?
        with out.element("echoedExplainRequest", ns.response_NS):
            # echoedExplainRequest/version
            version_raw = request.get_version_raw()
            if version_raw is not None:
                self._write_version(out, ns.response_NS, version_raw)

            # echoedExplainRequest/recordXmlEscaping / recordPacking
            if request.get_record_packing_raw():
                self._write_record_xml_escaping(out, ns, request)

            # echoedExplainRequest/stylesheet
            # (guard on the same getter whose value is written, like the
            # scan and searchRetrieve echo writers do)
            if request.get_stylesheet():
                with out.element("stylesheet", ns.response_NS):
                    out.characters(request.get_stylesheet())

    def _write_echoed_scan_request(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        request: SRURequestImpl,
        query: Optional[cql.CQLQuery],
    ):
        """Write the ``echoedScanRequest`` element for ``request``.

        Args:
            out: the XML writer
            ns: the namespaces in use
            request: the scan request to echo
            query: the parsed scan clause, written as XCQL
        """
        # echoedScanRequest
        with out.element("echoedScanRequest", ns.response_NS):
            # echoedScanRequest/version
            version_raw = request.get_version_raw()
            if version_raw is not None:
                self._write_version(out, ns.response_NS, version_raw)

            # echoedScanRequest/scanClause
            with out.element("scanClause", ns.response_NS):
                out.characters(request.get_scan_clause_raw())

            # echoedScanRequest/xScanClause
            out.startPrefixMapping(None, ns.XCQL_NS)
            with out.element("xScanClause", ns.response_NS):
                # TODO: can this be None? it should not, need to test
                out.writeXCQL(query, False)  # type: ignore

            # echoedScanRequest/responsePosition
            if request.get_response_position() != -1:
                with out.element("responsePosition", ns.response_NS):
                    out.characters(str(request.get_response_position()))

            # echoedScanRequest/maximumTerms
            if request.get_maximum_terms() != -1:
                with out.element("maximumTerms", ns.response_NS):
                    out.characters(str(request.get_maximum_terms()))

            # echoedScanRequest/stylesheet
            if request.get_stylesheet():
                with out.element("stylesheet", ns.response_NS):
                    out.characters(request.get_stylesheet())

    def _write_echoed_searchRetrieve_request(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        request: SRURequestImpl,
        query: SRUQuery[Any],
    ):
        """Write the ``echoedSearchRetrieveRequest`` element for ``request``.

        Args:
            out: the XML writer
            ns: the namespaces in use
            request: the searchRetrieve request to echo
            query: the parsed query; only echoed if its type is CQL
        """
        # echoedSearchRetrieveRequest
        with out.element("echoedSearchRetrieveRequest", ns.response_NS):
            # echoedSearchRetrieveRequest/version
            version_raw = request.get_version_raw()
            if version_raw is not None:
                self._write_version(out, ns.response_NS, version_raw)

            # XXX: unclear, if <query> should only be echoed if queryType is CQL!?
            if SRUQueryType.CQL == query.query_type:
                # echoedSearchRetrieveRequest/query
                with out.element("query", ns.response_NS):
                    out.characters(query.raw_query)

                # echoedSearchRetrieveRequest/xQuery
                out.startPrefixMapping(None, ns.XCQL_NS)
                with out.element("xQuery", ns.response_NS):
                    out.writeXCQL(query.parsed_query, True)

            # echoedSearchRetrieveRequest/startRecord
            if request.get_start_record() > 0:
                with out.element("startRecord", ns.response_NS):
                    out.characters(str(request.get_start_record()))

            # echoedSearchRetrieveRequest/maximumRecords
            if request.get_maximum_records_raw() > 0:
                with out.element("maximumRecords", ns.response_NS):
                    out.characters(str(request.get_maximum_records_raw()))

            # (SRU 2.0) echoedSearchRetrieveRequest/recordPacking
            if (
                request.is_version(SRUVersion.VERSION_2_0)
                and request.get_record_packing_raw()
            ):
                with out.element("recordPacking", ns.response_NS):
                    out.characters(request.get_record_packing_raw())

            # echoedSearchRetrieveRequest/recordXmlEscaping / recordPacking
            if request.get_record_xml_escaping_raw():
                tag = (
                    "recordXMLEscaping"
                    if request.is_version(SRUVersion.VERSION_2_0)
                    else "recordPacking"
                )
                with out.element(tag, ns.response_NS):
                    out.characters(request.get_record_xml_escaping_raw())

            # echoedSearchRetrieveRequest/recordSchema
            if request.get_record_schema_identifier_raw():
                with out.element("recordSchema", ns.response_NS):
                    out.characters(request.get_record_schema_identifier_raw())

            # echoedSearchRetrieveRequest/recordXPath (1.1)
            if (
                request.is_version(SRUVersion.VERSION_1_1)
                and request.get_record_xpath()
            ):
                with out.element("recordXPath", ns.response_NS):
                    out.characters(request.get_record_xpath())

            # echoedSearchRetrieveRequest/resultSetTTL
            if request.get_resultSet_TTL() > 0:
                with out.element("resultSetTTL", ns.response_NS):
                    out.characters(str(request.get_resultSet_TTL()))

            # echoedSearchRetrieveRequest/sortKeys
            if request.is_version(SRUVersion.VERSION_1_1) and request.get_sortKeys():
                with out.element("sortKeys", ns.response_NS):
                    out.characters(request.get_sortKeys())

            # echoedSearchRetrieveRequest/xsortKeys

            # echoedSearchRetrieveRequest/stylesheet
            if request.get_stylesheet():
                with out.element("stylesheet", ns.response_NS):
                    out.characters(request.get_stylesheet())

            # echoedSearchRetrieveRequest/renderedBy
            if request.is_version(SRUVersion.VERSION_2_0) and request.get_renderBy():
                with out.element("renderedBy", ns.response_NS):
                    out.characters(request.get_renderBy().lower())  # type: ignore

            # echoedSearchRetrieveRequest/extraRequestParameter
            # FIXME: NOT YET IMPLEMENTED

            # echoedSearchRetrieveRequest/httpAccept
            # NOTE: broken in java version? uses renderedBy
            if (
                request.is_version(SRUVersion.VERSION_2_0)
                and request.get_http_accept_raw()
            ):
                with out.element("httpAccept", ns.response_NS):
                    out.characters(request.get_http_accept_raw())

            # echoedSearchRetrieveRequest/responseType
            if (
                request.is_version(SRUVersion.VERSION_2_0)
                and request.get_response_type()
            ):
                with out.element("responseType", ns.response_NS):
                    out.characters(request.get_response_type())

    # ----------------------------------------------------

    def _write_version(
        self, out: SRUXMLStreamWriter, envelope_NS: str, version: SRUVersion
    ):
        """Write a ``version`` element containing the version string."""
        with out.element("version", envelope_NS):
            out.characters(version.version_string)

    def _write_record_xml_escaping(
        self, out: SRUXMLStreamWriter, ns: SRUNamespaces, request: SRURequest
    ):
        """Write the record escaping of ``request`` in lower case.

        The element is named ``recordXMLEscaping`` for SRU 2.0 and
        ``recordPacking`` for every other version.
        """
        tag = (
            "recordXMLEscaping"
            if request.is_version(SRUVersion.VERSION_2_0)
            else "recordPacking"
        )
        with out.element(tag, ns.response_NS):
            out.characters(request.get_record_xml_escaping().lower())

    def _write_record_packing(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        record_packing: SRURecordPacking,
    ):
        """Write a ``recordPacking`` element with the lower-cased value."""
        with out.element("recordPacking", ns.response_NS):
            out.characters(record_packing.lower())

    def _write_LocalizedString(
        self,
        out: SRUXMLStreamWriter,
        ns: SRUNamespaces,
        name: str,
        items: Optional[List[LocalizedString]],
    ):
        """Write one ``name`` element in the explain namespace per item.

        ``lang`` and ``primary`` attributes are only added if set on the
        item. Nothing is written if ``items`` is empty or ``None``.
        """
        if not items:
            return

        for item in items:
            attrs: Dict[str, str] = dict()
            if item.lang:
                attrs.update(lang=item.lang)
            if item.primary:
                attrs.update(primary="true")
            with out.element(name, ns.explain_NS, attrs=attrs):
                out.characters(item.value)

    # ----------------------------------------------------

    def _create_XML_builder(
        self,
        output_stream: io.TextIOBase,
        record_packing: SRURecordXmlEscaping,
        skip_flush: bool,
        indent: int,
    ):
        """Create the `SRUXMLStreamWriter` used to serialize a response.

        Args:
            output_stream: the text stream the XML is written to
            record_packing: the record escaping passed to the writer
            skip_flush: currently without effect, only logged
            indent: the indentation passed to the writer

        Returns:
            SRUXMLStreamWriter: the writer wrapping ``output_stream``

        Raises:
            SRUException: if the writer could not be created
        """
        try:
            if skip_flush:
                # Add a BufferedWriter(?) to delay flush() as long as
                # possible. Doing so, enabled us to send an appropriate SRU
                # diagnostic in case an error occurs during the serialization
                # of the response. Of course, if an error occurs when the
                # server response buffer already had been flushed, because it
                # was to large, we cannot fail gracefully and we will produce
                # ill-formed XML output.

                # FIXME: do we have some...?
                # output_stream = io.BufferedWriter(
                #     output_stream, buffer_size=self.config.response_buffer_size
                # )
                # NOTE: might only make sense in async(io) use-case
                # otherwise always buffered in memory
                LOGGER.debug(
                    "Function for 'skip_flush' in '_create_XML_builder' not used."
                )

            return SRUXMLStreamWriter(
                output_stream, record_escaping=record_packing, indent=indent
            )
        except Exception as ex:
            raise SRUException(
                SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                message="Error creating output stream.",
            ) from ex
# ---------------------------------------------------------------------------