-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathStreamQuery.js
98 lines (81 loc) · 2.96 KB
/
StreamQuery.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import N3Parser from '@rdfjs/parser-n3'
import asyncToReadabe from './lib/asyncToReadabe.js'
import checkResponse from './lib/checkResponse.js'
import mergeHeaders from './lib/mergeHeaders.js'
import RawQuery from './RawQuery.js'
import ResultParser from './ResultParser.js'
/**
 * Query implementation built on top of {@link RawQuery}. Responses are checked with `checkResponse` and then
 * exposed as Readable streams of RDF/JS Quad objects (CONSTRUCT/DESCRIBE) or Readable streams of objects (SELECT).
 * ASK resolves to the `boolean` member of the JSON response and update resolves without a value.
 *
 * @extends RawQuery
 */
class StreamQuery extends RawQuery {
  /**
   * Sends a request for an ASK query and resolves to the `boolean` member of the JSON response body.
   *
   * @param {string} query ASK query
   * @param {Object} [options]
   * @param {Headers} [options.headers] additional request headers
   * @param {'get'|'postUrlencoded'|'postDirect'} [options.operation='get'] SPARQL Protocol operation
   * @return {Promise<boolean>}
   */
  async ask (query, { headers, operation } = {}) {
    const response = await super.ask(query, { headers, operation })

    await checkResponse(response)

    const { boolean: answer } = await response.json()

    return answer
  }

  /**
   * Sends a request for a CONSTRUCT or DESCRIBE query and feeds the response body into an N3 parser.
   *
   * If the given headers contain no `accept` entry, `application/n-triples, text/turtle` is requested.
   *
   * @param {string} query CONSTRUCT or DESCRIBE query
   * @param {Object} [options]
   * @param {Headers} [options.headers] additional request headers
   * @param {'get'|'postUrlencoded'|'postDirect'} [options.operation='get'] SPARQL Protocol operation
   * @return {Readable}
   */
  construct (query, { headers, operation } = {}) {
    // must stay an arrow function: `super` and `this` are taken from the enclosing method
    const openQuadStream = async () => {
      const mergedHeaders = mergeHeaders(headers)

      // only fall back to the default media types when the caller did not choose any
      if (!mergedHeaders.has('accept')) {
        mergedHeaders.set('accept', 'application/n-triples, text/turtle')
      }

      const response = await super.construct(query, { headers: mergedHeaders, operation })

      await checkResponse(response)

      const quadParser = new N3Parser({ factory: this.client.factory })

      return quadParser.import(response.body)
    }

    return asyncToReadabe(openQuadStream)
  }

  /**
   * Sends a request for a SELECT query and pipes the response body into a {@link ResultParser}.
   *
   * @param {string} query SELECT query
   * @param {Object} [options]
   * @param {Headers} [options.headers] additional request headers
   * @param {'get'|'postUrlencoded'|'postDirect'} [options.operation='get'] SPARQL Protocol operation
   * @return {Readable}
   */
  select (query, { headers, operation } = {}) {
    // must stay an arrow function: `super` and `this` are taken from the enclosing method
    const openResultStream = async () => {
      const response = await super.select(query, { headers, operation })

      await checkResponse(response)

      const resultParser = new ResultParser({ factory: this.client.factory })

      return response.body.pipe(resultParser)
    }

    return asyncToReadabe(openResultStream)
  }

  /**
   * Sends a request for an update query. The response is only passed to `checkResponse`; nothing is returned.
   *
   * @param {string} query update query
   * @param {Object} [options]
   * @param {Headers} [options.headers] additional request headers
   * @param {'get'|'postUrlencoded'|'postDirect'} [options.operation='postUrlencoded'] SPARQL Protocol operation
   * @return {Promise<void>}
   */
  async update (query, { headers, operation } = {}) {
    const response = await super.update(query, { headers, operation })

    await checkResponse(response)
  }
}
export default StreamQuery