-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfluxdb3.go
66 lines (55 loc) · 1.82 KB
/
influxdb3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main
import (
"context"
"fmt"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/hashicorp/go-retryablehttp"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)
// Measurement is one data point destined for InfluxDB. The `lp` struct tags
// describe how each field maps onto the line protocol: the measurement name,
// the timestamp, the field set and the tag set.
type Measurement struct {
	Measurement string    `lp:"measurement"`
	Timestamp   time.Time `lp:"timestamp"`

	// Field set: the measured values.
	Speed       float64 `lp:"field,speed"`
	Temperature float32 `lp:"field,temperature"`

	// Tag set: metadata describing where the values come from.
	Site string `lp:"tag,site"`
	Line string `lp:"tag,line"`
}
// influxdb is a thin wrapper around an InfluxDB 3 client.
type influxdb struct {
	// client is stored by value, not as a pointer.
	client influxdb3.Client
}
// NewClient creates a new InfluxDB client for the host, token and database
// given in cfg.
//
// HTTP traffic goes through a go-retryablehttp client whose RetryMax is set
// to 10. Write options start from influxdb3.DefaultWriteOptions with the
// precision overridden to milliseconds.
//
// ctx is not used by this function.
//
// It returns the wrapped client, or an error wrapping the one from
// influxdb3.New when the client cannot be created.
func NewClient(ctx context.Context, cfg InfluxdbConfig) (*influxdb, error) {
	retryClient := retryablehttp.NewClient()
	retryClient.RetryMax = 10

	// Work on a copy of the defaults so the package-level value is untouched,
	// and override only the precision.
	options := influxdb3.DefaultWriteOptions
	options.Precision = lineprotocol.Millisecond

	client, err := influxdb3.New(influxdb3.ClientConfig{
		Host:       cfg.Host,
		Token:      cfg.Token,
		Database:   cfg.Database,
		HTTPClient: retryClient.StandardClient(),
		// Without this the options built above were silently discarded and
		// the millisecond precision never took effect.
		WriteOptions: &options,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create influx client: %w", err)
	}

	return &influxdb{client: *client}, nil
}
// BatchWrite writes a batch of measurements to InfluxDB3 with a single
// WriteData call.
//
// ctx is passed on to the write call rather than being replaced by a
// background context. A progress line is printed to standard output before
// the write and another after it succeeds.
//
// It returns nil on success, or an error wrapping the one from WriteData.
func (i *influxdb) BatchWrite(ctx context.Context, measurements []*Measurement) error {
	fmt.Printf("ingesting %d measurement(s) to InfluxDb3\n", len(measurements))

	// WriteData takes a []any, so the typed slice is converted element by
	// element. The loop variables are named so they do not shadow receiver i.
	batch := make([]any, len(measurements))
	for idx, m := range measurements {
		batch[idx] = m
	}

	if err := i.client.WriteData(ctx, batch); err != nil {
		return fmt.Errorf("failed to write measurement(s) to InfluxDB3: %w", err)
	}

	fmt.Printf("wrote %d measurement(s)\n", len(batch))
	return nil
}