-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapi_stream.go
123 lines (96 loc) · 2.69 KB
/
api_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package raptor
import (
"errors"
"fmt"
"strconv"
"github.com/raptorbox/raptor-sdk-go/models"
)
//CreateStream instantiate a new API client
func CreateStream(r *Raptor) *Stream {
return &Stream{
Raptor: r,
}
}
//Stream API client
type Stream struct {
Raptor *Raptor
}
//GetConfig return the configuration
func (s *Stream) GetConfig() models.Config {
return s.Raptor.GetConfig()
}
//GetClient return a client instance
func (s *Stream) GetClient() models.Client {
return s.Raptor.GetClient()
}
//NewRecord return a new record instance
func (s *Stream) NewRecord(s1 *models.Stream) *models.Record {
return models.NewRecord(s1)
}
//Pull stored data for a stream
func (s *Stream) Pull(stream *models.Stream, offset int, size int) ([]models.Record, error) {
pager := "?" + strconv.Itoa(offset) + "&size=" + strconv.Itoa(size)
raw, err := s.GetClient().Get(fmt.Sprintf(STREAM_LIST, stream.GetDevice().ID, stream.Name)+pager, nil)
if err != nil {
return nil, err
}
res := make([]models.Record, 0)
err = s.GetClient().FromJSON(raw, res)
if err != nil {
return nil, err
}
for i := 0; i < len(res); i++ {
res[i].SetStream(stream)
}
return res, nil
}
//LastUpdate fetch the last record stored
func (s *Stream) LastUpdate(stream *models.Stream) (*models.Record, error) {
if stream == nil {
return nil, errors.New("Stream is empty")
}
if stream.GetDevice() == nil {
return nil, errors.New("Stream device is empty")
}
raw, err := s.GetClient().Get(fmt.Sprintf(STREAM_LAST_UPDATE, stream.GetDevice().ID, stream.Name), nil)
if err != nil {
return nil, err
}
res := &models.Record{}
err = s.GetClient().FromJSON(raw, res)
if err != nil {
return nil, err
}
res.SetStream(stream)
return res, nil
}
//Search stored data for a stream
func (s *Stream) Search(stream *models.Stream, q *models.DataQuery) ([]models.Record, error) {
raw, err := s.GetClient().Post(fmt.Sprintf(STREAM_LIST, stream.GetDevice().ID, stream.Name), q, nil)
if err != nil {
return nil, err
}
res := make([]models.Record, 0)
err = s.GetClient().FromJSON(raw, &res)
if err != nil {
return nil, err
}
for i := 0; i < len(res); i++ {
res[i].SetStream(stream)
}
return res, nil
}
//Push data to the backend
func (s *Stream) Push(r *models.Record) error {
log.Debugf("%v", r.GetStream().Name)
stream := r.GetStream()
if stream == nil {
return errors.New("record stream is required, use Stream.CreateRecord")
}
_, err := s.GetClient().Put(fmt.Sprintf(STREAM_PUSH, stream.GetDevice().ID, stream.Name), r, nil)
return err
}
//Delete drop the data of a stream
func (s *Stream) Delete(stream *models.Stream) error {
return s.GetClient().Delete(fmt.Sprintf(STREAM_PUSH, stream.GetDevice().ID, stream.Name), nil)
}