diff --git a/.promu.yml b/.promu.yml index b3f6bd32ba..f4c734b094 100644 --- a/.promu.yml +++ b/.promu.yml @@ -2,6 +2,7 @@ go: # Whenever the Go version is updated here, # .circle/config.yml should also be updated. version: 1.23 + cgo: true repository: path: github.com/prometheus/alertmanager build: diff --git a/config/notifiers.go b/config/notifiers.go index f83bf8d843..74ade03d57 100644 --- a/config/notifiers.go +++ b/config/notifiers.go @@ -1013,7 +1013,7 @@ type KafkaConfig struct { BootstrapServers string `yaml:"bootstrap_servers" json:"bootstrap_servers"` Topic string `yaml:"topic" json:"topic"` ExtrasConfigs *map[string]string `yaml:"extras_configs" json:"extras_configs"` - NumberOfPartition int `yaml:"number_of_partitions" json:"number_of_partitions"` + NumberOfPartition int32 `yaml:"number_of_partitions" json:"number_of_partitions"` } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/notify/kafka/kafka.go b/notify/kafka/kafka.go index 0b1b2ea750..b3b58091b3 100644 --- a/notify/kafka/kafka.go +++ b/notify/kafka/kafka.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "log/slog" + "sync" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" @@ -27,15 +28,15 @@ import ( ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) -var nextPartition = 0 - // Notifier implements a Notifier for Discord notifications. type Notifier struct { - conf *config.KafkaConfig - tmpl *template.Template - logger *slog.Logger - retrier *notify.Retrier - producer *ckafka.Producer + conf *config.KafkaConfig + tmpl *template.Template + logger *slog.Logger + retrier *notify.Retrier + producer *ckafka.Producer + partitionIndex int32 + partitionIndexMutex sync.Mutex } type KafkaMessage struct { @@ -67,11 +68,22 @@ func New(c *config.KafkaConfig, l *slog.Logger) (*Notifier, error) { conf: c, logger: l, producer: p, + partitionIndex: 0, } return n, nil } +func (n *Notifier) NextPartition() { + n.partitionIndexMutex.Lock() + defer n.partitionIndexMutex.Unlock() + n.partitionIndex = (n.partitionIndex + 1) % n.conf.NumberOfPartition +} + +func (n *Notifier) GetPartition() int32 { + return n.partitionIndex +} + // Notify implements the Notifier interface. func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { n.logger.Info("Sending alert to Kafka") @@ -89,11 +101,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } if unflushed := n.producer.Flush(1000); unflushed == 0 { - nextPartition++ - if nextPartition == n.conf.NumberOfPartition { - nextPartition = 0 - } - + n.NextPartition() n.logger.Info("Successfully produced alert") return false, nil } @@ -103,7 +111,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) func (n *Notifier) Produce(ctx context.Context, topic string, key string, value []byte) error { return n.producer.Produce(&ckafka.Message{ - TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: int32(nextPartition)}, + TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: n.GetPartition()}, Key: []byte(key), Value: value, }, nil);