Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor collectors to not depend of kingpin to be configured #774

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@ const (
timeoutParam = `lock_wait_timeout=%d`
)

// Tunable flags.
var (
exporterLockTimeout = kingpin.Flag(
// Config holds configuration options for the exporter.
type Config struct {
LockTimeout int
SlowLogFilter bool
}

// RegisterFlags adds flags to configure the exporter.
func (c *Config) RegisterFlags(application *kingpin.Application) {
application.Flag(
"exporter.lock_wait_timeout",
"Set a lock_wait_timeout (in seconds) on the connection to avoid long metadata locking.",
).Default("2").Int()
slowLogFilter = kingpin.Flag(
).Default("2").IntVar(&c.LockTimeout)
application.Flag(
"exporter.log_slow_filter",
"Add a log_slow_filter to avoid slow query logging of scrapes. NOTE: Not supported by Oracle MySQL.",
).Default("false").Bool()
)
).Default("false").BoolVar(&c.SlowLogFilter)
}

// metric definition
var (
Expand Down Expand Up @@ -86,11 +92,11 @@ type Exporter struct {
}

// New returns a new MySQL exporter for the provided DSN.
func New(ctx context.Context, dsn string, scrapers []Scraper, logger *slog.Logger) *Exporter {
func New(ctx context.Context, dsn string, scrapers []Scraper, logger *slog.Logger, cfg Config) *Exporter {
// Setup extra params for the DSN, default to having a lock timeout.
dsnParams := []string{fmt.Sprintf(timeoutParam, *exporterLockTimeout)}
dsnParams := []string{fmt.Sprintf(timeoutParam, cfg.LockTimeout)}

if *slowLogFilter {
if cfg.SlowLogFilter {
dsnParams = append(dsnParams, sessionSettingsParam)
}

Expand Down
10 changes: 10 additions & 0 deletions collector/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"testing"

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
Expand All @@ -30,13 +31,22 @@ func TestExporter(t *testing.T) {
t.Skip("-short is passed, skipping test")
}

var exporterConfig Config
kingpinApp := kingpin.New("TestExporter", "")
exporterConfig.RegisterFlags(kingpinApp)
_, err := kingpinApp.Parse([]string{})
if err != nil {
t.Fatal(err)
}

exporter := New(
context.Background(),
dsn,
[]Scraper{
ScrapeGlobalStatus{},
},
promslog.NewNopLogger(),
exporterConfig,
)

convey.Convey("Metrics describing", t, func() {
Expand Down
45 changes: 25 additions & 20 deletions collector/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,6 @@ const (
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
)

var (
collectHeartbeatDatabase = kingpin.Flag(
"collect.heartbeat.database",
"Database from where to collect heartbeat data",
).Default("heartbeat").String()
collectHeartbeatTable = kingpin.Flag(
"collect.heartbeat.table",
"Table from where to collect heartbeat data",
).Default("heartbeat").String()
collectHeartbeatUtc = kingpin.Flag(
"collect.heartbeat.utc",
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
).Bool()
)

// Metric descriptors.
var (
HeartbeatStoredDesc = prometheus.NewDesc(
Expand All @@ -74,7 +59,11 @@ var (
// server_id int unsigned NOT NULL PRIMARY KEY,
//
// );
type ScrapeHeartbeat struct{}
type ScrapeHeartbeat struct {
Database string
Table string
UTC bool
}

// Name of the Scraper. Should be unique.
func (ScrapeHeartbeat) Name() string {
Expand All @@ -91,18 +80,34 @@ func (ScrapeHeartbeat) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeHeartbeat) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.heartbeat.database",
"Database from where to collect heartbeat data",
).Default("heartbeat").StringVar(&s.Database)
application.Flag(
"collect.heartbeat.table",
"Table from where to collect heartbeat data",
).Default("heartbeat").StringVar(&s.Table)
application.Flag(
"collect.heartbeat.utc",
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
).BoolVar(&s.UTC)
}

// nowExpr returns a current timestamp expression.
func nowExpr() string {
if *collectHeartbeatUtc {
func (s ScrapeHeartbeat) nowExpr() string {
if s.UTC {
return "UTC_TIMESTAMP(6)"
}
return "NOW(6)"
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
func (s ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
db := instance.getDB()
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
query := fmt.Sprintf(heartbeatQuery, s.nowExpr(), s.Database, s.Table)
heartbeatRows, err := db.QueryContext(ctx, query)
if err != nil {
return err
Expand Down
9 changes: 7 additions & 2 deletions collector/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
func TestScrapeHeartbeat(t *testing.T) {
for _, tt := range ScrapeHeartbeatTestCases {
t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
_, err := kingpin.CommandLine.Parse(tt.Args)
scraper := ScrapeHeartbeat{}

app := kingpin.New("TestScrapeHeartbeat", "")
scraper.RegisterFlags(app)

_, err := app.Parse(tt.Args)
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,7 +78,7 @@ func TestScrapeHeartbeat(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
if err = scraper.Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
47 changes: 26 additions & 21 deletions collector/info_schema_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,6 @@ const infoSchemaProcesslistQuery = `
GROUP BY user, host, command, state
`

// Tunable flags.
var (
processlistMinTime = kingpin.Flag(
"collect.info_schema.processlist.min_time",
"Minimum time a thread must be in each state to be counted",
).Default("0").Int()
processesByUserFlag = kingpin.Flag(
"collect.info_schema.processlist.processes_by_user",
"Enable collecting the number of processes by user",
).Default("true").Bool()
processesByHostFlag = kingpin.Flag(
"collect.info_schema.processlist.processes_by_host",
"Enable collecting the number of processes by host",
).Default("true").Bool()
)

// Metric descriptors.
var (
processlistCountDesc = prometheus.NewDesc(
Expand All @@ -78,7 +62,11 @@ var (
)

// ScrapeProcesslist collects from `information_schema.processlist`.
type ScrapeProcesslist struct{}
type ScrapeProcesslist struct {
ProcessListMinTime int
ProcessesByUserFlag bool
ProcessesByHostFlag bool
}

// Name of the Scraper. Should be unique.
func (ScrapeProcesslist) Name() string {
Expand All @@ -95,11 +83,27 @@ func (ScrapeProcesslist) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeProcesslist) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.info_schema.processlist.min_time",
"Minimum time a thread must be in each state to be counted",
).Default("0").IntVar(&s.ProcessListMinTime)
application.Flag(
"collect.info_schema.processlist.processes_by_user",
"Enable collecting the number of processes by user",
).Default("true").BoolVar(&s.ProcessesByUserFlag)
application.Flag(
"collect.info_schema.processlist.processes_by_host",
"Enable collecting the number of processes by host",
).Default("true").BoolVar(&s.ProcessesByHostFlag)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
func (s ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
processQuery := fmt.Sprintf(
infoSchemaProcesslistQuery,
*processlistMinTime,
s.ProcessListMinTime,
)
db := instance.getDB()
processlistRows, err := db.QueryContext(ctx, processQuery)
Expand Down Expand Up @@ -162,12 +166,13 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan
}
}

if *processesByHostFlag {
if s.ProcessesByHostFlag {
for _, host := range sortedMapKeys(stateHostCounts) {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
}
}
if *processesByUserFlag {

if s.ProcessesByUserFlag {
for _, user := range sortedMapKeys(stateUserCounts) {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
}
Expand Down
8 changes: 6 additions & 2 deletions collector/info_schema_processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
)

func TestScrapeProcesslist(t *testing.T) {
_, err := kingpin.CommandLine.Parse([]string{
scraper := ScrapeProcesslist{}
app := kingpin.New("TestScrapeProcesslist", "")
scraper.RegisterFlags(app)

_, err := app.Parse([]string{
"--collect.info_schema.processlist.processes_by_user",
"--collect.info_schema.processlist.processes_by_host",
})
Expand Down Expand Up @@ -57,7 +61,7 @@ func TestScrapeProcesslist(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeProcesslist{}).Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
if err = scraper.Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
26 changes: 14 additions & 12 deletions collector/info_schema_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ const (
`
)

// Tunable flags.
var (
tableSchemaDatabases = kingpin.Flag(
"collect.info_schema.tables.databases",
"The list of databases to collect table stats for, or '*' for all",
).Default("*").String()
)

// Metric descriptors.
var (
infoSchemaTablesVersionDesc = prometheus.NewDesc(
Expand All @@ -78,7 +70,9 @@ var (
)

// ScrapeTableSchema collects from `information_schema.tables`.
type ScrapeTableSchema struct{}
type ScrapeTableSchema struct {
Databases string
}

// Name of the Scraper. Should be unique.
func (ScrapeTableSchema) Name() string {
Expand All @@ -95,11 +89,19 @@ func (ScrapeTableSchema) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeTableSchema) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.info_schema.tables.databases",
"The list of databases to collect table stats for, or '*' for all",
).Default("*").StringVar(&s.Databases)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
func (s ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
var dbList []string
db := instance.getDB()
if *tableSchemaDatabases == "*" {
if s.Databases == "*" {
dbListRows, err := db.QueryContext(ctx, dbListQuery)
if err != nil {
return err
Expand All @@ -117,7 +119,7 @@ func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan
dbList = append(dbList, database)
}
} else {
dbList = strings.Split(*tableSchemaDatabases, ",")
dbList = strings.Split(s.Databases, ",")
}

for _, database := range dbList {
Expand Down
24 changes: 13 additions & 11 deletions collector/mysql_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ const mysqlUserQuery = `
FROM mysql.user
`

// Tunable flags.
var (
userPrivilegesFlag = kingpin.Flag(
"collect.mysql.user.privileges",
"Enable collecting user privileges from mysql.user",
).Default("false").Bool()
)

var (
labelNames = []string{"mysql_user", "hostmask"}
)
Expand All @@ -102,7 +94,9 @@ var (
)

// ScrapeUser collects from `information_schema.processlist`.
type ScrapeUser struct{}
type ScrapeUser struct {
Privileges bool
}

// Name of the Scraper. Should be unique.
func (ScrapeUser) Name() string {
Expand All @@ -119,8 +113,16 @@ func (ScrapeUser) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeUser) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.mysql.user.privileges",
"Enable collecting user privileges from mysql.user",
).Default("false").BoolVar(&s.Privileges)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
func (s ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
db := instance.getDB()
var (
userRows *sql.Rows
Expand Down Expand Up @@ -214,7 +216,7 @@ func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prom
return err
}

if *userPrivilegesFlag {
if s.Privileges {
userCols, err := userRows.Columns()
if err != nil {
return err
Expand Down
Loading