Source code for clarin.sru.queryparser
from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Generic
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
import cql
from .constants import SRUDiagnostics
from .constants import SRUParam
from .constants import SRUQueryType
from .constants import SRUVersion
from .diagnostic import SRUDiagnosticList
from .exception import SRUConfigException
_T = TypeVar("_T")
# ---------------------------------------------------------------------------
class SRUQuery(ABC, Generic[_T]):
    """Container pairing a raw query string with its parsed form.

    Instances of concrete subclasses are what a `SRUQueryParser`
    returns after parsing a request. The type parameter is the type of
    the parsed representation.
    """

    def __init__(self, raw_query: str, parsed_query: _T):
        """Store the raw query together with its parsed representation.

        Args:
            raw_query: the query string as it was passed to the parser
            parsed_query: the parsed representation of ``raw_query``

        Raises:
            TypeError: if ``raw_query`` or ``parsed_query`` is ``None``
        """
        super().__init__()

        # Both halves are mandatory; reject missing values up front.
        if raw_query is None:
            raise TypeError("raw_query is None")
        if parsed_query is None:
            raise TypeError("parsed_query is None")

        self._raw_query, self._parsed_query = raw_query, parsed_query

    @property
    @abstractmethod
    def query_type(self) -> str:
        """Short name of the query type this query belongs to, e.g. "cql"."""

    @property
    def raw_query(self) -> str:
        """The query string exactly as it was given to the constructor."""
        return self._raw_query

    @property
    def parsed_query(self) -> _T:
        """The parsed representation given to the constructor."""
        return self._parsed_query
# ---------------------------------------------------------------------------
class SRUQueryParser(ABC, Generic[_T]):
    """Interface that pluggable query parsers have to implement.

    The type parameter is the type of the parsed representation
    (abstract syntax tree object) the parser produces.
    """

    @property
    @abstractmethod
    def query_type(self) -> str:
        """Short name of the query type this parser handles, e.g. "cql"."""

    @abstractmethod
    def supports_version(self, version: Optional[SRUVersion]) -> bool:
        """Tell whether this query type is available in an SRU/CQL version.

        Args:
            version: the SRU version to check

        Returns:
            ``True`` if the query type is supported by ``version``
        """

    @property
    def query_type_definition(self) -> Optional[str]:
        """URI of the query type's definition.

        This base implementation returns ``None``.
        """
        return None

    @property
    @abstractmethod
    def query_parameter_names(self) -> List[str]:
        """Names of the query parameters used by this parser."""

    @abstractmethod
    def parse_query(
        self,
        version: SRUVersion,
        parameters: Dict[str, str],
        diagnostics: SRUDiagnosticList,
    ) -> Optional[SRUQuery[_T]]:
        """Parse a query into an abstract syntax tree.

        Args:
            version: the SRU version the request was made with
            parameters: the request parameters containing the query
            diagnostics: a `SRUDiagnosticList` for storing fatal and
                non-fatal diagnostics

        Returns:
            the parsed query, or ``None`` if the query could not be
            parsed
        """
# ---------------------------------------------------------------------------
class SRUQueryParserRegistry:
    """A registry to keep track of registered `SRUQueryParser` to be
    used by the `SRUServer`.

    Instances are normally created through
    `SRUQueryParserRegistry.Builder`.

    See also:
        `SRUQueryParser`
    """

    def __init__(self, parsers: List[SRUQueryParser[Any]]):
        """Create a registry holding the given parsers.

        Args:
            parsers: the query parsers to keep; they are copied into a
                new list, so later changes to the argument do not
                affect the registry

        Raises:
            TypeError: if ``parsers`` is ``None``
            ValueError: if ``parsers`` is empty
        """
        if parsers is None:
            raise TypeError("parsers is None")
        if not parsers:
            raise ValueError("parsers is empty")
        self.parsers = list(parsers)

    @property
    def query_parsers(self) -> List[SRUQueryParser[Any]]:
        """Get a list of all registered query parsers.

        Returns:
            List[SRUQueryParser[Any]]: the registry's own list of
            registered query parsers (not a copy)
        """
        return self.parsers

    def find_query_parser(self, query_type: str) -> Optional[SRUQueryParser[Any]]:
        """Find a query parser by query type.

        Args:
            query_type: the query type to search for

        Returns:
            SRUQueryParser[Any]: the matching `SRUQueryParser`
            instance or ``None`` if no matching parser was found.

        Raises:
            TypeError: if ``query_type`` is ``None``
        """
        if query_type is None:
            raise TypeError("query_type is None")
        return SRUQueryParserRegistry._find_parser(self.parsers, query_type)

    @staticmethod
    def _find_parser(
        parsers: List[SRUQueryParser[Any]], query_type: str
    ) -> Optional[SRUQueryParser[Any]]:
        """Return the first parser in ``parsers`` whose ``query_type``
        equals ``query_type``, or ``None`` if there is none."""
        candidates = (
            candidate for candidate in parsers if query_type == candidate.query_type
        )
        return next(candidates, None)

    # ----------------------------------------------------

    class Builder:
        """Builder for creating `SRUQueryParserRegistry` instances."""

        def __init__(self, register_defaults: bool = True):
            """[Constructor]

            Args:
                register_defaults: if ``True``,
                    register SRU/CQL standard query parsers
                    (queryType **cql** and **searchTerms**),
                    otherwise do nothing. Defaults to True.
            """
            self.parsers: List[SRUQueryParser[Any]] = []
            if register_defaults:
                self.register_defaults()

        def register_defaults(self) -> "SRUQueryParserRegistry.Builder":
            """Register the SRU/CQL standard query parsers
            (queryType **cql** and **searchTerms**).

            A default is skipped when a parser for its query type is
            already registered.

            Returns:
                this builder, to allow call chaining
            """
            default_parsers = (
                (SRUQueryType.CQL, CQLQueryParser),
                (SRUQueryType.SEARCH_TERMS, SearchTermsQueryParser),
            )
            for default_type, parser_factory in default_parsers:
                if SRUQueryParserRegistry._find_parser(self.parsers, default_type):
                    continue
                try:
                    self.register(parser_factory())
                except SRUConfigException:
                    # Deliberately ignored: a duplicate registration of
                    # a default parser is not treated as an error here.
                    pass
            return self

        def register(
            self, parser: SRUQueryParser[Any]
        ) -> "SRUQueryParserRegistry.Builder":
            """Register a new query parser.

            Args:
                parser (SRUQueryParser[Any]): the query parser
                    instance to be registered

            Returns:
                this builder, to allow call chaining

            Raises:
                TypeError: if ``parser`` or its ``query_type`` is
                    ``None``
                `SRUConfigException`: if a query parser for the same
                    query type was already registered
            """
            if parser is None:
                raise TypeError("parser is None")
            if parser.query_type is None:
                raise TypeError("parser.query_type is None")

            # duplicate-safe add: refuse a second parser for the same type
            if SRUQueryParserRegistry._find_parser(self.parsers, parser.query_type):
                raise SRUConfigException(
                    f"query parser for query_type '{parser.query_type}' is already registered"
                )
            self.parsers.append(parser)
            return self

        def build(self) -> "SRUQueryParserRegistry":
            """Create a configured `SRUQueryParserRegistry` instance
            from this builder.

            Returns:
                SRUQueryParserRegistry: a `SRUQueryParserRegistry`
                instance

            Raises:
                ValueError: if no parser has been registered, because
                    the registry refuses an empty parser list
            """
            return SRUQueryParserRegistry(self.parsers)
# ---------------------------------------------------------------------------
class SearchTermsQuery(SRUQuery[List[str]]):
    """Parsed query of the search terms query type.

    The parsed representation is a list of strings.
    """

    @property
    def query_type(self) -> str:
        """The value of `SRUQueryType.SEARCH_TERMS`."""
        return SRUQueryType.SEARCH_TERMS.value
class SearchTermsQueryParser(SRUQueryParser[List[str]]):
    """Query parser for the search terms query type.

    The raw query is split on whitespace into a list of terms.
    """

    @property
    def query_type(self) -> str:
        """The query type handled by this parser
        (`SRUQueryType.SEARCH_TERMS`)."""
        return SRUQueryType.SEARCH_TERMS

    @property
    def query_parameter_names(self) -> List[str]:
        """Single-element list holding the name of the query parameter."""
        return [SRUParam.QUERY.value]

    def supports_version(self, version: Optional[SRUVersion]) -> bool:
        """Check whether the search terms query type is available.

        Args:
            version: the SRU version to check

        Returns:
            ``True`` for SRU 2.0 and later, ``False`` for earlier
            versions

        Raises:
            TypeError: if ``version`` is ``None`` (or otherwise falsy)
        """
        if not version:
            raise TypeError("Argument version is invalid/None.")
        # Mirrors the Java reference implementation:
        #   version.compareTo(SRUVersion.VERSION_2_0) >= 0
        # i.e. supported from SRU 2.0 onwards, not before it.
        return version >= SRUVersion.VERSION_2_0

    def parse_query(
        self,
        version: SRUVersion,
        parameters: Dict[str, str],
        diagnostics: SRUDiagnosticList,
    ) -> Optional[SRUQuery[List[str]]]:
        """Split the query parameter into whitespace-separated terms.

        Args:
            version: the SRU version of the request (not used here)
            parameters: the request parameters containing the query
            diagnostics: receives a ``GENERAL_SYSTEM_ERROR`` diagnostic
                if ``parameters`` holds no query

        Returns:
            a `SearchTermsQuery` wrapping the list of terms, or
            ``None`` if no query was passed
        """
        raw_query = parameters.get(SRUParam.QUERY)
        if raw_query is None:
            diagnostics.add_diagnostic(
                SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                message="no query passed to query parser",
            )
            return None

        # str.split() without arguments splits on runs of whitespace
        # and drops leading/trailing whitespace.
        terms = raw_query.split()
        return SearchTermsQuery(raw_query, terms)
# ---------------------------------------------------------------------------
class CQLQuery(SRUQuery[cql.CQLQuery]):
    """Parsed query of the CQL query type.

    The parsed representation is a ``cql.CQLQuery`` object.
    """

    @property
    def query_type(self) -> str:
        """The value of `SRUQueryType.CQL`."""
        return SRUQueryType.CQL.value
class CQLQueryParser(SRUQueryParser[cql.CQLQuery]):
    """Default query parser to parse CQL."""

    @property
    def query_type(self) -> str:
        """The query type handled by this parser (`SRUQueryType.CQL`)."""
        return SRUQueryType.CQL

    @property
    def query_parameter_names(self) -> List[str]:
        """Single-element list holding the name of the query parameter."""
        return [SRUParam.QUERY.value]

    def supports_version(self, version: Optional[SRUVersion]) -> bool:
        """Check whether CQL is available in the given SRU version.

        Args:
            version: the SRU version to check

        Returns:
            always ``True``

        Raises:
            TypeError: if ``version`` is ``None`` (or otherwise falsy)
        """
        if not version:
            raise TypeError("Argument version is invalid/None.")
        # CQL is supported by all SRU versions ...
        return True

    def parse_query(
        self,
        version: SRUVersion,
        parameters: Dict[str, str],
        diagnostics: SRUDiagnosticList,
    ) -> Optional[SRUQuery[cql.CQLQuery]]:
        """Parse the query parameter as CQL.

        The CQL 1.1 parser is used for SRU 1.1 requests, the CQL 1.2
        parser for everything else.

        Args:
            version: the SRU version of the request; selects the CQL
                parser class
            parameters: the request parameters containing the query
            diagnostics: receives a ``GENERAL_SYSTEM_ERROR`` diagnostic
                if no query was passed, or a ``QUERY_SYNTAX_ERROR``
                diagnostic if parsing raised an exception

        Returns:
            a `CQLQuery` wrapping the parse result, or ``None`` if no
            query was passed or parsing failed
        """
        query_text = parameters.get(SRUParam.QUERY)
        if query_text is None:
            diagnostics.add_diagnostic(
                SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                message="no query passed to query parser",
            )
            return None

        # XXX: maybe query length against limit and return
        # "Too many characters in query" error?

        try:
            # CQL 1.2 is the default (also used for SRU 1.2 and 2.0);
            # only SRU 1.1 switches to the CQL 1.1 parser.
            parser_cls: Type[cql.CQLParser] = cql.CQLParser12
            if version == SRUVersion.VERSION_1_1:
                parser_cls = cql.CQLParser11

            cql_parser = parser_cls()
            # NOTE(review): build() is presumably required to set up the
            # parser before parse() can be used - confirm in cql docs.
            cql_parser.build()
            return CQLQuery(query_text, cql_parser.parse(query_text))
        except cql.parser.CQLParserError:
            diagnostics.add_diagnostic(
                SRUDiagnostics.QUERY_SYNTAX_ERROR, message="error parsing query"
            )
        except Exception:
            # Any other failure is reported to the client the same way.
            diagnostics.add_diagnostic(
                SRUDiagnostics.QUERY_SYNTAX_ERROR, message="error parsing query"
            )
        return None
# ---------------------------------------------------------------------------