Skip to content

Commit

Permalink
Merge branch 'main' into feat/kafka-receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
magiskboy committed Jan 13, 2025
2 parents a11a558 + 3b61ae8 commit 699d5d6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
1 change: 1 addition & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ go:
# Whenever the Go version is updated here,
# .circle/config.yml should also be updated.
version: 1.23
cgo: true
repository:
path: github.com/prometheus/alertmanager
build:
Expand Down
2 changes: 1 addition & 1 deletion config/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ type KafkaConfig struct {
BootstrapServers string `yaml:"bootstrap_servers" json:"bootstrap_servers"`
Topic string `yaml:"topic" json:"topic"`
ExtrasConfigs *map[string]string `yaml:"extras_configs" json:"extras_configs"`
NumberOfPartition int `yaml:"number_of_partitions" json:"number_of_partitions"`
NumberOfPartition int32 `yaml:"number_of_partitions" json:"number_of_partitions"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down
34 changes: 21 additions & 13 deletions notify/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"log/slog"
"sync"

"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify"
Expand All @@ -27,15 +28,15 @@ import (
ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var nextPartition = 0

// Notifier implements a Notifier for Discord notifications.
type Notifier struct {
conf *config.KafkaConfig
tmpl *template.Template
logger *slog.Logger
retrier *notify.Retrier
producer *ckafka.Producer
conf *config.KafkaConfig
tmpl *template.Template
logger *slog.Logger
retrier *notify.Retrier
producer *ckafka.Producer
partitionIndex int32
partitionIndexMutex sync.Mutex
}

type KafkaMessage struct {
Expand Down Expand Up @@ -67,11 +68,22 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) {
conf: c,
logger: l,
producer: p,
partitionIndex: 0,
}

return n, nil
}

func (n *Notifier) NextPartition() {
n.partitionIndexMutex.Lock()
defer n.partitionIndexMutex.Unlock()
n.partitionIndex = (n.partitionIndex + 1) % n.conf.NumberOfPartition
}

func (n *Notifier) GetPartition() int32 {
return n.partitionIndex
}

// Notify implements the Notifier interface.
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
n.logger.Info("Sending alert to Kafka")
Expand All @@ -89,11 +101,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
}

if unflushed := n.producer.Flush(1000); unflushed == 0 {
nextPartition++
if nextPartition == n.conf.NumberOfPartition {
nextPartition = 0
}

n.NextPartition()
n.logger.Info("Successfully produced alert")
return false, nil
}
Expand All @@ -103,7 +111,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)

func (n *Notifier) Produce(ctx context.Context, topic string, key string, value []byte) error {
return n.producer.Produce(&ckafka.Message{
TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: int32(nextPartition)},
TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: n.GetPartition()},
Key: []byte(key),
Value: value,
}, nil);
Expand Down

0 comments on commit 699d5d6

Please sign in to comment.