diff --git a/.github/workflows/action.yaml b/.github/workflows/action.yaml
new file mode 100644
index 00000000..36b3ed51
--- /dev/null
+++ b/.github/workflows/action.yaml
@@ -0,0 +1,50 @@
+env:
+ BASE64manifest: IyEvYmluL3NoCgpDTUQ9JChpZiBidWlsZGFoID4vZGV2L251bGw7IHRoZW4gZWNobyBidWlsZGFoOyBlbGlmIHNlYWxvcyA+L2Rldi9udWxsOyB0aGVuIGVjaG8gc2VhbG9zOyBmaSkKTUY9Im1mOiQoZGF0ZSArJUYpIgoKUkVQTz0iJHsxOi1kb2NrZXIuaW8vYml0bmFtaS9tZXRyaWNzLXNlcnZlcjpkb2NrZXIuaW8vbXVpY29kZXIvbWV0cmljcy1zZXJ2ZXJ9IgpUQUdTPSIkezI6LTAuNi4zfSIKVEFHPSIkezM6LSRUQUdTfSIKCmlmIFsgIiR7UkVQTyU6Kn0iICE9ICIkUkVQTyIgXTsgdGhlbgogIGlmIFsgIiR7VEFHUyUsKn0iICE9ICIkVEFHUyIgXTsgdGhlbgogICAgZWNobyAiJFRBR1MiIHwgc2VkICJzfix+XG5+ZyIgfCB3aGlsZSByZWFkIC1yIHRhZzsgZG8KICAgICAgZWNobyAiJHtSRVBPJToqfTokdGFnIgogICAgZG9uZSB8ICRDTUQgcHVsbAogIGVsc2UKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hbWQ2NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hbWQ2NCIKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hcm02NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hcm02NCIKICAgIFRBR1M9IiRUQUdTLWFtZDY0LCRUQUdTLWFybTY0IgogIGZpCmZpCgplY2hvICIkVEFHUyIgfCBzZWQgInN+LH5cbn5nIiB8IHdoaWxlIHJlYWQgLXIgdGFnOyBkbwogIGVjaG8gIiR7UkVQTyMqOn06JHRhZyIKZG9uZSB8IHhhcmdzICRDTUQgbWFuaWZlc3QgY3JlYXRlIC0tYWxsICIkTUYiCiRDTUQgbWFuaWZlc3QgcHVzaCAtLWFsbCAiJE1GIiAiZG9ja2VyOi8vJHtSRVBPIyo6fTokVEFHIgokQ01EIG1hbmlmZXN0IHJtICIkTUYiIHx8IHRydWUK
+jobs:
+ aio-manifest:
+ needs:
+ - build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Login to DockerHub
+ uses: docker/login-action@v2
+ with:
+ password: ${{ secrets.DOCKERHUB_PASSWORD }}
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ - name: manifest
+ run: echo ${{ env.BASE64manifest }} | base64 -d | sh -s docker.io/${{ secrets.DOCKERHUB_USERNAME }}/burrow action-amd64,action-arm64 action
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v2
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v2
+ - name: Login to DockerHub
+ uses: docker/login-action@v2
+ with:
+ password: ${{ secrets.DOCKERHUB_PASSWORD }}
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ - name: Build and push
+ uses: docker/build-push-action@v3
+ with:
+ context: .
+ file: Dockerfile.${{ matrix.arch}}
+ platforms: ${{ matrix.os}}/${{ matrix.arch}}
+ provenance: false
+ pull: true
+ push: true
+ sbom: false
+ tags: ${{ secrets.DOCKERHUB_USERNAME }}/burrow:action-${{ matrix.arch}}
+ strategy:
+ matrix:
+ arch:
+ - arm64
+ - amd64
+ os:
+ - linux
+name: build
+on:
+ workflow_dispatch:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1440aff2..6142a2a8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -1,6 +1,6 @@
name: CI
-on: [push, pull_request]
+on: [pull_request]
jobs:
test:
diff --git a/Dockerfile.amd64 b/Dockerfile.amd64
new file mode 100644
index 00000000..1014c3e8
--- /dev/null
+++ b/Dockerfile.amd64
@@ -0,0 +1,27 @@
+FROM quay.io/coreos/etcd:v3.3.27 as etcd
+FROM quay.io/coreos/zetcd:v0.0.5 as zetcd
+FROM edenhill/kcat:1.7.1 as kcat
+
+FROM golang:1.20-alpine as builder
+ARG git_user=muicoder
+ARG git_repo=Burrow
+ARG git_branch=action
+ENV CGO_ENABLED=0
+RUN set -ex && \
+ wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
+ cd $git_repo-$git_branch && \
+ go get -u all && go mod verify && go mod tidy && git diff && \
+ go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
+ go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
+ go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
+ ls -lh $GOPATH/bin
+
+FROM alpine:edge as cached
+COPY --from=etcd /usr/local/bin/etcd* /cached/
+COPY --from=zetcd /usr/local/bin/zetcd* /cached/
+COPY --from=builder /go/bin/* /cached/
+COPY --from=kcat /usr/bin/kcat /cached/
+
+FROM alpine:3.18
+RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates
+COPY --from=cached /cached /usr/local/bin/
diff --git a/Dockerfile.arm64 b/Dockerfile.arm64
new file mode 100644
index 00000000..0704a6e2
--- /dev/null
+++ b/Dockerfile.arm64
@@ -0,0 +1,25 @@
+FROM quay.io/coreos/etcd:v3.3.27-arm64 as etcd
+FROM kbzjung359/zetcd:v0.0.5-alpine-arm64 as zetcd
+
+FROM golang:1.20-alpine as builder
+ARG git_user=muicoder
+ARG git_repo=Burrow
+ARG git_branch=action
+ENV CGO_ENABLED=0
+RUN set -ex && \
+ wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
+ cd $git_repo-$git_branch && \
+ go get -u all && go mod verify && go mod tidy && git diff && \
+ go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
+ go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
+ go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
+ ls -lh $GOPATH/bin
+
+FROM alpine:edge as cached
+COPY --from=etcd /usr/local/bin/etcd* /cached/
+COPY --from=zetcd /usr/local/bin/zetcd* /cached/
+COPY --from=builder /go/bin/* /cached/
+
+FROM alpine:3.18
+RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates
+COPY --from=cached /cached /usr/local/bin/
diff --git a/config/burrow.toml b/config/burrow.toml
index ea2dc4fc..de0cf7bd 100644
--- a/config/burrow.toml
+++ b/config/burrow.toml
@@ -58,6 +58,7 @@ min-distance=1
[notifier.default]
class-name="http"
+cluster="local"
url-open="http://someservice.example.com:1467/v1/event"
interval=60
timeout=5
diff --git a/config/default-dingtalk-post.tmpl b/config/default-dingtalk-post.tmpl
new file mode 100644
index 00000000..216e5643
--- /dev/null
+++ b/config/default-dingtalk-post.tmpl
@@ -0,0 +1,52 @@
+{"msgtype": "markdown","markdown": {"title":"Kafka LagChecker", "text": "
+{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
+{{- $FormatString := "2006-01-02 15:04:05"}}
+# Kafka: {{.Cluster}}
+ConsumerGroup: {{.Group}}{{- with .Result.Status}}
+{{- if eq . 0}}NotFound{{end}}
+{{- if eq . 1}}normal{{end}}
+{{- if eq . 2}}lagging{{end}}
+{{- if eq . 3}}abnormal{{end}}
+{{- end}}
+**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
+{{- if eq . 0}}NotFound{{end}}
+{{- if eq . 1}}{{.}}{{end}}
+{{- if eq . 2}}{{.}}{{end}}
+{{- if eq . 3}}{{.}}{{end}}
+{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
+{{- if .Result.Maxlag|maxlag}}
+**MaxLagDetails:**
+{{- with .Result.Maxlag}}
+{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
+{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
+{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
+\tCurrentLag={{.CurrentLag}}
+\tPartition={{.Partition}}
+{{- end}}
+{{- end}}
+{{- $TotalErrors := len .Result.Partitions}}
+{{- if $TotalErrors}}
+### {{$TotalErrors}} partitions have problems
+>**CountPartitions:**
+{{- range $k,$v := .Result.Partitions|partitioncounts}}
+{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
+{{- end}}
+**TopicsByStatus:**
+{{- range $k,$v := .Result.Partitions|topicsbystatus}}
+\t{{$k}}={{$v}}
+{{- end}}
+**PartitionDetails:**
+{{- range .Result.Partitions}}
+{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
+{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
+{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
+\tCurrentLag={{.CurrentLag}}
+\tPartition={{.Partition}}
+\tStart={{formattimestamp .Start.Timestamp $FormatString}}
+\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
+\tEnd={{formattimestamp .End.Timestamp $FormatString}}
+\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
+{{- end}}
+{{- end}}
+"
+}}
diff --git a/config/default-wecom-post.tmpl b/config/default-wecom-post.tmpl
new file mode 100644
index 00000000..cbba10a3
--- /dev/null
+++ b/config/default-wecom-post.tmpl
@@ -0,0 +1,52 @@
+{"msgtype": "markdown","markdown": {"content": "
+{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
+{{- $FormatString := "2006-01-02 15:04:05"}}
+# Kafka: {{.Cluster}}
+ConsumerGroup: {{.Group}}{{- with .Result.Status}}
+{{- if eq . 0}}NotFound{{end}}
+{{- if eq . 1}}normal{{end}}
+{{- if eq . 2}}lagging{{end}}
+{{- if eq . 3}}abnormal{{end}}
+{{- end}}
+**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
+{{- if eq . 0}}NotFound{{end}}
+{{- if eq . 1}}{{.}}{{end}}
+{{- if eq . 2}}{{.}}{{end}}
+{{- if eq . 3}}{{.}}{{end}}
+{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
+{{- if .Result.Maxlag|maxlag}}
+**MaxLagDetails:**
+{{- with .Result.Maxlag}}
+{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
+{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
+{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
+\tCurrentLag={{.CurrentLag}}
+\tPartition={{.Partition}}
+{{- end}}
+{{- end}}
+{{- $TotalErrors := len .Result.Partitions}}
+{{- if $TotalErrors}}
+### {{$TotalErrors}} partitions have problems
+>**CountPartitions:**
+{{- range $k,$v := .Result.Partitions|partitioncounts}}
+{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
+{{- end}}
+**TopicsByStatus:**
+{{- range $k,$v := .Result.Partitions|topicsbystatus}}
+\t{{$k}}={{$v}}
+{{- end}}
+**PartitionDetails:**
+{{- range .Result.Partitions}}
+{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
+{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
+{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
+\tCurrentLag={{.CurrentLag}}
+\tPartition={{.Partition}}
+\tStart={{formattimestamp .Start.Timestamp $FormatString}}
+\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
+\tEnd={{formattimestamp .End.Timestamp $FormatString}}
+\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
+{{- end}}
+{{- end}}
+"
+}}
diff --git a/core/internal/helpers/coordinators.go b/core/internal/helpers/coordinators.go
index 520632c1..94e6cc10 100644
--- a/core/internal/helpers/coordinators.go
+++ b/core/internal/helpers/coordinators.go
@@ -74,6 +74,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}
+// GetCluster mocks the notifier.Module GetCluster func
+func (m *MockModule) GetCluster() string {
+ args := m.Called()
+ return args.String(0)
+}
+
// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
args := m.Called()
diff --git a/core/internal/httpserver/config.go b/core/internal/httpserver/config.go
index 92f04532..1d21639b 100644
--- a/core/internal/httpserver/config.go
+++ b/core/internal/httpserver/config.go
@@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
+ Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
@@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
+ Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go
index b15ce9de..91d269bc 100644
--- a/core/internal/httpserver/structs.go
+++ b/core/internal/httpserver/structs.go
@@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
+ Cluster string `json:"cluster"`
}
type httpResponseConfigModuleNotifierSlack struct {
@@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
+ Cluster string `json:"cluster"`
}
type httpResponseConfigModuleNotifierNull struct {
diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go
index 5cab2906..53e81d0a 100644
--- a/core/internal/notifier/coordinator.go
+++ b/core/internal/notifier/coordinator.go
@@ -48,6 +48,7 @@ import (
type Module interface {
protocol.Module
GetName() string
+ GetCluster() string
GetGroupAllowlist() *regexp.Regexp
GetGroupDenylist() *regexp.Regexp
GetLogger() *zap.Logger
@@ -95,7 +96,7 @@ type Coordinator struct {
// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
-func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
+func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
@@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
+ cluster: cluster,
}
case "email":
return &EmailNotifier{
@@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
+ cluster: cluster,
}
case "null":
return &NullNotifier{
@@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
+ cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
@@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
groupAllowlist = re
}
+ cluster := viper.GetString(configRoot + ".cluster")
+
// Compile the denylist for the consumer groups to not notify for
var groupDenylist *regexp.Regexp
denylist := viper.GetString(configRoot + ".group-denylist")
@@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}
- module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
+ module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
@@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)
+ if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
+ continue
+ }
// No allowlist means everything passes
groupAllowlist := module.GetGroupAllowlist()
groupDenylist := module.GetGroupDenylist()
diff --git a/core/internal/notifier/coordinator_test.go b/core/internal/notifier/coordinator_test.go
index 03bb60cc..cb026d38 100644
--- a/core/internal/notifier/coordinator_test.go
+++ b/core/internal/notifier/coordinator_test.go
@@ -472,6 +472,7 @@ var notifyModuleTests = []struct {
ExpectClose bool
ExpectID bool
SendOnce bool
+ Cluster string
}{
// {1, 0, false, false, false, false, false},
// {2, 0, false, false, false, false, false},
@@ -479,27 +480,31 @@ var notifyModuleTests = []struct {
// {1, 0, false, true, false, false, false},
// {1, 0, true, true, false, false, false},
- {1, 1, false, false, true, false, false, false},
- {1, 1, false, true, true, false, false, false},
- {1, 1, true, false, true, false, false, false},
- {1, 1, true, true, true, true, false, true},
-
- {1, 2, false, false, true, false, true, false},
- {1, 2, false, true, true, false, true, false},
- {1, 2, true, false, true, false, true, false},
- {1, 2, true, true, true, false, true, false},
- {1, 2, true, true, false, false, true, true},
- {1, 2, false, true, true, false, true, true},
-
- {3, 2, false, false, false, false, true, false},
- {3, 2, false, true, false, false, true, false},
- {3, 2, true, false, false, false, true, false},
- {3, 2, true, true, false, false, true, false},
-
- {2, 1, false, false, false, false, false, false},
- {2, 1, false, true, false, false, false, false},
- {2, 1, true, false, false, false, false, false},
- {2, 1, true, true, true, true, false, false},
+ {1, 1, false, false, true, false, false, false, ""},
+ {1, 1, false, true, true, false, false, false, "testcluster"},
+ {1, 1, true, false, true, false, false, false, "unmatchedCluster"},
+ {1, 1, true, true, true, true, false, true, ""},
+
+ {1, 2, false, false, true, false, true, false, ""},
+ {1, 2, false, true, true, false, true, false, ""},
+ {1, 2, true, false, true, false, true, false, ""},
+ {1, 2, true, true, true, false, true, false, ""},
+ {1, 2, true, true, false, false, true, true, ""},
+ {1, 2, false, true, true, false, true, true, ""},
+
+ {3, 2, false, false, false, false, true, false, ""},
+ {3, 2, false, true, false, false, true, false, ""},
+ {3, 2, true, false, false, false, true, false, ""},
+ {3, 2, true, true, false, false, true, false, ""},
+
+ {2, 1, false, false, false, false, false, false, ""},
+ {2, 1, false, true, false, false, false, false, ""},
+ {2, 1, true, false, false, false, false, false, ""},
+ {2, 1, true, true, true, true, false, false, ""},
+}
+
+func checkNotifierClusterMatch(cluster string) bool {
+ return cluster == "" || cluster == "testcluster"
}
func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
@@ -558,10 +563,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
// Set up the mock module and expected calls
mockModule := &helpers.MockModule{}
coordinator.modules["test"] = mockModule
+ mockModule.On("GetCluster").Return(testSet.Cluster)
+
+ if checkNotifierClusterMatch(testSet.Cluster) {
mockModule.On("GetName").Return("test")
mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
+ }
+
if testSet.ExpectSend {
mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return()
}
diff --git a/core/internal/notifier/email.go b/core/internal/notifier/email.go
index c7e0f969..29e0fd96 100644
--- a/core/internal/notifier/email.go
+++ b/core/internal/notifier/email.go
@@ -38,6 +38,7 @@ type EmailNotifier struct {
Log *zap.Logger
name string
+ cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
@@ -139,6 +140,11 @@ func (module *EmailNotifier) GetName() string {
return module.name
}
+// GetCluster returns the configured name of this module
+func (module *EmailNotifier) GetCluster() string {
+ return module.cluster
+}
+
// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
diff --git a/core/internal/notifier/helpers.go b/core/internal/notifier/helpers.go
index 6eb6b8b6..6d3bfb6e 100644
--- a/core/internal/notifier/helpers.go
+++ b/core/internal/notifier/helpers.go
@@ -163,5 +163,9 @@ func maxLagHelper(a *protocol.PartitionStatus) uint64 {
}
func formatTimestamp(timestamp int64, formatString string) string {
- return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString)
+ if timestamp > 0 {
+ return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString)
+ } else {
+ return time.Now().Format(formatString)
+ }
}
diff --git a/core/internal/notifier/http.go b/core/internal/notifier/http.go
index d400a7f7..fedcf26c 100644
--- a/core/internal/notifier/http.go
+++ b/core/internal/notifier/http.go
@@ -39,6 +39,7 @@ type HTTPNotifier struct {
Log *zap.Logger
name string
+ cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
@@ -124,6 +125,10 @@ func (module *HTTPNotifier) GetName() string {
return module.name
}
+func (module *HTTPNotifier) GetCluster() string {
+ return module.cluster
+}
+
// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
diff --git a/core/internal/notifier/null.go b/core/internal/notifier/null.go
index 834fd653..395a2e4d 100644
--- a/core/internal/notifier/null.go
+++ b/core/internal/notifier/null.go
@@ -30,6 +30,7 @@ type NullNotifier struct {
Log *zap.Logger
name string
+ cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
@@ -75,6 +76,11 @@ func (module *NullNotifier) GetName() string {
return module.name
}
+// GetCluster returns the configured name of this module
+func (module *NullNotifier) GetCluster() string {
+ return module.cluster
+}
+
// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist