diff --git a/quit/helpers.py b/quit/helpers.py index 2611a641..844cd3e6 100644 --- a/quit/helpers.py +++ b/quit/helpers.py @@ -6,7 +6,7 @@ from rdflib.term import URIRef from rdflib.plugins.sparql.parserutils import CompValue from rdflib.plugins.sparql.parser import parseQuery, parseUpdate -from quit.tools.algebra import translateQuery, translateUpdate +from rdflib.plugins.sparql.algebra import translateQuery, translateUpdate from rdflib.plugins.serializers.nt import _nt_row as _nt from rdflib.plugins.sparql import parser, algebra from rdflib.plugins import sparql diff --git a/quit/tools/algebra.py b/quit/tools/algebra.py deleted file mode 100644 index 5ce5364c..00000000 --- a/quit/tools/algebra.py +++ /dev/null @@ -1,804 +0,0 @@ - -""" -Converting the 'parse-tree' output of pyparsing to a SPARQL Algebra expression - -http://www.w3.org/TR/sparql11-query/#sparqlQuery - -""" - -import functools -import operator -import collections - -from rdflib import Literal, Variable, URIRef, BNode - -from rdflib.plugins.sparql.sparql import Prologue, Query -from rdflib.plugins.sparql.parserutils import CompValue, Expr -from rdflib.plugins.sparql.operators import ( - and_, TrueFilter, simplify as simplifyFilters) -from rdflib.paths import ( - InvPath, AlternativePath, SequencePath, MulPath, NegatedPath) - -from pyparsing import ParseResults -from functools import reduce - - -# --------------------------- -# Some convenience methods -def OrderBy(p, expr): - return CompValue('OrderBy', p=p, expr=expr) - - -def ToMultiSet(p): - return CompValue('ToMultiSet', p=p) - - -def Union(p1, p2): - return CompValue('Union', p1=p1, p2=p2) - - -def Join(p1, p2): - return CompValue('Join', p1=p1, p2=p2) - - -def Minus(p1, p2): - return CompValue('Minus', p1=p1, p2=p2) - - -def Graph(term, graph): - return CompValue('Graph', term=term, p=graph) - - -def BGP(triples=None): - return CompValue('BGP', triples=triples or []) - - -def LeftJoin(p1, p2, expr): - return CompValue('LeftJoin', p1=p1, p2=p2, expr=expr) - - -def Filter(expr, p): - return CompValue('Filter', expr=expr, p=p) - - -def Extend(p, expr, var): - return CompValue('Extend', p=p, expr=expr, var=var) - - -def Project(p, PV): - return CompValue('Project', p=p, PV=PV) - - -def Group(p, expr=None): - return CompValue('Group', p=p, expr=expr) - -def Service(term, graph): - return CompValue('Service', term=term, p=graph) - -def _knownTerms(triple, varsknown, varscount): - return (len([_f for _f in (x not in varsknown and - isinstance( - x, (Variable, BNode)) for x in triple) if _f]), - -sum(varscount.get(x, 0) for x in triple), - not isinstance(triple[2], Literal), - ) - - -def reorderTriples(l): - """ - Reorder triple patterns so that we execute the - ones with most bindings first - """ - - def _addvar(term, varsknown): - if isinstance(term, (Variable, BNode)): - varsknown.add(term) - - l = [(None, x) for x in l] - varsknown = set() - varscount = collections.defaultdict(int) - for t in l: - for c in t[1]: - if isinstance(c, (Variable, BNode)): - varscount[c] += 1 - i = 0 - - # Done in steps, sort by number of bound terms - # the top block of patterns with the most bound terms is kept - # the rest is resorted based on the vars bound after the first - # block is evaluated - - # we sort by decorate/undecorate, since we need the value of the sort keys - - while i < len(l): - l[i:] = sorted((_knownTerms(x[ - 1], varsknown, varscount), x[1]) for x in l[i:]) - t = l[i][0][0] # top block has this many terms bound - j = 0 - while i+j < len(l) and l[i+j][0][0] == t: - for c in l[i+j][1]: - _addvar(c, varsknown) - j += 1 - i += 1 - - return [x[1] for x in l] - - -def triples(l): - - l = reduce(lambda x, y: x + y, l) - if (len(l) % 3) != 0: - raise Exception('these aint triples') - return reorderTriples((l[x], l[x + 1], l[x + 2]) - for x in range(0, len(l), 3)) - - -def translatePName(p, prologue): - """ - Expand prefixed/relative URIs - """ - if isinstance(p, CompValue): - if p.name == 'pname': - return prologue.absolutize(p) - if p.name == 'literal': - return Literal(p.string, lang=p.lang, - datatype=prologue.absolutize(p.datatype)) - elif isinstance(p, URIRef): - return prologue.absolutize(p) - - -def translatePath(p): - - """ - Translate PropertyPath expressions - """ - - if isinstance(p, CompValue): - if p.name == 'PathAlternative': - if len(p.part) == 1: - return p.part[0] - else: - return AlternativePath(*p.part) - - elif p.name == 'PathSequence': - if len(p.part) == 1: - return p.part[0] - else: - return SequencePath(*p.part) - - elif p.name == 'PathElt': - if not p.mod: - return p.part - else: - if isinstance(p.part, list): - if len(p.part) != 1: - raise Exception('Denkfehler!') - - return MulPath(p.part[0], p.mod) - else: - return MulPath(p.part, p.mod) - - elif p.name == 'PathEltOrInverse': - if isinstance(p.part, list): - if len(p.part) != 1: - raise Exception('Denkfehler!') - return InvPath(p.part[0]) - else: - return InvPath(p.part) - - elif p.name == 'PathNegatedPropertySet': - if isinstance(p.part, list): - return NegatedPath(AlternativePath(*p.part)) - else: - return NegatedPath(p.part) - - -def translateExists(e): - - """ - Translate the graph pattern used by EXISTS and NOT EXISTS - http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters - """ - - def _c(n): - if isinstance(n, CompValue): - if n.name in ('Builtin_EXISTS', 'Builtin_NOTEXISTS'): - n.graph = translateGroupGraphPattern(n.graph) - - e = traverse(e, visitPost=_c) - - return e - - -def collectAndRemoveFilters(parts): - - """ - - FILTER expressions apply to the whole group graph pattern in which - they appear. - - http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters - """ - - filters = [] - - i = 0 - while i < len(parts): - p = parts[i] - if p.name == 'Filter': - filters.append(translateExists(p.expr)) - parts.pop(i) - else: - i += 1 - - if filters: - return and_(*filters) - - return None - - -def translateGroupOrUnionGraphPattern(graphPattern): - A = None - - for g in graphPattern.graph: - g = translateGroupGraphPattern(g) - if not A: - A = g - else: - A = Union(A, g) - return A - -def translateServiceGraphPattern(graphPattern): - return Service(graphPattern.term, - translateGroupGraphPattern(graphPattern.graph)) - -def translateGraphGraphPattern(graphPattern): - return Graph(graphPattern.term, - translateGroupGraphPattern(graphPattern.graph)) - - -def translateInlineData(graphPattern): - return ToMultiSet(translateValues(graphPattern)) - - -def translateGroupGraphPattern(graphPattern): - """ - http://www.w3.org/TR/sparql11-query/#convertGraphPattern - """ - - if graphPattern.name == 'SubSelect': - return ToMultiSet(translate(graphPattern)[0]) - - if not graphPattern.part: - graphPattern.part = [] # empty { } - - filters = collectAndRemoveFilters(graphPattern.part) - - g = [] - for p in graphPattern.part: - if p.name == 'TriplesBlock': - # merge adjacent TripleBlocks - if not (g and g[-1].name == 'BGP'): - g.append(BGP()) - g[-1]["triples"] += triples(p.triples) - else: - g.append(p) - - G = BGP() - for p in g: - if p.name == 'OptionalGraphPattern': - A = translateGroupGraphPattern(p.graph) - if A.name == 'Filter': - G = LeftJoin(G, A.p, A.expr) - else: - G = LeftJoin(G, A, TrueFilter) - elif p.name == 'MinusGraphPattern': - G = Minus(p1=G, p2=translateGroupGraphPattern(p.graph)) - elif p.name == 'GroupOrUnionGraphPattern': - G = Join(p1=G, p2=translateGroupOrUnionGraphPattern(p)) - elif p.name == 'GraphGraphPattern': - G = Join(p1=G, p2=translateGraphGraphPattern(p)) - elif p.name == 'InlineData': - G = Join(p1=G, p2=translateInlineData(p)) - elif p.name == 'ServiceGraphPattern': - G = Join(p1=G, p2=translateServiceGraphPattern(p)) - elif p.name in ('BGP', 'Extend'): - G = Join(p1=G, p2=p) - elif p.name == 'Bind': - G = Extend(G, p.expr, p.var) - - else: - raise Exception('Unknown part in GroupGraphPattern: %s - %s' % - (type(p), p.name)) - - if filters: - G = Filter(expr=filters, p=G) - - return G - - -class StopTraversal(Exception): - def __init__(self, rv): - self.rv = rv - - -def _traverse(e, visitPre=lambda n: None, visitPost=lambda n: None): - """ - Traverse a parse-tree, visit each node - - if visit functions return a value, replace current node - """ - _e = visitPre(e) - if _e is not None: - return _e - - if e is None: - return None - - if isinstance(e, (list, ParseResults)): - return [_traverse(x, visitPre, visitPost) for x in e] - elif isinstance(e, tuple): - return tuple([_traverse(x, visitPre, visitPost) for x in e]) - - elif isinstance(e, CompValue): - for k, val in e.items(): - e[k] = _traverse(val, visitPre, visitPost) - - _e = visitPost(e) - if _e is not None: - return _e - - return e - - -def _traverseAgg(e, visitor=lambda n, v: None): - """ - Traverse a parse-tree, visit each node - - if visit functions return a value, replace current node - """ - - res = [] - - if isinstance(e, (list, ParseResults, tuple)): - res = [_traverseAgg(x, visitor) for x in e] - - elif isinstance(e, CompValue): - for k, val in e.items(): - if val != None: - res.append(_traverseAgg(val, visitor)) - - return visitor(e, res) - - -def traverse( - tree, visitPre=lambda n: None, - visitPost=lambda n: None, complete=None): - """ - Traverse tree, visit each node with visit function - visit function may raise StopTraversal to stop traversal - if complete!=None, it is returned on complete traversal, - otherwise the transformed tree is returned - """ - try: - r = _traverse(tree, visitPre, visitPost) - if complete is not None: - return complete - return r - except StopTraversal as st: - return st.rv - - -def _hasAggregate(x): - """ - Traverse parse(sub)Tree - return true if any aggregates are used - """ - - if isinstance(x, CompValue): - if x.name.startswith('Aggregate_'): - raise StopTraversal(True) - - -def _aggs(e, A): - """ - Collect Aggregates in A - replaces aggregates with variable references - """ - - # TODO: nested Aggregates? - - if isinstance(e, CompValue) and e.name.startswith('Aggregate_'): - A.append(e) - aggvar = Variable('__agg_%d__' % len(A)) - e["res"] = aggvar - return aggvar - - -def _findVars(x, res): - """ - Find all variables in a tree - """ - if isinstance(x, Variable): - res.add(x) - if isinstance(x, CompValue): - if x.name == "Bind": - res.add(x.var) - return x # stop recursion and finding vars in the expr - elif x.name == 'SubSelect': - if x.projection: - res.update(v.var or v.evar for v in x.projection) - return x - - -def _addVars(x, children): - # import pdb; pdb.set_trace() - if isinstance(x, Variable): - return set([x]) - elif isinstance(x, CompValue): - x["_vars"] = set(reduce(operator.or_, children, set())) - if x.name == "Bind": - return set([x.var]) - elif x.name == 'SubSelect': - if x.projection: - s = set(v.var or v.evar for v in x.projection) - else: - s = set() - - return s - return reduce(operator.or_, children, set()) - - -def _sample(e, v=None): - """ - For each unaggregated variable V in expr - Replace V with Sample(V) - """ - if isinstance(e, CompValue) and e.name.startswith("Aggregate_"): - return e # do not replace vars in aggregates - if isinstance(e, Variable) and v != e: - return CompValue('Aggregate_Sample', vars=e) - - -def _simplifyFilters(e): - if isinstance(e, Expr): - return simplifyFilters(e) - - -def translateAggregates(q, M): - E = [] - A = [] - - # collect/replace aggs in : - # select expr as ?var - if q.projection: - for v in q.projection: - if v.evar: - v.expr = traverse(v.expr, functools.partial(_sample, v=v.evar)) - v.expr = traverse(v.expr, functools.partial(_aggs, A=A)) - - - # having clause - if traverse(q.having, _hasAggregate, complete=False): - q.having = traverse(q.having, _sample) - traverse(q.having, functools.partial(_aggs, A=A)) - - # order by - if traverse(q.orderby, _hasAggregate, complete=False): - q.orderby = traverse(q.orderby, _sample) - traverse(q.orderby, functools.partial(_aggs, A=A)) - - # sample all other select vars - # TODO: only allowed for vars in group-by? - if q.projection: - for v in q.projection: - if v.var: - rv = Variable('__agg_%d__' % (len(A) + 1)) - A.append(CompValue('Aggregate_Sample', vars=v.var, res=rv)) - E.append((rv, v.var)) - - return CompValue('AggregateJoin', A=A, p=M), E - - -def translateValues(v): - # if len(v.var)!=len(v.value): - # raise Exception("Unmatched vars and values in ValueClause: "+str(v)) - - res = [] - if not v.var: - return res - if not v.value: - return res - if not isinstance(v.value[0], list): - - for val in v.value: - res.append({v.var[0]: val}) - else: - for vals in v.value: - res.append(dict(list(zip(v.var, vals)))) - - return CompValue('values', res=res) - - -def translate(q): - """ - http://www.w3.org/TR/sparql11-query/#convertSolMod - - """ - - _traverse(q, _simplifyFilters) - - q.where = traverse(q.where, visitPost=translatePath) - - # TODO: Var scope test - VS = set() - traverse(q.where, functools.partial(_findVars, res=VS)) - - # all query types have a where part - M = translateGroupGraphPattern(q.where) - - aggregate = False - if q.groupby: - conditions = [] - # convert "GROUP BY (?expr as ?var)" to an Extend - for c in q.groupby.condition: - if isinstance(c, CompValue) and c.name == 'GroupAs': - M = Extend(M, c.expr, c.var) - c = c.var - conditions.append(c) - - M = Group(p=M, expr=conditions) - aggregate = True - elif traverse(q.having, _hasAggregate, complete=False) or \ - traverse(q.orderby, _hasAggregate, complete=False) or \ - any(traverse(x.expr, _hasAggregate, complete=False) - for x in q.projection or [] if x.evar): - # if any aggregate is used, implicit group by - M = Group(p=M) - aggregate = True - - if aggregate: - M, E = translateAggregates(q, M) - else: - E = [] - - # HAVING - if q.having: - M = Filter(expr=and_(*q.having.condition), p=M) - - # VALUES - if q.valuesClause: - M = Join(p1=M, p2=ToMultiSet(translateValues(q.valuesClause))) - - if not q.projection: - # select * - PV = list(VS) - else: - PV = list() - for v in q.projection: - if v.var: - if v not in PV: - PV.append(v.var) - elif v.evar: - if v not in PV: - PV.append(v.evar) - - E.append((v.expr, v.evar)) - else: - raise Exception("I expected a var or evar here!") - - for e, v in E: - M = Extend(M, e, v) - - # ORDER BY - if q.orderby: - M = OrderBy(M, [CompValue('OrderCondition', expr=c.expr, - order=c.order) for c in q.orderby.condition]) - - # PROJECT - M = Project(M, PV) - - if q.modifier: - if q.modifier == 'DISTINCT': - M = CompValue('Distinct', p=M) - elif q.modifier == 'REDUCED': - M = CompValue('Reduced', p=M) - - if q.limitoffset: - offset = 0 - if q.limitoffset.offset != None: - offset = q.limitoffset.offset.toPython() - - if q.limitoffset.limit != None: - M = CompValue('Slice', p=M, start=offset, - length=q.limitoffset.limit.toPython()) - else: - M = CompValue('Slice', p=M, start=offset) - - return M, PV - - -def simplify(n): - """Remove joins to empty BGPs""" - if isinstance(n, CompValue): - if n.name == 'Join': - if n.p1.name == 'BGP' and len(n.p1.triples) == 0: - return n.p2 - if n.p2.name == 'BGP' and len(n.p2.triples) == 0: - return n.p1 - elif n.name == 'BGP': - n["triples"] = reorderTriples(n.triples) - return n - - -def analyse(n, children): - - if isinstance(n, CompValue): - if n.name == 'Join': - n["lazy"] = all(children) - return False - elif n.name in ('Slice', 'Distinct'): - return False - else: - return all(children) - else: - return True - - -def translatePrologue(p, base, initNs=None, prologue=None): - - if prologue is None: - prologue = Prologue() - prologue.base = "" - if base: - prologue.base = base - if initNs: - for k, v in initNs.items(): - prologue.bind(k, v) - - for x in p: - if x.name == 'Base': - prologue.base = x.iri - elif x.name == 'PrefixDecl': - prologue.bind(x.prefix, prologue.absolutize(x.iri)) - - return prologue - - -def translateQuads(quads): - if quads.triples: - alltriples = triples(quads.triples) - else: - alltriples = [] - - allquads = collections.defaultdict(list) - - if quads.quadsNotTriples: - for q in quads.quadsNotTriples: - if q.triples: - allquads[q.term] += triples(q.triples) - - return alltriples, allquads - - -def translateUpdate1(u, prologue): - if u.name in ('Load', 'Clear', 'Drop', 'Create'): - pass # no translation needed - elif u.name in ('Add', 'Move', 'Copy'): - pass - elif u.name in ('InsertData', 'DeleteData', 'DeleteWhere'): - t, q = translateQuads(u.quads) - u["quads"] = q - u["triples"] = t - if u.name in ('DeleteWhere', 'DeleteData'): - pass # TODO: check for bnodes in triples - elif u.name == 'Modify': - if u.delete: - u.delete["triples"], u.delete[ - "quads"] = translateQuads(u.delete.quads) - if u.insert: - u.insert["triples"], u.insert[ - "quads"] = translateQuads(u.insert.quads) - u["where"] = translateGroupGraphPattern(u.where) - else: - raise Exception('Unknown type of update operation: %s' % u) - - u.prologue = prologue - return u - - -def translateUpdate(q, base=None, initNs=None): - """ - Returns a list of SPARQL Update Algebra expressions - """ - - res = [] - prologue = None - if not q.request: - return res - for p, u in zip(q.prologue, q.request): - prologue = translatePrologue(p, base, initNs, prologue) - - # absolutize/resolve prefixes - u = traverse( - u, visitPost=functools.partial(translatePName, prologue=prologue)) - u = _traverse(u, _simplifyFilters) - - u = traverse(u, visitPost=translatePath) - - res.append(translateUpdate1(u, prologue)) - - return res - - -def translateQuery(q, base=None, initNs=None): - """ - Translate a query-parsetree to a SPARQL Algebra Expression - - Return a rdflib.plugins.sparql.sparql.Query object - """ - - # We get in: (prologue, query) - - prologue = translatePrologue(q[0], base, initNs) - - # absolutize/resolve prefixes - q[1] = traverse( - q[1], visitPost=functools.partial(translatePName, prologue=prologue)) - - P, PV = translate(q[1]) - datasetClause = q[1].datasetClause - if q[1].name == 'ConstructQuery': - - template = triples(q[1].template) if q[1].template else None - - res = CompValue(q[1].name, p=P, - template=template, - datasetClause=datasetClause) - else: - res = CompValue(q[1].name, p=P, datasetClause=datasetClause, PV=PV) - - res = traverse(res, visitPost=simplify) - _traverseAgg(res, visitor=analyse) - _traverseAgg(res, _addVars) - - return Query(prologue, res) - - -def pprintAlgebra(q): - def pp(p, ind=" "): - # if isinstance(p, list): - # print "[ " - # for x in p: pp(x,ind) - # print "%s ]"%ind - # return - if not isinstance(p, CompValue): - print(p) - return - print("%s(" % (p.name, )) - for k in p: - print("%s%s =" % (ind, k,), end=' ') - pp(p[k], ind + " ") - print("%s)" % ind) - - try: - pp(q.algebra) - except AttributeError: - # it's update, just a list - for x in q: - pp(x) - -if __name__ == '__main__': - import sys - from rdflib.plugins.sparql import parser - import os.path - - if os.path.exists(sys.argv[1]): - q = file(sys.argv[1]) - else: - q = sys.argv[1] - - pq = parser.parseQuery(q) - print(pq) - tq = translateQuery(pq) - print(pprintAlgebra(tq)) diff --git a/quit/tools/evaluate.py b/quit/tools/evaluate.py deleted file mode 100644 index 0323217a..00000000 --- a/quit/tools/evaluate.py +++ /dev/null @@ -1,490 +0,0 @@ -""" -These method recursively evaluate the SPARQL Algebra - -evalQuery is the entry-point, it will setup context and -return the SPARQLResult object - -evalPart is called on each level and will delegate to the right method - -A rdflib.plugins.sparql.sparql.QueryContext is passed along, keeping -information needed for evaluation - -A list of dicts (solution mappings) is returned, apart from GroupBy which may -also return a dict of list of dicts - -""" - -import collections - -from rdflib import Variable, Graph, BNode, URIRef, Literal -from six import iteritems, itervalues - -from rdflib.plugins.sparql import CUSTOM_EVALS -from rdflib.plugins.sparql.parserutils import value -from rdflib.plugins.sparql.sparql import ( - QueryContext, AlreadyBound, FrozenBindings, SPARQLError) -from rdflib.plugins.sparql.evalutils import ( - _filter, _eval, _join, _diff, _minus, _fillTemplate, _ebv, _val) - -from rdflib.plugins.sparql.aggregates import Aggregator -from rdflib.plugins.sparql.algebra import Join, ToMultiSet, Values - -from quit.web import service -from quit.exceptions import UnSupportedQuery, UnSupportedQueryType, FromNamedError - -def evalBGP(ctx, bgp): - - """ - A basic graph pattern - """ - - if not bgp: - yield ctx.solution() - return - - s, p, o = bgp[0] - - _s = ctx[s] - _p = ctx[p] - _o = ctx[o] - - for ss, sp, so in ctx.graph.triples((_s, _p, _o)): - if None in (_s, _p, _o): - c = ctx.push() - else: - c = ctx - - if _s is None: - c[s] = ss - - try: - if _p is None: - c[p] = sp - except AlreadyBound: - continue - - try: - if _o is None: - c[o] = so - except AlreadyBound: - continue - - for x in evalBGP(c, bgp[1:]): - yield x - - -def evalExtend(ctx, extend): - # TODO: Deal with dict returned from evalPart from GROUP BY - - for c in evalPart(ctx, extend.p): - try: - e = _eval(extend.expr, c.forget(ctx, _except=extend._vars)) - if isinstance(e, SPARQLError): - raise e - - yield c.merge({extend.var: e}) - - except SPARQLError: - yield c - - -def evalLazyJoin(ctx, join): - """ - A lazy join will push the variables bound - in the first part to the second part, - essentially doing the join implicitly - hopefully evaluating much fewer triples - """ - for a in evalPart(ctx, join.p1): - c = ctx.thaw(a) - for b in evalPart(c, join.p2): - yield b.merge(a) # merge, as some bindings may have been forgotten - - -def evalJoin(ctx, join): - - # TODO: Deal with dict returned from evalPart from GROUP BY - # only ever for join.p1 - - if join.lazy: - return evalLazyJoin(ctx, join) - else: - a = evalPart(ctx, join.p1) - b = set(evalPart(ctx, join.p2)) - return _join(a, b) - - -def evalUnion(ctx, union): - res = set() - for x in evalPart(ctx, union.p1): - res.add(x) - yield x - for x in evalPart(ctx, union.p2): - if x not in res: - yield x - - -def evalMinus(ctx, minus): - a = evalPart(ctx, minus.p1) - b = set(evalPart(ctx, minus.p2)) - return _minus(a, b) - - -def evalLeftJoin(ctx, join): - # import pdb; pdb.set_trace() - for a in evalPart(ctx, join.p1): - ok = False - c = ctx.thaw(a) - for b in evalPart(c, join.p2): - if _ebv(join.expr, b.forget(ctx)): - ok = True - yield b - if not ok: - # we've cheated, the ctx above may contain - # vars bound outside our scope - # before we yield a solution without the OPTIONAL part - # check that we would have had no OPTIONAL matches - # even without prior bindings... - p1_vars = join.p1._vars - if p1_vars is None \ - or not any(_ebv(join.expr, b) for b in - evalPart(ctx.thaw(a.remember(p1_vars)), join.p2)): - - yield a - - -def evalFilter(ctx, part): - # TODO: Deal with dict returned from evalPart! - for c in evalPart(ctx, part.p): - if _ebv(part.expr, c.forget(ctx, _except=part._vars) if not part.no_isolated_scope else c): - yield c - - -def evalGraph(ctx, part): - - if ctx.dataset is None: - raise Exception( - "Non-conjunctive-graph doesn't know about " + - "graphs. Try a query without GRAPH.") - - ctx = ctx.clone() - graph = ctx[part.term] - if graph is None: - - for graph in ctx.dataset.contexts(): - - # in SPARQL the default graph is NOT a named graph - if graph == ctx.dataset.default_context: - continue - - c = ctx.pushGraph(graph) - c = c.push() - graphSolution = [{part.term: graph.identifier}] - for x in _join(evalPart(c, part.p), graphSolution): - yield x - - else: - c = ctx.pushGraph(ctx.dataset.get_context(graph)) - for x in evalPart(c, part.p): - yield x - - -def evalValues(ctx, part): - for r in part.p.res: - c = ctx.push() - try: - for k, v in r.items(): - if v != 'UNDEF': - c[k] = v - except AlreadyBound: - continue - - yield c.solution() - - -def evalMultiset(ctx, part): - - if part.p.name == 'values': - return evalValues(ctx, part) - - return evalPart(ctx, part.p) - - -def evalPart(ctx, part): - - # try custom evaluation functions - for name, c in CUSTOM_EVALS.items(): - try: - return c(ctx, part) - except NotImplementedError: - pass # the given custome-function did not handle this part - - if part.name == 'BGP': - # Reorder triples patterns by number of bound nodes in the current ctx - # Do patterns with more bound nodes first - triples = sorted(part.triples, key=lambda t: len([n for n in t if ctx[n] is None])) - - return evalBGP(ctx, triples) - elif part.name == 'Filter': - return evalFilter(ctx, part) - elif part.name == 'Join': - return evalJoin(ctx, part) - elif part.name == 'LeftJoin': - return evalLeftJoin(ctx, part) - elif part.name == 'Graph': - return evalGraph(ctx, part) - elif part.name == 'Union': - return evalUnion(ctx, part) - elif part.name == 'ToMultiSet': - return evalMultiset(ctx, part) - elif part.name == 'Extend': - return evalExtend(ctx, part) - elif part.name == 'Minus': - return evalMinus(ctx, part) - - elif part.name == 'Project': - return evalProject(ctx, part) - elif part.name == 'Slice': - return evalSlice(ctx, part) - elif part.name == 'Distinct': - return evalDistinct(ctx, part) - elif part.name == 'Reduced': - return evalReduced(ctx, part) - - elif part.name == 'OrderBy': - return evalOrderBy(ctx, part) - elif part.name == 'Group': - return evalGroup(ctx, part) - elif part.name == 'AggregateJoin': - return evalAggregateJoin(ctx, part) - - elif part.name == 'SelectQuery': - return evalSelectQuery(ctx, part) - elif part.name == 'AskQuery': - return evalAskQuery(ctx, part) - elif part.name == 'ConstructQuery': - return evalConstructQuery(ctx, part) - - elif part.name == 'Service': - return evalService(ctx, part) - - elif part.name == 'ServiceGraphPattern': - raise UnSupportedQuery('ServiceGraphPattern not implemented') - - elif part.name == 'DescribeQuery': - raise UnSupportedQueryType('DESCRIBE not implemented') - - else: - # import pdb ; pdb.set_trace() - raise UnSupportedQuery('I dont know: %s' % part.name) - - -def evalGroup(ctx, group): - - """ - http://www.w3.org/TR/sparql11-query/#defn_algGroup - """ - # grouping should be implemented by evalAggregateJoin - return evalPart(ctx, group.p) - - -def evalAggregateJoin(ctx, agg): - # import pdb ; pdb.set_trace() - p = evalPart(ctx, agg.p) - # p is always a Group, we always get a dict back - - group_expr = agg.p.expr - res = collections.defaultdict(lambda: Aggregator(aggregations=agg.A)) - - if group_expr is None: - # no grouping, just COUNT in SELECT clause - # get 1 aggregator for counting - aggregator = res[True] - for row in p: - aggregator.update(row) - else: - for row in p: - # determine right group aggregator for row - k = tuple(_eval(e, row, False) for e in group_expr) - res[k].update(row) - - # all rows are done; yield aggregated values - for aggregator in itervalues(res): - yield FrozenBindings(ctx, aggregator.get_bindings()) - - # there were no matches - if len(res) == 0: - yield FrozenBindings(ctx) - - -def evalOrderBy(ctx, part): - - res = evalPart(ctx, part.p) - - for e in reversed(part.expr): - - reverse = bool(e.order and e.order == 'DESC') - res = sorted(res, key=lambda x: _val(value(x, e.expr, variables=True)), reverse=reverse) - - return res - - -def evalSlice(ctx, slice): - # import pdb; pdb.set_trace() - res = evalPart(ctx, slice.p) - i = 0 - while i < slice.start: - next(res) - i += 1 - i = 0 - for x in res: - i += 1 - if slice.length is None: - yield x - else: - if i <= slice.length: - yield x - else: - break - - -def evalReduced(ctx, part): - """apply REDUCED to result - - REDUCED is not as strict as DISTINCT, but if the incoming rows were sorted - it should produce the same result with limited extra memory and time per - incoming row. - """ - - # This implementation uses a most recently used strategy and a limited - # buffer size. It relates to a LRU caching algorithm: - # https://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used_.28LRU.29 - MAX = 1 - # TODO: add configuration or determine "best" size for most use cases - # 0: No reduction - # 1: compare only with the last row, almost no reduction with - # unordered incoming rows - # N: The greater the buffer size the greater the reduction but more - # memory and time are needed - - # mixed data structure: set for lookup, deque for append/pop/remove - mru_set = set() - mru_queue = collections.deque() - - for row in evalPart(ctx, part.p): - if row in mru_set: - # forget last position of row - mru_queue.remove(row) - else: - #row seems to be new - yield row - mru_set.add(row) - if len(mru_set) > MAX: - # drop the least recently used row from buffer - mru_set.remove(mru_queue.pop()) - # put row to the front - mru_queue.appendleft(row) - - -def evalDistinct(ctx, part): - res = evalPart(ctx, part.p) - - done = set() - for x in res: - if x not in done: - yield x - done.add(x) - - -def evalProject(ctx, project): - res = evalPart(ctx, project.p) - - return (row.project(project.PV) for row in res) - - -def evalSelectQuery(ctx, query): - - res = {} - res["type_"] = "SELECT" - res["bindings"] = evalPart(ctx, query.p) - res["vars_"] = query.PV - return res - - -def evalAskQuery(ctx, query): - res = {} - res["type_"] = "ASK" - res["askAnswer"] = False - for x in evalPart(ctx, query.p): - res["askAnswer"] = True - break - - return res - - -def evalConstructQuery(ctx, query): - template = query.template - - if not template: - # a construct-where query - template = query.p.p.triples # query->project->bgp ... - - graph = Graph() - - for c in evalPart(ctx, query.p): - graph += _fillTemplate(template, c) - - res = {} - res["type_"] = "CONSTRUCT" - res["graph"] = graph - - return res - -def evalService(ctx, part): - - srv = service.get(part.term) - if srv is None: - raise Exception('SERVICE not implemented') - else: - c = ctx.pushGraph(srv) - c._dataset = srv - for x in evalPart(c, part.p): - yield x - -def evalQuery(graph, query, initBindings, base=None): - - initBindings = dict( ( Variable(k),v ) for k,v in iteritems(initBindings) ) - - ctx = QueryContext(graph, initBindings=initBindings) - - ctx.prologue = query.prologue - main = query.algebra - - if main.datasetClause: - if ctx.dataset is None: - raise Exception( - "Non-conjunctive-graph doesn't know about " + - "graphs! Try a query without FROM (NAMED).") - - ctx = ctx.clone() # or push/pop? - - firstDefault = False - for d in main.datasetClause: - if d.default: - - if firstDefault: - # replace current default graph - dg = ctx.dataset.get_context(BNode()) - ctx = ctx.pushGraph(dg) - firstDefault = True - - ctx.load(d.default, default=True) - - # TODO re-enable original behaviour if FROM NAMED works with named graphs - # https://github.com/AKSW/QuitStore/issues/144 - elif d.named: - raise FromNamedError - # g = d.named - # ctx.load(g, default=False) - - return evalPart(ctx, main) diff --git a/quit/tools/processor.py b/quit/tools/processor.py index e91ef3e2..f2cc18c5 100644 --- a/quit/tools/processor.py +++ b/quit/tools/processor.py @@ -1,40 +1,8 @@ -from rdflib.query import Processor, Result, UpdateProcessor -from rdflib.plugins.sparql.sparql import Query -from rdflib.plugins.sparql.parser import parseQuery, parseUpdate - -from quit.tools.algebra import translateQuery, translateUpdate -from quit.tools.evaluate import evalQuery from quit.tools.update import evalUpdate -class SPARQLUpdateProcessor(UpdateProcessor): - def __init__(self, graph): - self.graph = graph - - def update(self, strOrQuery, initBindings={}, initNs={}): - if isinstance(strOrQuery, str): - strOrQuery=translateUpdate(parseUpdate(strOrQuery), initNs=initNs) - - return evalUpdate(self.graph, strOrQuery, initBindings) - - -class SPARQLProcessor(Processor): - - def __init__(self, graph): - self.graph = graph - - def query( - self, strOrQuery, initBindings={}, - initNs={}, base=None, DEBUG=False): - """ - Evaluate a query with the given initial bindings, and initial - namespaces. The given base is used to resolve relative URIs in - the query and will be overridden by any BASE given in the query. - """ +import rdflib.plugins.sparql.processor - if not isinstance(strOrQuery, Query): - parsetree = parseQuery(strOrQuery) - query = translateQuery(parsetree, base, initNs) - else: - query = strOrQuery +rdflib.plugins.sparql.processor.evalUpdate = evalUpdate - return evalQuery(self.graph, query, initBindings, base) \ No newline at end of file +SPARQLUpdateProcessor = rdflib.plugins.sparql.processor.SPARQLUpdateProcessor +SPARQLProcessor = rdflib.plugins.sparql.processor.SPARQLProcessor diff --git a/quit/tools/update.py b/quit/tools/update.py index 5e2c7c5c..7e8823e3 100644 --- a/quit/tools/update.py +++ b/quit/tools/update.py @@ -3,19 +3,23 @@ Code for carrying out Update Operations """ -import functools - -from rdflib import Graph, Variable, URIRef +from rdflib import Graph from rdflib.term import Node -from rdflib.plugins.sparql.sparql import QueryContext from rdflib.plugins.sparql.evalutils import _fillTemplate, _join from rdflib.plugins.sparql.evaluate import evalBGP, evalPart -from collections import defaultdict from itertools import tee from quit.exceptions import UnSupportedQuery +from typing import Mapping, Optional, Sequence +from rdflib.plugins.sparql.parserutils import CompValue +from rdflib.plugins.sparql.sparql import QueryContext, Update +from rdflib.term import Identifier, URIRef, Variable + + +import rdflib.plugins.sparql.update as rdflib_update + def _append(dct, identifier, action, items): if items: if not isinstance(identifier, Node): @@ -24,31 +28,15 @@ def _append(dct, identifier, action, items): changes.append((action, items)) dct[identifier] = changes +def _filterExistingTriples(g, triples): + return list(filter(lambda triple: triple not in g, triples)) -def _graphOrDefault(ctx, g): - if g == 'DEFAULT': - return ctx.graph - else: - return ctx.dataset.get_context(g) - - -def _graphAll(ctx, g): - """ - return a list of graphs - """ - if g == 'DEFAULT': - return [ctx.graph] - elif g == 'NAMED': - return [c for c in ctx.dataset.contexts() - if c.identifier != ctx.graph.identifier] - elif g == 'ALL': - return list(ctx.dataset.contexts()) - else: - return [ctx.dataset.get_context(g)] - +def _filterNonExistingTriples(g, triples): + return list(filter(lambda triple: triple in g, triples)) def evalLoad(ctx, u): """ + TODO http://www.w3.org/TR/sparql11-update/#load """ res = {} @@ -86,38 +74,9 @@ def evalLoad(ctx, u): return res - -def evalCreate(ctx, u): - """ - http://www.w3.org/TR/sparql11-update/#create - """ - g = ctx.datset.get_context(u.graphiri) - if len(g) > 0: - raise Exception("Graph %s already exists." % g.identifier) - raise Exception("Create not implemented!") - - -def evalClear(ctx, u): - """ - http://www.w3.org/TR/sparql11-update/#clear - """ - for g in _graphAll(ctx, u.graphiri): - g.remove((None, None, None)) - - -def evalDrop(ctx, u): - """ - http://www.w3.org/TR/sparql11-update/#drop - """ - if ctx.dataset.store.graph_aware: - for g in _graphAll(ctx, u.graphiri): - ctx.dataset.store.remove_graph(g) - else: - evalClear(ctx, u) - - -def evalInsertData(ctx, u): +def evalInsertData(ctx: QueryContext, u: CompValue) -> dict: """ + Updated according to rdflib:1c256765ac7d5e7327695a44269be09e51bd88b1 http://www.w3.org/TR/sparql11-update/#insertData """ @@ -127,52 +86,53 @@ def evalInsertData(ctx, u): # add triples g = ctx.graph - filled = list(filter(lambda triple: triple not in g, u.triples)) - if filled: - _append(res["delta"], 'default', 'additions', filled) - g += filled + filled = _filterNonExistingTriples(g, u.triples) + _append(res["delta"], 'default', 'additions', filled) # add quads # u.quads is a dict of graphURI=>[triples] for g in u.quads: - cg = ctx.dataset.get_context(g) - filledq = list(filter(lambda triple: triple not in cg, u.quads[g])) - if filledq: - _append(res["delta"], cg.identifier, 'additions', filledq) - cg += filledq + # type error: Argument 1 to "get_context" of "ConjunctiveGraph" has incompatible type "Optional[Graph]"; expected "Union[IdentifiedNode, str, None]" + cg = ctx.dataset.get_context(g) # type: ignore[arg-type] + filledq = _filterExistingTriples(cg, u.quads[g]) + _append(res["delta"], cg.identifier, 'additions', filledq) + + rdflib_update.evalInsertData(ctx, u) return res -def evalDeleteData(ctx, u): +def evalDeleteData(ctx: QueryContext, u: CompValue) -> dict: """ + Updated according to rdflib:1c256765ac7d5e7327695a44269be09e51bd88b1 http://www.w3.org/TR/sparql11-update/#deleteData """ + res = {} res["type"] = "DELETE" res["delta"] = {} # remove triples g = ctx.graph - filled = list(filter(lambda triple: triple in g, u.triples)) - if filled: - _append(res["delta"], 'default', 'removals', filled) - g -= filled + filled = _filterNonExistingTriples(g, u.triples) + _append(res["delta"], 'default', 'removals', filled) # remove quads # u.quads is a dict of graphURI=>[triples] for g in u.quads: + # type error: Argument 1 to "get_context" of "ConjunctiveGraph" has incompatible type "Optional[Graph]"; expected "Union[IdentifiedNode, str, None]" cg = ctx.dataset.get_context(g) - filledq = list(filter(lambda triple: triple in cg, u.quads[g])) - if filledq: - _append(res["delta"], cg.identifier, 'removals', filledq) - cg -= filledq + filledq = _filterNonExistingTriples(cg, u.quads[g]) + _append(res["delta"], cg.identifier, 'removals', filledq) + + rdflib_update.evalDeleteData(ctx, u) return res -def evalDeleteWhere(ctx, u): +def evalDeleteWhere(ctx: QueryContext, u: CompValue) -> dict: """ + TODO http://www.w3.org/TR/sparql11-update/#deleteWhere """ @@ -198,10 +158,15 @@ def evalDeleteWhere(ctx, u): _append(res["delta"], cg.identifier, 'removals', list(filledq_delta)) cg -= filledq + #rdflib_update.evalDeleteWhere(ctx, u) + return res def evalModify(ctx, u): + """ + TODO + """ originalctx = ctx res = {} @@ -211,7 +176,6 @@ def evalModify(ctx, u): # Using replaces the dataset for evaluating the where-clause if u.using: otherDefault = False - for d in u.using: if d.default: @@ -284,74 +248,11 @@ def evalModify(ctx, u): return res -def evalAdd(ctx, u): - """ - - add all triples from src to dst - - http://www.w3.org/TR/sparql11-update/#add - """ - src, dst = u.graph - - srcg = _graphOrDefault(ctx, src) - dstg = _graphOrDefault(ctx, dst) - - if srcg.identifier == dstg.identifier: - return - - dstg += srcg - - -def evalMove(ctx, u): - """ - remove all triples from dst - add all triples from src to dst - remove all triples from src - - http://www.w3.org/TR/sparql11-update/#move - """ - - src, dst = u.graph - - srcg = _graphOrDefault(ctx, src) - dstg = _graphOrDefault(ctx, dst) - - if srcg.identifier == dstg.identifier: - return - - dstg.remove((None, None, None)) - - dstg += srcg - - if ctx.dataset.store.graph_aware: - ctx.dataset.store.remove_graph(srcg) - else: - srcg.remove((None, None, None)) - - -def evalCopy(ctx, u): - """ - remove all triples from dst - add all triples from src to dst - - http://www.w3.org/TR/sparql11-update/#copy - """ - - src, dst = u.graph - - srcg = _graphOrDefault(ctx, src) - dstg = _graphOrDefault(ctx, dst) - - if srcg.identifier == dstg.identifier: - return - - dstg.remove((None, None, None)) - - dstg += srcg - - -def evalUpdate(graph, update, initBindings=None, actionLog=False): +def evalUpdate( + graph: Graph, update: Update, initBindings: Mapping[str, Identifier] = {} +) -> None: """ + Updated according to rdflib:1c256765ac7d5e7327695a44269be09e51bd88b1 http://www.w3.org/TR/sparql11-update/#updateLanguage 'A request is a sequence of operations [...] Implementations MUST @@ -367,38 +268,46 @@ def evalUpdate(graph, update, initBindings=None, actionLog=False): This will return None on success and raise Exceptions on error + .. caution:: + + This method can access indirectly requested network endpoints, for + example, query processing will attempt to access network endpoints + specified in ``SERVICE`` directives. + + When processing untrusted or potentially malicious queries, measures + should be taken to restrict network and file access. + + For information on available security measures, see the RDFLib + :doc:`Security Considerations ` + documentation. + """ res = [] - for u in update: + for u in update.algebra: + initBindings = dict((Variable(k), v) for k, v in initBindings.items()) - ctx = QueryContext(graph) + ctx = QueryContext(graph, initBindings=initBindings) ctx.prologue = u.prologue - if initBindings: - for k, v in initBindings.items(): - if not isinstance(k, Variable): - k = Variable(k) - ctx[k] = v - try: if u.name == 'Load': result = evalLoad(ctx, u) if result: res.append(result) elif u.name == 'Clear': - evalClear(ctx, u) + rdflib_update.evalClear(ctx, u) elif u.name == 'Drop': - evalDrop(ctx, u) + rdflib_update.evalDrop(ctx, u) elif u.name == 'Create': - evalCreate(ctx, u) + rdflib_update.evalCreate(ctx, u) elif u.name == 'Add': - evalAdd(ctx, u) + rdflib_update.evalAdd(ctx, u) elif u.name == 'Move': - evalMove(ctx, u) + rdflib_update.evalMove(ctx, u) elif u.name == 'Copy': - evalCopy(ctx, u) + rdflib_update.evalCopy(ctx, u) elif u.name == 'InsertData': result = evalInsertData(ctx, u) if result: diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 5f96a4d5..734fe662 100755 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -22,9 +22,10 @@ def testBaseNamespace(self): self.assertEqual(queryType, 'SelectQuery') queryType, parsedQuery = parse_update_type(update) - self.assertEqual(parsedQuery[0]['triples'][0][0], URIRef('http://good.example/1')) - self.assertEqual(parsedQuery[0]['triples'][0][1], URIRef('http://good.example/2')) - self.assertEqual(parsedQuery[0]['triples'][0][2], URIRef('http://good.example/3')) + parsedQueryAlgebra = parsedQuery.algebra + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][0], URIRef('http://good.example/1')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][1], URIRef('http://good.example/2')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][2], URIRef('http://good.example/3')) self.assertEqual(queryType, 'InsertData') queryType, parsedQuery = parse_query_type(construct) @@ -44,15 +45,17 @@ def testOverwrittenBaseNamespace(self): update2 = "PREFIX ex: BASE INSERT DATA { <1> <2> <3> }" queryType, parsedQuery = parse_update_type(update1, 'http://argument/') - self.assertEqual(parsedQuery[0]['triples'][0][0], URIRef('http://argument/1')) - self.assertEqual(parsedQuery[0]['triples'][0][1], URIRef('http://argument/2')) - self.assertEqual(parsedQuery[0]['triples'][0][2], URIRef('http://argument/3')) + parsedQueryAlgebra = parsedQuery.algebra + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][0], URIRef('http://argument/1')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][1], URIRef('http://argument/2')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][2], URIRef('http://argument/3')) self.assertEqual(queryType, 'InsertData') queryType, parsedQuery = parse_update_type(update2, 'http://argument/') - self.assertEqual(parsedQuery[0]['triples'][0][0], URIRef('http://in-query/1')) - self.assertEqual(parsedQuery[0]['triples'][0][1], URIRef('http://in-query/2')) - self.assertEqual(parsedQuery[0]['triples'][0][2], URIRef('http://in-query/3')) + parsedQueryAlgebra = parsedQuery.algebra + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][0], URIRef('http://in-query/1')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][1], URIRef('http://in-query/2')) + self.assertEqual(parsedQueryAlgebra[0]['triples'][0][2], URIRef('http://in-query/3')) self.assertEqual(queryType, 'InsertData') def testOverwrittenBadBaseNamespace(self):