Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scrape admin requests #52

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 182 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"bufio"
"bytes"
"flag"
"fmt"
"io"
"math"
"net"
"net/http"
Expand All @@ -26,10 +28,11 @@ const (
)

type Exporter struct {
server string
timeout time.Duration
server_stats bool
logger log.Logger
server string
timeout time.Duration
server_stats bool
admin_requests bool
logger log.Logger

up *prometheus.Desc
startTime *prometheus.Desc
Expand Down Expand Up @@ -84,15 +87,21 @@ type Exporter struct {
serverMemcachedTimeout *prometheus.Desc
serverMemcachedSoftTKO *prometheus.Desc
serverMemcachedHardTKO *prometheus.Desc
adminRequestVersion *prometheus.Desc
adminRequestConfigAge *prometheus.Desc
adminRequestConfigFile *prometheus.Desc
adminRequestHostId *prometheus.Desc
adminRequestConfigMD5Digest *prometheus.Desc
}

// NewExporter returns an initialized exporter.
func NewExporter(server string, timeout time.Duration, server_stats bool, logger log.Logger) *Exporter {
func NewExporter(server string, timeout time.Duration, server_stats bool, admin_requests bool, logger log.Logger) *Exporter {
return &Exporter{
server: server,
timeout: timeout,
server_stats: server_stats,
logger: logger,
server: server,
timeout: timeout,
server_stats: server_stats,
admin_requests: admin_requests,
logger: logger,

up: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "up"),
Expand Down Expand Up @@ -413,6 +422,36 @@ func NewExporter(server string, timeout time.Duration, server_stats bool, logger
[]string{"server"},
nil,
),
adminRequestVersion: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "admin_request_version"),
"Version string of the build (same string as returned by version).",
[]string{"version"},
nil,
),
adminRequestConfigAge: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "admin_request_config_age"),
"How long, in seconds, since last config reload.",
nil,
nil,
),
adminRequestConfigFile: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "admin_request_config_file"),
"Config file location (error if configured from string).",
[]string{"file"},
nil,
),
adminRequestHostId: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "admin_request_host_id"),
"Hostid of this mcrouter instance, an unsigned 32-bit integer in decimal.",
[]string{"host_id"},
nil,
),
adminRequestConfigMD5Digest: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "admin_request_config_md5_digest"),
"Current config's md5 hash. Note that this only specifies the main config file's md5 and ignores any additional tracked files.",
[]string{"hash"},
nil,
),
}
}

Expand Down Expand Up @@ -473,6 +512,14 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- e.serverMemcachedSoftTKO
ch <- e.serverMemcachedHardTKO
}

if e.admin_requests {
ch <- e.adminRequestVersion
ch <- e.adminRequestConfigAge
ch <- e.adminRequestConfigFile
ch <- e.adminRequestHostId
ch <- e.adminRequestConfigMD5Digest
}
}

// Collect fetches the statistics from the configured mcrouter server, and
Expand Down Expand Up @@ -632,6 +679,65 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.serverMemcachedHardTKO, prometheus.GaugeValue, e.parse(metrics, "hard_tko"), server)
}
}

if e.admin_requests {
version, err := getAdminRequest(conn, "__mcrouter__.version")
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect version", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
e.adminRequestVersion, prometheus.GaugeValue, 1, string(version))

configAgeStr, err := getAdminRequest(conn, "__mcrouter__.config_age")
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect config age", "err", err)
return
}

configAge, err := strconv.ParseFloat(string(configAgeStr), 64)
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect config age", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
e.adminRequestConfigAge, prometheus.GaugeValue, configAge)

configFile, err := getAdminRequest(conn, "__mcrouter__.config_file")
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect config file", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
e.adminRequestConfigFile, prometheus.GaugeValue, 1, string(configFile))

hostId, err := getAdminRequest(conn, "__mcrouter__.hostid")
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect host id", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
e.adminRequestHostId, prometheus.GaugeValue, 1, string(hostId))

config_md5_digest, err := getAdminRequest(conn, "__mcrouter__.config_md5_digest")
if err != nil {
ch <- prometheus.MustNewConstMetric(e.up, prometheus.GaugeValue, 0)
level.Error(e.logger).Log("msg", "Failed to collect admin config_md5_digest from mcrouter", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
e.adminRequestConfigMD5Digest, prometheus.GaugeValue, 1, string(config_md5_digest))
}
}

// Parse a string into a 64 bit float suitable for Prometheus
Expand Down Expand Up @@ -779,6 +885,71 @@ func getServerStats(conn net.Conn) (map[string]map[string]string, error) {
return m, nil
}

func getAdminRequest(conn net.Conn, request string) ([]byte, error) {
var value []byte
fmt.Fprintf(conn, "get "+request+"\r\n")
reader := bufio.NewReader(conn)

for {
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}

if line == "END\r\n" {
break
}

it := new(item)
size, err := scanGetResponseLine(line, it)
if err != nil {
return nil, err
}

it.Value = make([]byte, size+2)
_, err = io.ReadFull(reader, it.Value)
if err != nil {
it.Value = nil
return nil, err
}
if !bytes.HasSuffix(it.Value, []byte("\r\n")) {
it.Value = nil
return nil, fmt.Errorf("memcache: corrupt get result read")
}

value = it.Value[:size]
}

return value, nil
}

func scanGetResponseLine(line string, it *item) (size int, err error) {
pattern := "VALUE %s %d %d\r\n"
dest := []interface{}{&it.Key, &it.Flags, &size}
n, err := fmt.Sscanf(line, pattern, dest...)
if err != nil || n != len(dest) {
return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
}
return size, nil
}

type item struct {
// Key is the Item's key (250 bytes maximum).
Key string

// Value is the Item's value.
Value []byte

// Flags are server-opaque flags whose semantics are entirely
// up to the app.
Flags uint32

// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
Expiration int32
}

func main() {
var (
address = flag.String("mcrouter.address", "localhost:5000", "mcrouter server TCP address (tcp4/tcp6) or UNIX socket path")
Expand All @@ -787,6 +958,7 @@ func main() {
listenAddress = flag.String("web.listen-address", ":9442", "Address to listen on for web interface and telemetry.")
metricsPath = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.")
serverMetrics = flag.Bool("mcrouter.server_metrics", false, "Collect per-server metrics.")
adminRequests = flag.Bool("mcrouter.admin_requests", false, "Collect admin requests.")
logLevel = flag.String(promlogflag.LevelFlagName, "info", promlogflag.LevelFlagHelp)
logFormat = flag.String(promlogflag.FormatFlagName, "logfmt", promlogflag.FormatFlagHelp)
)
Expand Down Expand Up @@ -816,7 +988,7 @@ func main() {
level.Info(logger).Log("msg", "Starting mcrouter_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())

prometheus.MustRegister(NewExporter(*address, *timeout, *serverMetrics, logger))
prometheus.MustRegister(NewExporter(*address, *timeout, *serverMetrics, *adminRequests, logger))
http.Handle(*metricsPath, promhttp.Handler())
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
//nolint:errcheck
Expand Down
122 changes: 122 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,36 @@ func handleRequestServerStats(conn net.Conn, full bool) {
conn.Close()
}

func handleAdminRequest(conn net.Conn) {
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err.Error())
}

var ret []byte

command := string(buf[:n])

switch command {
case "get __mcrouter__.version\r\n":
ret = []byte("VALUE __mcrouter__.version 0 15\r\n40.0.0 mcrouter\r\nEND\r\n")
case "get __mcrouter__.config_age\r\n":
ret = []byte("VALUE __mcrouter__.config_age 0 7\r\n1040469\r\nEND\r\n")
case "get __mcrouter__.config_file\r\n":
ret = []byte("VALUE __mcrouter__.config_file 0 32\r\n/opt/mcrouter/config/config.json\r\nEND\r\n")
case "get __mcrouter__.hostid\r\n":
ret = []byte("VALUE __mcrouter__.hostid 0 10\r\n4079863250\r\nEND\r\n")
case "get __mcrouter__.config_md5_digest\r\n":
ret = []byte("VALUE __mcrouter__.config_md5_digest 0 32\r\n2aa22ce671e9fdf6a7bb762f9a6cb0cc\r\nEND\r\n")
default:
ret = []byte("unknown command\r\nEND\r\n")
}

conn.Write(ret)
conn.Close()
}

func TestStatsParsing(t *testing.T) {
Convey("Given a remote mcrouter stats endpoint", t, func() {
server, client := net.Pipe()
Expand Down Expand Up @@ -138,3 +168,95 @@ func TestServerStatsParsingAfterMcrouterBootstrap(t *testing.T) {

})
}

func TestAdminRequestParsing(t *testing.T) {
Convey("Given a remote mcrouter admin request server version", t, func() {
server, client := net.Pipe()
go func() {
go handleAdminRequest(server)
}()

Convey("When scraped by our client", func() {
data, err := getAdminRequest(client, "__mcrouter__.version")
if err != nil {
t.Fatal(err)
}
Convey("It should parse the version", func() {
expected := []byte("40.0.0 mcrouter")
So(data, ShouldResemble, expected)
})
})
})

Convey("Given a remote mcrouter admin request server config_age", t, func() {
server, client := net.Pipe()
go func() {
go handleAdminRequest(server)
}()

Convey("When scraped by our client", func() {
data, err := getAdminRequest(client, "__mcrouter__.config_age")
if err != nil {
t.Fatal(err)
}
Convey("It should parse the config age", func() {
expected := []byte("1040469")
So(data, ShouldResemble, expected)
})
})
})

Convey("Given a remote mcrouter admin request server config_file", t, func() {
server, client := net.Pipe()
go func() {
go handleAdminRequest(server)
}()

Convey("When scraped by our client", func() {
data, err := getAdminRequest(client, "__mcrouter__.config_file")
if err != nil {
t.Fatal(err)
}
Convey("It should parse the config file", func() {
expected := []byte("/opt/mcrouter/config/config.json")
So(data, ShouldResemble, expected)
})
})
})

Convey("Given a remote mcrouter admin request server hostid", t, func() {
server, client := net.Pipe()
go func() {
go handleAdminRequest(server)
}()

Convey("When scraped by our client", func() {
data, err := getAdminRequest(client, "__mcrouter__.hostid")
if err != nil {
t.Fatal(err)
}
Convey("It should parse the hostid", func() {
expected := []byte("4079863250")
So(data, ShouldResemble, expected)
})
})
})

Convey("Given a remote mcrouter admin request server config_md5_digest", t, func() {
server, client := net.Pipe()
go func() {
go handleAdminRequest(server)
}()

Convey("When scraped by our client", func() {
data, err := getAdminRequest(client, "__mcrouter__.config_md5_digest")
if err != nil {
t.Fatal(err)
}
Convey("It should parse the config_md5_digest", func() {
expected := []byte("2aa22ce671e9fdf6a7bb762f9a6cb0cc")
So(data, ShouldResemble, expected)
})
})
})
}