diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index 92456f50..548eba78 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -3,6 +3,7 @@ package httpserver import ( "net/http" "strconv" + "strings" "github.com/prometheus/client_golang/prometheus" @@ -34,7 +35,7 @@ var ( Name: "burrow_kafka_consumer_current_offset", Help: "Latest offset that Burrow is storing for this partition", }, - []string{"cluster", "consumer_group", "topic", "partition"}, + []string{"cluster", "consumer_group", "topic", "partition", "owner"}, ) consumerPartitionLagGauge = promauto.NewGaugeVec( @@ -42,7 +43,7 @@ var ( Name: "burrow_kafka_consumer_partition_lag", Help: "Number of messages the consumer group is behind by for a partition as reported by Burrow", }, - []string{"cluster", "consumer_group", "topic", "partition"}, + []string{"cluster", "consumer_group", "topic", "partition", "owner"}, ) topicPartitionOffsetGauge = promauto.NewGaugeVec( @@ -86,6 +87,7 @@ func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc { "consumer_group": consumer, "topic": partition.Topic, "partition": strconv.FormatInt(int64(partition.Partition), 10), + "owner": strings.Replace(partition.Owner, "/", "", -1), } consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset)) diff --git a/core/internal/httpserver/prometheus_test.go b/core/internal/httpserver/prometheus_test.go index 2b2eed03..360b411c 100644 --- a/core/internal/httpserver/prometheus_test.go +++ b/core/internal/httpserver/prometheus_test.go @@ -72,6 +72,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { End: &protocol.ConsumerOffset{ Offset: 22663, }, + Owner: "/1.1.1.1", }, { Topic: "testtopic", @@ -82,6 +83,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { End: &protocol.ConsumerOffset{ Offset: 2488, }, + Owner: "/1.1.1.1", }, { Topic: "testtopic1", @@ -92,6 +94,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { End: &protocol.ConsumerOffset{ Offset: 99888, }, + Owner: "/1.1.1.1", }, { Topic: "incomplete", @@ -102,6 +105,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { End: &protocol.ConsumerOffset{ Offset: 5335, }, + Owner: "/1.1.1.1", }, }, TotalPartitions: 2134, @@ -139,13 +143,13 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) { assert.Contains(t, promExp, `burrow_kafka_consumer_status{cluster="testcluster",consumer_group="testgroup"} 1`) assert.Contains(t, promExp, `burrow_kafka_consumer_lag_total{cluster="testcluster",consumer_group="testgroup"} 2345`) - assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 100`) - assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 10`) - assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 50`) + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="0",topic="testtopic"} 100`) + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="1",topic="testtopic"} 10`) + assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="0",topic="testtopic1"} 50`) - assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`) - assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`) - assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="0",topic="testtopic"} 22663`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="1",topic="testtopic"} 2488`) + assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",owner="1.1.1.1",partition="0",topic="testtopic1"} 99888`) assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`) assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="1",topic="testtopic"} 5566`)