forked from ravendb/ravendb-go-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery_operation.go
286 lines (240 loc) · 8.85 KB
/
query_operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
package ravendb
import (
"fmt"
"io"
"reflect"
"time"
)
// queryOperation carries the state needed to run one query for a session:
// the query itself, the flags forwarded to the query command, and the
// result that is stored once the server has answered.
type queryOperation struct {
	// session is the session on whose behalf the query is executed.
	session *InMemoryDocumentSessionOperations
	// indexName is only used when reporting that an index could not be found.
	indexName string
	// indexQuery is the query that is sent to the server.
	indexQuery *IndexQuery
	// metadataOnly and indexEntriesOnly are passed through to NewQueryCommand.
	metadataOnly     bool
	indexEntriesOnly bool
	// currentQueryResults is the last result accepted by
	// ensureIsAcceptableAndSaveResult; complete() reads it.
	currentQueryResults *QueryResult
	// fieldsToFetch describes the projection, if any; may be nil.
	fieldsToFetch *fieldsToFetchToken
	// startTime is recorded by startTiming and used to report how long
	// a query waited for non-stale results.
	startTime time.Time
	// disableEntitiesTracking turns off include registration in complete()
	// and is forwarded when deserializing results.
	disableEntitiesTracking bool

	// static Log logger = LogFactory.getLog(queryOperation.class);
}
// newQueryOperation creates a queryOperation from the given arguments.
// It returns the error reported by assertPageSizeSet when the page size
// check fails; in that case no operation is returned.
func newQueryOperation(session *InMemoryDocumentSessionOperations, indexName string, indexQuery *IndexQuery, fieldsToFetch *fieldsToFetchToken, disableEntitiesTracking bool, metadataOnly bool, indexEntriesOnly bool) (*queryOperation, error) {
	op := new(queryOperation)
	op.session = session
	op.indexName = indexName
	op.indexQuery = indexQuery
	op.fieldsToFetch = fieldsToFetch
	op.disableEntitiesTracking = disableEntitiesTracking
	op.metadataOnly = metadataOnly
	op.indexEntriesOnly = indexEntriesOnly

	err := op.assertPageSizeSet()
	if err != nil {
		return nil, err
	}
	return op, nil
}
// createRequest bumps the session's request counter and builds the
// QueryCommand for this operation. If the counter cannot be incremented
// the error is returned and no command is created.
func (o *queryOperation) createRequest() (*QueryCommand, error) {
	err := o.session.incrementRequestCount()
	if err != nil {
		return nil, err
	}

	//o.logQuery();
	conventions := o.session.GetConventions()
	return NewQueryCommand(conventions, o.indexQuery, o.metadataOnly, o.indexEntriesOnly)
}
// setResult validates queryResult and, if it is acceptable, stores it as
// the current result of this operation. All the work is delegated to
// ensureIsAcceptableAndSaveResult.
func (o *queryOperation) setResult(queryResult *QueryResult) error {
	if err := o.ensureIsAcceptableAndSaveResult(queryResult); err != nil {
		return err
	}
	return nil
}
// assertPageSizeSet enforces the ErrorIfQueryPageSizeIsNotSet convention.
// It returns nil when the convention is disabled or when the query has a
// positive page size, and an illegal-state error otherwise.
func (o *queryOperation) assertPageSizeSet() error {
	if !o.session.GetConventions().ErrorIfQueryPageSizeIsNotSet {
		return nil
	}

	if o.indexQuery.pageSize > 0 {
		return nil
	}

	// The message previously contained a stray "//" ("the page //size"),
	// which was shown verbatim to users.
	return newIllegalStateError("Attempt to query without explicitly specifying a page size. " +
		"You can use .take() methods to set maximum number of results. By default the page size is set to Integer.MAX_VALUE and can cause severe performance degradation.")
}
// startTiming records the current time as the start of the query.
func (o *queryOperation) startTiming() {
	now := time.Now()
	o.startTime = now
}
// logQuery is currently a no-op. The commented-out code below is kept as a
// reference for what should be logged once a logger is available.
func (o *queryOperation) logQuery() {
	/*
		if (logger.isInfoEnabled()) {
			logger.info("Executing query " + _indexQuery.getQuery() + " on index " + _indexName + " in " + _session.storeIdentifier());
		}
	*/
}
// enterQueryContext starts the query timer and returns a closer for the
// query context.
//
// When the query waits for non-stale results, the closer is whatever
// DisableAggressiveCaching returns for the session's database. Otherwise
// the closer is a nil *nilCloser wrapped in the io.Closer interface, so the
// returned interface value itself is never nil.
// NOTE(review): presumably nilCloser.Close tolerates a nil receiver so that
// callers can always call Close — confirm against its definition.
func (o *queryOperation) enterQueryContext() io.Closer {
	o.startTiming()

	if o.indexQuery.waitForNonStaleResults {
		store := o.session.GetDocumentStore()
		return store.DisableAggressiveCaching(o.session.DatabaseName)
	}

	var noop *nilCloser
	return noop
}
// makeSliceForResults validates that results is a non-nil *[]<type> and
// returns a settable reflect.Value representing the slice it points to.
//
// If the pointed-to slice is nil it is replaced with an empty, non-nil
// slice; an existing slice is returned unchanged, so callers that append
// will extend whatever the caller already had in it.
//
// An error (never a panic) is returned when results is nil, is not a
// pointer to a slice, or is a nil pointer: a nil pointer has no slice
// behind it that could be filled.
func makeSliceForResults(results interface{}) (reflect.Value, error) {
	slicePtr := reflect.ValueOf(results)
	// reflect.ValueOf(nil) is the zero Value; calling Type() on it panics.
	if !slicePtr.IsValid() {
		return reflect.Value{}, fmt.Errorf("results should *[]<type>, is %T", results)
	}
	rt := slicePtr.Type()
	if rt.Kind() != reflect.Ptr || rt.Elem().Kind() != reflect.Slice {
		return reflect.Value{}, fmt.Errorf("results should *[]<type>, is %T. rt: %s", results, rt)
	}
	// Elem() of a nil pointer is an invalid Value on which IsNil/Set panic.
	if slicePtr.IsNil() {
		return reflect.Value{}, fmt.Errorf("results can't be a nil pointer, is %T", results)
	}
	slice := slicePtr.Elem()
	// if this is a pointer to nil slice, create a new slice
	// otherwise we use the slice that was provided by the caller
	// TODO: should this always be a new slice? (in which case we should error
	// if provided non-nil slice, since that implies user error)
	// or at least we should reset the slice to empty. Appending to existing
	// slice might be confusing/unexpected to callers
	if slice.IsNil() {
		slice.Set(reflect.MakeSlice(slice.Type(), 0, 0))
	}
	return slice, nil
}
// complete materializes the current query result into results.
//
// results must be a *[]<type> (see makeSliceForResults). Every document of
// the result snapshot is deserialized into a new <type> value and appended
// to the slice; the grown slice is then stored back through the pointer,
// i.e. the equivalent of *results = append(*results, values...).
//
// Unless entity tracking is disabled, includes are registered with the
// session before deserializing and missing includes afterwards.
//
// An error is returned when results has the wrong shape, when a document
// has no metadata or its metadata is not a JSON object, or when a document
// cannot be deserialized. On error *results is left as it was (apart from
// a nil slice having been replaced by an empty one).
func (o *queryOperation) complete(results interface{}) error {
	queryResult := o.currentQueryResults.createSnapshot()

	if !o.disableEntitiesTracking {
		o.session.registerIncludes(queryResult.Includes)
	}

	slice, err := makeSliceForResults(results)
	if err != nil {
		return err
	}

	elemType := slice.Type().Elem()
	accumulated := slice
	for _, document := range queryResult.Results {
		metadataI, ok := document[MetadataKey]
		if !ok {
			return newIllegalStateError("missing metadata")
		}
		// a checked assertion: malformed metadata is an error, not a panic
		metadata, ok := metadataI.(map[string]interface{})
		if !ok {
			return newIllegalStateError("metadata is not a JSON object")
		}
		// the "found" flag is intentionally ignored; an empty id is passed on
		// and handled by queryOperationDeserialize
		id, _ := jsonGetAsText(metadata, MetadataID)
		result := reflect.New(elemType) // this is a pointer to desired value
		if err := queryOperationDeserialize(result.Interface(), id, document, metadata, o.fieldsToFetch, o.disableEntitiesTracking, o.session); err != nil {
			return newRuntimeError("Unable to read json: %s", err)
		}
		// de-reference pointer value
		accumulated = reflect.Append(accumulated, result.Elem())
	}

	if !o.disableEntitiesTracking {
		o.session.registerMissingIncludes(queryResult.Results, queryResult.Includes, queryResult.IncludedPaths)
	}

	// reflect.Append returns a new Value, so the caller's slice must be
	// updated explicitly. reflect.Value must not be compared with != (it
	// compares internal representation), so use the number of appended
	// documents to decide whether anything changed.
	if len(queryResult.Results) > 0 {
		slice.Set(accumulated)
	}
	return nil
}
// jsonIsValueNode reports whether v, a value decoded from JSON, is a scalar
// (string, float64 or bool) as opposed to a container ([]interface{} or
// map[string]interface{}).
//
// Any other dynamic type is handed to panicIf with an "unhandled type"
// message.
// NOTE(review): a nil v (JSON null) also ends up in that branch — confirm
// that this is intended.
func jsonIsValueNode(v interface{}) bool {
	switch v.(type) {
	case []interface{}, map[string]interface{}:
		return false
	case string, float64, bool:
		return true
	default:
		panicIf(true, "unhandled type %T", v)
		return false
	}
}
// queryOperationDeserialize decodes a single query result document into
// result, which must be a pointer to the value to fill (*<type>).
//
// If the metadata does not mark the document as a projection (the
// MetadataProjection flag is absent or false) the document is a regular
// entity and is handed to session.TrackEntity.
//
// For projections:
//   - when exactly one field is projected and <type> is a string, primitive
//     or enum, the value of that field (with the source alias prefix
//     removed from its name) is decoded directly into result;
//   - when the single projected field is also the single fetched field and
//     its value is a JSON object, that object is decoded instead of the
//     whole document;
//   - when the single projected field is absent from the document, result
//     is left untouched and nil is returned;
//   - otherwise the whole document is decoded into <type>.
//
// When id is not empty and the document does not carry a value for the
// type's identity property, the identity is set from id.
//
// It panics (via panicIf) if result is not a pointer, since that is a
// programming error in the caller.
func queryOperationDeserialize(result interface{}, id string, document map[string]interface{}, metadata map[string]interface{}, fieldsToFetch *fieldsToFetchToken, disableEntitiesTracking bool, session *InMemoryDocumentSessionOperations) error {
	// honor the value of the flag, not just its presence: an explicit
	// "false" means this is a regular entity
	isProjection, ok := jsonGetAsBool(metadata, MetadataProjection)
	if !ok || !isProjection {
		return session.TrackEntity(result, id, document, metadata, disableEntitiesTracking)
	}

	tp := reflect.TypeOf(result)
	panicIf(tp.Kind() != reflect.Ptr, "result should be a *<type>, is %T", result)
	clazz := tp.Elem()

	if fieldsToFetch != nil && len(fieldsToFetch.projections) == 1 {
		// we only select a single field
		firstProjection := fieldsToFetch.projections[0]

		isString := clazz.Kind() == reflect.String
		if isString || isPrimitiveOrWrapper(clazz) || typeIsEnum(clazz) {
			projectionField := firstProjection
			if alias := fieldsToFetch.sourceAlias; alias != "" {
				// remove "<source-alias><separator>" from projection name, but
				// only if the name actually starts with the alias; slicing
				// blindly could panic or cut off part of an un-aliased name
				prefixLen := len(alias) + 1
				if len(projectionField) >= prefixLen && projectionField[:len(alias)] == alias {
					projectionField = projectionField[prefixLen:]
				}
			}

			jsonNode, ok := document[projectionField]
			if ok && jsonIsValueNode(jsonNode) {
				res, err := treeToValue(clazz, jsonNode)
				if err != nil {
					return err
				}
				if res != nil {
					return setInterfaceToValue(result, res)
				}
				return nil
			}
		}

		inner, ok := document[firstProjection]
		if !ok {
			return nil
		}

		// len() covers both a nil and an empty fieldsToFetch slice
		if len(fieldsToFetch.fieldsToFetch) > 0 && fieldsToFetch.fieldsToFetch[0] == firstProjection {
			doc, ok := inner.(map[string]interface{})
			if ok {
				// extraction from original type
				document = doc
			}
		}
	}

	res, err := treeToValue(clazz, document)
	if err != nil {
		return err
	}

	if stringIsNotEmpty(id) {
		// we need to make an additional check, since it is possible that a value was explicitly stated
		// for the identity property, in which case we don't want to override it.
		identityProperty := session.GetConventions().GetIdentityProperty(clazz)
		if identityProperty != "" {
			if _, ok := document[identityProperty]; !ok {
				session.generateEntityIDOnTheClient.trySetIdentity(res, id)
			}
		}
	}

	return setInterfaceToValue(result, res)
}
// ensureIsAcceptableAndSaveResult validates result and stores it as the
// current query result.
//
// A nil result is reported as a missing index (using o.indexName in the
// message). Otherwise the result is checked by
// queryOperationEnsureIsAcceptable and, if that check fails, its error is
// returned and the current result is left unchanged.
func (o *queryOperation) ensureIsAcceptableAndSaveResult(result *QueryResult) error {
	if result == nil {
		return newIndexDoesNotExistError("Could not find index " + o.indexName)
	}

	waitForNonStale := o.indexQuery.waitForNonStaleResults
	if err := queryOperationEnsureIsAcceptable(result, waitForNonStale, o.startTime, o.session); err != nil {
		return err
	}

	o.currentQueryResults = result

	// TODO: port me when we have logger
	/*
		if (logger.isInfoEnabled()) {
			string isStale = result.isStale() ? " stale " : " ";

			stringBuilder parameters = new stringBuilder();
			if (_indexQuery.getQueryParameters() != null && !_indexQuery.getQueryParameters().isEmpty()) {
				parameters.append("(parameters: ");

				bool first = true;

				for (Map.Entry<string, Object> parameter : _indexQuery.getQueryParameters().entrySet()) {
					if (!first) {
						parameters.append(", ");
					}

					parameters.append(parameter.getKey())
							.append(" = ")
							.append(parameter.getValue());

					first = false;
				}

				parameters.append(") ");
			}

			logger.info("Query " + _indexQuery.getQuery() + " " + parameters.tostring() + "returned " + result.getResults().size() + isStale + "results (total index results: " + result.getTotalResults() + ")");
		}
	*/
	return nil
}
// queryOperationEnsureIsAcceptable returns a timeout error when the caller
// asked to wait for non-stale results but result is still stale. The error
// message states how much time has passed since startTime. In every other
// case it returns nil. The session argument is not used.
func queryOperationEnsureIsAcceptable(result *QueryResult, waitForNonStaleResults bool, startTime time.Time, session *InMemoryDocumentSessionOperations) error {
	if !waitForNonStaleResults || !result.IsStale {
		return nil
	}

	elapsed := time.Since(startTime)
	return NewTimeoutError("Waited for " + elapsed.String() + " for the query to return non stale result.")
}