-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathrows.go
318 lines (279 loc) · 9.58 KB
/
rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Go driver for MySQL X Protocol
// Based heavily on Go MySQL Driver - A MySQL-Driver for Go's database/sql package
//
// Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved.
// Copyright 2016 Simon J Mudd.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package mysql
import (
"database/sql/driver"
"fmt"
"io"
"log"
"github.com/golang/protobuf/proto"
"github.com/sjmudd/go-mysqlx-driver/Mysqlx"
"github.com/sjmudd/go-mysqlx-driver/Mysqlx_Resultset"
"github.com/sjmudd/go-mysqlx-driver/debug"
)
// mysqlXRows holds the iteration state for the rows of one query on an
// X Protocol connection.
type mysqlXRows struct {
	// columns holds the column metadata (if present) for a row; entries are
	// appended as the metadata messages are processed.
	columns []*Mysqlx_Resultset.ColumnMetaData
	// mc is the connection the messages of this result set are read from.
	mc *mysqlXConn
	// state records how far through the query's message stream we are.
	state queryState
	// err provides the error received from a query (if present).
	err error
}
// readMsgIfNecessary makes sure rows.mc.pb holds a message, reading one from
// the connection only when no message is currently pending.
//
// It returns an error when rows or rows.mc is nil, or when the read fails.
// A failed read is additionally recorded in rows.err and moves rows.state
// to queryStateError.
func (rows *mysqlXRows) readMsgIfNecessary() error {
	// safety checks (which can maybe be removed later)
	switch {
	case rows == nil:
		return fmt.Errorf("mysqlXRows.readMsgIfNecessary: rows == nil")
	case rows.mc == nil:
		return fmt.Errorf("mysqlXRows.readMsgIfNecessary: rows.mc == nil")
	}

	conn := rows.mc
	if conn.pb != nil {
		// a protobuf message is already pending: keep it, read nothing
		debug.Msg("mysqlXRows.readMsgIfNecessary: DO NOT read new message (pb != nil)")
		return nil
	}

	debug.Msg("mysqlXRows.readMsgIfNecessary: read NEW MESSAGE")
	msg, readErr := conn.readMsg()
	conn.pb = msg // stored even on failure, exactly as readMsg returned it
	if readErr == nil {
		return nil
	}

	failure := fmt.Errorf("mysqlXRows.readMsgIfNecessary rows.mc.readMsg failed: %v", readErr)
	rows.err = failure
	rows.state = queryStateError
	return failure
}
// Columns returns the names of the result set's columns. It first collects
// the column metadata, which may require reading metadata messages from the
// network.
//
// The signature has no error return (presumably because it has to satisfy
// database/sql/driver.Rows — confirm), so a failure to collect the metadata
// is reported through debug.Msg and the names gathered so far, possibly
// none, are returned. A nil receiver yields an empty, non-nil slice.
func (rows *mysqlXRows) Columns() []string {
	if rows == nil {
		// same defensive stance as the other methods: do not panic on nil
		debug.Msg("mysqlXRows.Columns: rows == nil, returning no columns")
		return []string{}
	}

	if err := rows.collectColumnMetaData(); err != nil {
		// cannot be returned to the caller, but must not vanish silently
		debug.Msg("mysqlXRows.Columns: collecting column metadata failed: %v", err)
	}

	columns := make([]string, len(rows.columns))
	for i, column := range rows.columns {
		// FIXME: handle: if rows.mc.cfg.columnsWithAlias { ....
		columns[i] = string(column.GetName())
	}

	if len(columns) == 0 {
		debug.Msg("mysqlXRows.Columns: return empty []string with %d entries (probably due to SQL error)", len(columns))
	} else {
		debug.Msg("mysqlXRows.Columns: return %+v", columns)
	}
	return columns
}
// Close signals that we have finished with the iterator.
//
// Close can be called at any time, so messages belonging to this query may
// still be pending. They are read and discarded until the state reaches
// queryStateDone or queryStateError, or until a read fails. Afterwards the
// iterator drops its column metadata and its reference to the connection.
//
// It returns ErrInvalidConn when the connection has no netConn and nil in
// every other case, including a nil receiver or a nil connection.
func (rows *mysqlXRows) Close() error {
	debug.Msg("mysqlXRows.Close: entry")

	// safety checks
	switch {
	case rows == nil:
		debug.Msg("mysqlXRows.Close: rows == nil, ignoring")
		return nil // to avoid breakage. Fix the calling code later
	case rows.mc == nil:
		return nil // no connection information
	case rows.mc.netConn == nil:
		return ErrInvalidConn
	}

	// We may have "query packets" which have not yet been processed.
	// If so just let them through but ignore them.
	for {
		if rows.state == queryStateDone || rows.state == queryStateError {
			break
		}

		readErr := rows.readMsgIfNecessary()
		if readErr != nil {
			debug.Msg("mysqlXRows.Close: got an error trying to read rows: %v", readErr)
			break
		}

		// Only ERROR and SQL_STMT_EXECUTE_OK end the draining; a NOTICE is
		// processed and every other message type is simply dropped.
		msgType := Mysqlx.ServerMessages_Type(rows.mc.pb.msgType)
		if msgType == Mysqlx.ServerMessages_ERROR {
			rows.mc.processErrorMsg()
			rows.state = queryStateError
		} else if msgType == Mysqlx.ServerMessages_SQL_STMT_EXECUTE_OK {
			rows.state = queryStateDone
		} else if msgType == Mysqlx.ServerMessages_NOTICE {
			rows.mc.processNotice("mysqlXRows.Close")
		}

		rows.mc.pb = nil // the message has been dealt with
	}

	// clean up: forget any pending message, then detach from the connection
	rows.mc.pb = nil
	rows.mc = nil
	rows.columns = nil
	rows.state = queryStateStart

	debug.Msg("mysqlXRows.Close: exit")
	return nil
}
// addColumnMetaData decodes the pending message (rows.mc.pb) as a
// ColumnMetaData message, appends it to rows.columns and consumes the
// message.
//
// It returns an error when rows, rows.mc or the pending message is nil, or
// when the payload cannot be unmarshalled. On an unmarshalling error the
// message is left pending and rows.columns is unchanged.
func (rows *mysqlXRows) addColumnMetaData() error {
	if rows == nil {
		return fmt.Errorf("mysqlXrows.addColumnMetaData: rows == nil")
	}
	// rows.mc and rows.mc.pb are dereferenced below, so check them as well
	// rather than panicking on a missing connection or message.
	if rows.mc == nil {
		return fmt.Errorf("mysqlXRows.addColumnMetaData: rows.mc == nil")
	}
	if rows.mc.pb == nil {
		return fmt.Errorf("mysqlXRows.addColumnMetaData: no pending message (rows.mc.pb == nil)")
	}

	column := new(Mysqlx_Resultset.ColumnMetaData)
	if err := proto.Unmarshal(rows.mc.pb.payload, column); err != nil {
		return fmt.Errorf("error unmarshalling ColumnMetaData: %v", err)
	}
	debug.Msg("mysqlXRows.addColumnMetaData: %s", printableColumnMetaData(rows.mc.pb))

	rows.columns = append(rows.columns, column)
	rows.mc.pb = nil // consume the message

	return nil
}
// processRow decodes the pending message (rows.mc.pb) as a Row message and
// converts its fields into dest, using rows.columns[i] to convert field i.
//
// The message is consumed as soon as it has been unmarshalled. An error is
// returned when there is no pending message, when the payload cannot be
// unmarshalled, when the row or the collected column metadata has fewer
// entries than dest, or when a field cannot be converted.
func processRow(rows *mysqlXRows, dest []driver.Value) error {
	if rows == nil || rows.mc == nil || rows.mc.pb == nil {
		return fmt.Errorf("processRow: no pending row message")
	}

	var err error

	myRow := new(Mysqlx_Resultset.Row)
	if err = proto.Unmarshal(rows.mc.pb.payload, myRow); err != nil {
		return fmt.Errorf("error unmarshalling Row: %v", err)
	}
	rows.mc.pb = nil // consume the message

	fields := myRow.GetField()
	debug.Msg("processRow: row has %d columns", len(fields))

	// Every dest entry needs both a field and its metadata; report a
	// mismatch as an error instead of panicking with an index out of range.
	if len(dest) > len(fields) || len(dest) > len(rows.columns) {
		return fmt.Errorf("processRow: %d values requested but row has %d fields and %d columns of metadata",
			len(dest), len(fields), len(rows.columns))
	}

	// copy over data converting each type to a dest type
	for i := range dest {
		if dest[i], err = convertColumnData(rows.columns[i], fields[i]); err != nil {
			return fmt.Errorf("processRow: failed to convert data for column %d: %v", i, err)
		}
	}

	return nil // no error
}
// Next reads the next row of data from the connection into dest and returns
// io.EOF once there are no more rows.
//
// If the column metadata has not been read yet it is collected first.
// While waiting for a row, NOTICE messages are processed and skipped, and
// RESULTSET_FETCH_DONE moves the state to queryStateWaitingExecuteOk, which
// is reported as io.EOF.
//
// Failures are returned to the caller rather than terminating the process:
// a nil receiver or connection, a failed read, a server ERROR message, an
// unexpected message type and an unexpected state all yield a non-nil,
// non-EOF error.
func (rows *mysqlXRows) Next(dest []driver.Value) error {
	debug.Msg("ENTER mysqlXrows.Next()")

	// safety checks
	if rows == nil {
		return fmt.Errorf("mysqlXRows.Next: rows == nil")
	}
	if rows.mc == nil {
		return fmt.Errorf("mysqlXRows.Next: rows.mc == nil")
	}
	debug.Msg("mysqlXrows.Next: entry state: %q", rows.state.String())

	// Finished? Don't continue
	if rows.state.Finished() {
		debug.Msg("EXIT mysqlXrows.Next(): rows.state.Finished() is true, returning io.EOF")
		return io.EOF
	}

	// Have we read the column data yet? If not read it.
	if rows.state == queryStateWaitingColumnMetaData {
		if err := rows.collectColumnMetaData(); err != nil {
			return err
		}
	}
	debug.Msg("PRELOOP mysqlXrows.Next() rows.state: %v, dest has %d elements", rows.state.String(), len(dest))

	for {
		debug.Msg("LOOP mysqlXrows.Next() state: %v", rows.state.String())

		switch rows.state {
		case queryStateWaitingRow:
			// handled below the switch
		case queryStateDone, queryStateWaitingExecuteOk:
			debug.Msg("mysqlXrows.Next() START queryStateWaitingExecuteOk")
			return io.EOF
		case queryStateError:
			// collectColumnMetaData can leave the query in the error state
			// while returning nil; surface that as an error.
			if rows.err != nil {
				return rows.err
			}
			return fmt.Errorf("mysqlXRows.Next: query is in state %v", rows.state.String())
		default:
			err := fmt.Errorf("mysqlXRows.Next: called in unexpected state: %v", rows.state.String())
			// Logged unconditionally: this points at a bug in the driver's
			// state handling, not at a problem with the caller's query.
			log.Print(err)
			return err
		}

		debug.Msg("mysqlXrows.Next() START queryStateWaitingRow")

		// pull in a message if needed
		if err := rows.readMsgIfNecessary(); err != nil {
			return err
		}

		// check if it's a Row message!
		switch msgType := Mysqlx.ServerMessages_Type(rows.mc.pb.msgType); msgType {
		case Mysqlx.ServerMessages_RESULTSET_ROW:
			debug.Msg("mysqlXrows.Next() process ROW")
			if err := processRow(rows, dest); err != nil {
				return err
			}
			debug.Msg("mysqlXrows.Next() END queryStateWaitingRow")
			return nil
		case Mysqlx.ServerMessages_NOTICE:
			debug.Msg("mysqlXrows.Next() process NOTICE")
			// NOTE(review): presumably processNotice consumes rows.mc.pb,
			// otherwise this loop would see the same notice again — confirm.
			rows.mc.processNotice("mysqlXRows.Next")
		case Mysqlx.ServerMessages_RESULTSET_FETCH_DONE:
			debug.Msg("mysqlXrows.Next() process RESULTSET_FETCH_DONE")
			rows.state = queryStateWaitingExecuteOk
			rows.mc.pb = nil // skip to the next message
		case Mysqlx.ServerMessages_ERROR:
			debug.Msg("mysqlXrows.Next() process ERROR")
			// Treat the message the same way Close and collectColumnMetaData
			// do, and make sure the caller does not mistake it for a row.
			// NOTE(review): if processErrorMsg exposes the server's error
			// details, return those instead of this generic error.
			rows.mc.processErrorMsg()
			rows.mc.pb = nil
			rows.state = queryStateError
			return fmt.Errorf("mysqlXRows.Next: server returned an error while fetching rows")
		default:
			err := fmt.Errorf("mysqlXRows.Next received unexpected message type: %s", printableMsgTypeIn(msgType))
			rows.state = queryStateError
			rows.mc.pb = nil
			// Logged unconditionally: the server sent something the driver
			// does not expect at this point of the exchange.
			log.Print(err)
			return err
		}

		debug.Msg("mysqlXrows.Next() END queryStateWaitingRow")
	}
}
// collectColumnMetaData reads messages until the column metadata of the
// result set is complete, appending each column to rows.columns.
//
// Expectation here is to receive one of
// - RESULTSET_COLUMN_META_DATA (expected, stored and consumed)
// - NOTICE (may happen, not expected; processed)
// - RESULTSET_ROW (expected, changes state to queryStateWaitingRow; the
//   message is left pending for Next)
// - RESULTSET_FETCH_DONE (result set without rows; handled like
//   RESULTSET_ROW so that Next sees the message and reports io.EOF)
// - SQL_STMT_EXECUTE_OK (statement without a result set; changes state to
//   queryStateDone)
// - ERROR (processed, changes state to queryStateError)
//
// Any other message type changes the state to queryStateError, discards the
// message and returns an error. Nothing is read when the state is already
// finished or already queryStateWaitingRow.
func (rows *mysqlXRows) collectColumnMetaData() error {
	if rows == nil {
		return fmt.Errorf("BUG: mysqlXRows.collectColumnMetaData: rows == nil")
	}
	debug.Msg("mysqlXRows.collectColumnMetaData: entry, rows.state: %q", rows.state.String())

	for !rows.state.Finished() && rows.state != queryStateWaitingRow {
		if err := rows.readMsgIfNecessary(); err != nil {
			// keep the cause so the caller can see why the read failed
			return fmt.Errorf("DEBUG: mysqlXRows.collectColumnMetaData: failed to read data if necessary: %v", err)
		}

		msgType := Mysqlx.ServerMessages_Type(rows.mc.pb.msgType)
		switch msgType {
		case Mysqlx.ServerMessages_RESULTSET_COLUMN_META_DATA:
			if err := rows.addColumnMetaData(); err != nil {
				return fmt.Errorf("DEBUG: mysqlXRows.collectColumnMetaData: failed to addColumnMetaData: %v", err)
			}
		case Mysqlx.ServerMessages_RESULTSET_ROW, Mysqlx.ServerMessages_RESULTSET_FETCH_DONE:
			// The metadata is complete. The message is deliberately not
			// consumed: Next handles both types in queryStateWaitingRow.
			rows.state = queryStateWaitingRow
			debug.Msg("mysqlXRows.collectColumnMetaData: got %s: change state to %q",
				printableMsgTypeIn(msgType), rows.state.String())
		case Mysqlx.ServerMessages_SQL_STMT_EXECUTE_OK:
			// The statement produced no result set at all; nothing follows.
			rows.state = queryStateDone
			rows.mc.pb = nil
			debug.Msg("mysqlXRows.collectColumnMetaData: got SQL_STMT_EXECUTE_OK: change state to %q", rows.state.String())
			return nil
		case Mysqlx.ServerMessages_NOTICE:
			// don't really expect a notice but process it
			debug.Msg("mysqlXRows.collectColumnMetaData: got NOTICE: processing it")
			rows.mc.processNotice("mysqlxRows.collectColumnMetaData")
		case Mysqlx.ServerMessages_ERROR:
			debug.Msg("mysqlXRows.collectColumnMetaData: got ERROR: process it and change state to queryStateError")
			rows.mc.processErrorMsg()
			rows.state = queryStateError
			// Return explicitly rather than relying on the loop condition
			// to notice the error state.
			return nil
		default:
			e := fmt.Errorf("mysqlXRows.collectColumnMetaData: received unexpected message type: %s",
				printableMsgTypeIn(msgType))
			rows.state = queryStateError
			rows.mc.pb = nil
			return e
		}
	}

	return nil
}