feat: monitor SingleStore pipelines

kakao-levin-kitty committed Nov 27, 2024
1 parent 39d8fdd commit 0888921

Showing 10 changed files with 169 additions and 58 deletions.
5 changes: 3 additions & 2 deletions collector/active_transaction.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "github.com/jmoiron/sqlx"
     "github.com/prometheus/client_golang/prometheus"
     "singlestore_exporter/log"
@@ -66,13 +67,13 @@ func (s *ScrapeActiveTransactions) Help() string {
     return "Collect active transactions"
 }
 
-func (s *ScrapeActiveTransactions) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeActiveTransactions) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     if db == nil {
         return
     }
 
     views := make([]ActiveDistributedTransactionsView, 0)
-    if err := db.Select(&views, infoSchemaActiveDistributedTransactionsViewExistsQuery); err != nil {
+    if err := db.SelectContext(ctx, &views, infoSchemaActiveDistributedTransactionsViewExistsQuery); err != nil {
         log.ErrorLogger.Errorf("checking existence view query failed: query=%s error=%v", infoSchemaActiveDistributedTransactionsViewExistsQuery, err)
     } else if len(views) == 0 {
         return
5 changes: 3 additions & 2 deletions collector/cached_blobs.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "singlestore_exporter/log"
 
     "github.com/jmoiron/sqlx"
@@ -47,13 +48,13 @@ func (s *ScrapeCachedBlobs) Help() string {
     return "Collect metrics from information_schema.MV_CACHED_BLOBS"
 }
 
-func (s *ScrapeCachedBlobs) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeCachedBlobs) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     if db == nil {
         return
     }
 
     rows := make([]CachedBlobs, 0)
-    if err := db.Select(&rows, infoSchemaCachedBlobQuery); err != nil {
+    if err := db.SelectContext(ctx, &rows, infoSchemaCachedBlobQuery); err != nil {
         log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", infoSchemaCachedBlobQuery, err)
         return
     }
32 changes: 19 additions & 13 deletions collector/data_disk_usage.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "encoding/json"
     "github.com/jmoiron/sqlx"
     "github.com/prometheus/client_golang/prometheus"
@@ -48,45 +49,50 @@
 )
 
 func ScrapeDataDiskUsages() {
-    dataDiskUsagesMu.Lock()
-    defer dataDiskUsagesMu.Unlock()
+    var totalRows []DataDiskUsage
+    var err error
+
+    defer func() {
+        dataDiskUsagesMu.Lock()
+        defer dataDiskUsagesMu.Unlock()
+
+        if err != nil {
+            dataDiskUsages = nil
+        } else {
+            dataDiskUsages = totalRows
+        }
+    }()
 
     // get memsql nodes first
-    out, err := exec.Command("/usr/bin/memsqlctl", "list-nodes", "--json").Output()
+    var out []byte
+    out, err = exec.Command("/usr/bin/memsqlctl", "list-nodes", "--json").Output()
     if err != nil {
         log.ErrorLogger.Errorf("scraping command failed: command='memsqlctl list-nodes --json' out=%s error=%v", string(out), err)
-        dataDiskUsages = nil
         return
     }
 
     var memsqlNodes MemsqlNodes
-    if err := json.Unmarshal(out, &memsqlNodes); err != nil {
+    if err = json.Unmarshal(out, &memsqlNodes); err != nil {
         log.ErrorLogger.Errorf("unmarshal output failed: command='memsqlctl list-nodes --json' out=%s error=%v", string(out), err)
-        dataDiskUsages = nil
         return
     }
 
     // get data disk usage per node
-    totalRows := make([]DataDiskUsage, 0)
     for _, node := range memsqlNodes.Nodes {
         out, err = exec.Command("/usr/bin/memsqlctl", "query", "--memsql-id", node.MemsqlId, "--sql", infoSchemaDataDiskUsageQuery, "--json").Output()
         if err != nil {
             log.ErrorLogger.Errorf("scraping command failed: command='memsqlctl query --sql '%s' --json' out=%s error=%v", infoSchemaDataDiskUsageQuery, string(out), err)
-            dataDiskUsages = nil
             return
         }
 
         var rows DataDiskUsageRows
-        if err := json.Unmarshal(out, &rows); err != nil {
+        if err = json.Unmarshal(out, &rows); err != nil {
             log.ErrorLogger.Errorf("unmarshal output failed: command='memsqlctl query --sql '%s' --json' out=%s error=%v", infoSchemaDataDiskUsageQuery, string(out), err)
-            dataDiskUsages = nil
             return
         }
 
         totalRows = append(totalRows, rows.Rows...)
     }
-
-    dataDiskUsages = totalRows
 }
 
 const (
@@ -138,7 +144,7 @@ func (s *ScrapeDataDiskUsage) Help() string {
     return "Collect data disk usage by memsqlctl"
 }
 
-func (s *ScrapeDataDiskUsage) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeDataDiskUsage) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     if dataDiskUsages == nil {
         return
     }
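The ScrapeDataDiskUsages refactor above moves the slow memsqlctl invocations out of the critical section: results are gathered into a local slice, and a single deferred, mutex-guarded assignment publishes them (or nil on any error). A minimal standalone sketch of that pattern, with illustrative names not taken from the repository:

package main

import (
    "fmt"
    "sync"
)

var (
    mu     sync.Mutex
    shared []int // read by other goroutines under mu
)

// refresh does the slow work without holding the lock, then publishes
// the result (or nil on failure) in one short deferred critical section.
func refresh() {
    var rows []int
    var err error

    defer func() {
        mu.Lock()
        defer mu.Unlock()
        if err != nil {
            shared = nil // a failed refresh clears stale data
        } else {
            shared = rows
        }
    }()

    rows, err = slowFetch() // slow work runs outside the lock
}

func slowFetch() ([]int, error) { return []int{1, 2, 3}, nil }

func main() {
    refresh()
    mu.Lock()
    fmt.Println(shared)
    mu.Unlock()
}

Holding the lock only for the final swap means readers of the shared slice never block on the external commands.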
35 changes: 22 additions & 13 deletions collector/exporter.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "fmt"
     "sync"
     "time"
@@ -33,44 +34,52 @@
 )
 
 type Exporter struct {
+    ctx      context.Context
     version  string
     dsn      string
     scrapers []Scraper
 }
 
+type ExporterFlags struct {
+    FlagSlowQuery                      bool
+    FlagSlowQueryThreshold             int
+    FlagReplicationStatus              bool
+    FlagDataDiskUsage                  bool
+    FlagActiveTransactionPtr           bool
+    FlagSlowQueryExceptionHosts        []string
+    FlagSlowQueryExceptionInfoPatterns []string
+}
+
 func New(
+    ctx context.Context,
     version string,
     dsn string,
-    flagSlowQuery bool,
-    flagSlowQueryThreshold int,
-    flagReplicationStatus bool,
-    flagDataDiskUsage bool,
-    flagActiveTransactionPtr bool,
-    slowQueryExceptionHosts []string,
-    slowQueryExceptionInfoPatterns []string,
+    flags *ExporterFlags,
 ) *Exporter {
     scrapers := []Scraper{
         &ScrapeNodes{},
     }
     if dsn != "" {
         scrapers = append(scrapers,
             &ScrapeCachedBlobs{},
+            &ScrapePipeline{},
         )
-        if flagSlowQuery {
-            scrapers = append(scrapers, NewScrapeProcessList(flagSlowQueryThreshold, slowQueryExceptionHosts, slowQueryExceptionInfoPatterns))
+        if flags.FlagSlowQuery {
+            scrapers = append(scrapers, NewScrapeProcessList(flags.FlagSlowQueryThreshold, flags.FlagSlowQueryExceptionHosts, flags.FlagSlowQueryExceptionInfoPatterns))
         }
-        if flagReplicationStatus {
+        if flags.FlagReplicationStatus {
            scrapers = append(scrapers, &ScrapeReplicationStatus{})
         }
-        if flagActiveTransactionPtr {
+        if flags.FlagActiveTransactionPtr {
            scrapers = append(scrapers, &ScrapeActiveTransactions{})
         }
     }
-    if flagDataDiskUsage {
+    if flags.FlagDataDiskUsage {
         scrapers = append(scrapers, &ScrapeDataDiskUsage{})
     }
 
     return &Exporter{
+        ctx,
         version,
         dsn,
         scrapers,
@@ -114,7 +123,7 @@ func (e *Exporter) scrape(dsn string, ch chan<- prometheus.Metric) {
         wg.Add(1)
         go func(scraper Scraper) {
             defer wg.Done()
-            scraper.Scrape(db, ch)
+            scraper.Scrape(e.ctx, db, ch)
         }(scraper)
     }
 }
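New now takes a context and a single *ExporterFlags value in place of seven positional arguments. The matching call-site change is in a file not rendered above (presumably main.go); a hedged sketch of how it might look, with every value illustrative rather than taken from the commit:

package main

import (
    "context"

    "github.com/prometheus/client_golang/prometheus"

    "singlestore_exporter/collector"
)

func main() {
    // Hypothetical values: in the real main.go these come from CLI flags.
    version := "dev"
    dsn := "user:pass@tcp(localhost:3306)/"

    exporter := collector.New(
        context.Background(),
        version,
        dsn,
        &collector.ExporterFlags{
            FlagSlowQuery:            true,
            FlagSlowQueryThreshold:   5, // illustrative threshold
            FlagReplicationStatus:    true,
            FlagDataDiskUsage:        false,
            FlagActiveTransactionPtr: true,
        },
    )

    // Assumes Exporter implements prometheus.Collector, as is
    // conventional for *_exporter projects.
    prometheus.MustRegister(exporter)
}

Grouping the flags in one struct keeps the constructor signature stable as future collectors add options.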
5 changes: 3 additions & 2 deletions collector/nodes.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "encoding/json"
     "os/exec"
     "strconv"
@@ -46,7 +47,7 @@ func (s *ScrapeNodes) Help() string {
     return "Collect node state by memsqlctl"
 }
 
-func (s *ScrapeNodes) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeNodes) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     errorMetric := func() {
         ch <- prometheus.MustNewConstMetric(
             nodeStateDesc, prometheus.GaugeValue, float64(0),
@@ -60,7 +61,7 @@ func (s *ScrapeNodes) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
         )
     }
 
-    out, err := exec.Command("/usr/bin/memsqlctl", "list-nodes", "--json", "--yes").Output()
+    out, err := exec.CommandContext(ctx, "/usr/bin/memsqlctl", "list-nodes", "--json", "--yes").Output()
     if err != nil {
         log.ErrorLogger.Errorf("scraping command failed: command='/usr/bin/memsqlctl list-nodes --json --yes' error=%v", err)
         errorMetric()
68 changes: 68 additions & 0 deletions collector/pipeline.go
@@ -0,0 +1,68 @@
+package collector
+
+import (
+    "context"
+    "github.com/jmoiron/sqlx"
+    "github.com/prometheus/client_golang/prometheus"
+    "singlestore_exporter/log"
+)
+
+type PipelineState struct {
+    DatabaseName string `db:"DATABASE_NAME"`
+    PipelineName string `db:"PIPELINE_NAME"`
+    State        string `db:"STATE"`
+    ErrorCount   int    `db:"ERROR_COUNT"`
+}
+
+const (
+    pipline = "pipeline"
+
+    infoSchemaPipelineStateQuery = `SELECT p.DATABASE_NAME, p.PIPELINE_NAME, p.STATE
+FROM information_schema.PIPELINES p`
+)
+
+var (
+    pipelineStateDesc = prometheus.NewDesc(
+        prometheus.BuildFQName(namespace, pipline, "state"),
+        "The state of the pipeline. Running = 0, Stopped = 1, Error = 2",
+        []string{"database", "pipeline"},
+        nil,
+    )
+
+    pipelineStateMap = map[string]float64{
+        "Running": 0,
+        "Stopped": 1,
+        "Error":   2,
+    }
+)
+
+type ScrapePipeline struct{}
+
+func (s *ScrapePipeline) Help() string {
+    return "Collect metrics from information_schema.PIPELINES"
+}
+
+func (s *ScrapePipeline) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
+    if db == nil {
+        return
+    }
+
+    rows := make([]PipelineState, 0)
+    if err := db.SelectContext(ctx, &rows, infoSchemaPipelineStateQuery); err != nil {
+        log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", infoSchemaPipelineStateQuery, err)
+        return
+    }
+
+    for _, row := range rows {
+        if state, exists := pipelineStateMap[row.State]; exists {
+            ch <- prometheus.MustNewConstMetric(
+                pipelineStateDesc, prometheus.GaugeValue, state,
+                row.DatabaseName,
+                row.PipelineName,
+            )
+        } else {
+            log.ErrorLogger.Errorf("unknown pipeline state: %s", row.State)
+            continue
+        }
+    }
+}
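For reference, a standalone sketch of what the new collector exposes: one state gauge per pipeline, valued through pipelineStateMap. The singlestore_pipeline_state name assumes the package's namespace constant is "singlestore", and the label values are invented:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    dto "github.com/prometheus/client_model/go"
)

func main() {
    // Mirrors pipelineStateDesc; the "singlestore" namespace is an assumption.
    desc := prometheus.NewDesc(
        "singlestore_pipeline_state",
        "The state of the pipeline. Running = 0, Stopped = 1, Error = 2",
        []string{"database", "pipeline"},
        nil,
    )

    stateMap := map[string]float64{"Running": 0, "Stopped": 1, "Error": 2}

    for _, state := range []string{"Running", "Stopped", "Error"} {
        m := prometheus.MustNewConstMetric(
            desc, prometheus.GaugeValue, stateMap[state],
            "analytics", "events_kafka", // made-up label values
        )
        var out dto.Metric
        if err := m.Write(&out); err != nil {
            panic(err)
        }
        fmt.Printf("%-7s -> gauge %v\n", state, out.GetGauge().GetValue())
    }
}

An alert on failed pipelines then reduces to a PromQL expression along the lines of singlestore_pipeline_state == 2.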
5 changes: 3 additions & 2 deletions collector/processlist.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "database/sql"
     "time"
 
@@ -89,13 +90,13 @@ func (s *ScrapeProcessList) Help() string {
     return "Collect metrics from information_schema.PROCESSLIST"
 }
 
-func (s *ScrapeProcessList) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeProcessList) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     if db == nil {
         return
     }
 
     processList := make([]Process, 0)
-    if err := db.Select(&processList, s.Query); err != nil {
+    if err := db.SelectContext(ctx, &processList, s.Query); err != nil {
         log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", s.Query, err)
         return
     }
5 changes: 3 additions & 2 deletions collector/replication_status.go
@@ -1,6 +1,7 @@
 package collector
 
 import (
+    "context"
     "database/sql"
     "strconv"
 
@@ -50,13 +51,13 @@ func (s *ScrapeReplicationStatus) Help() string {
     return "Collect metrics from information_schema.MV_REPLICATION_STATUS"
 }
 
-func (s *ScrapeReplicationStatus) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
+func (s *ScrapeReplicationStatus) Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric) {
     if db == nil {
         return
     }
 
     rows := make([]ReplicationStatus, 0)
-    if err := db.Select(&rows, infoSchemaReplicationStatusQuery); err != nil {
+    if err := db.SelectContext(ctx, &rows, infoSchemaReplicationStatusQuery); err != nil {
         log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", infoSchemaReplicationStatusQuery, err)
         return
     }
3 changes: 2 additions & 1 deletion collector/scraper.go
@@ -1,10 +1,11 @@
 package collector
 
 import (
+    "context"
     "github.com/jmoiron/sqlx"
     "github.com/prometheus/client_golang/prometheus"
 )
 
 type Scraper interface {
-    Scrape(db *sqlx.DB, ch chan<- prometheus.Metric)
+    Scrape(ctx context.Context, db *sqlx.DB, ch chan<- prometheus.Metric)
 }