From 8673540f31bcdd84aa343e7238efba74bd4348a0 Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 00:40:57 +0800 Subject: [PATCH] Adding association between a Kafka cluster and a notifier Signed-off-by: muicoder https://github.com/linkedin/Burrow/pull/611 Adding WeCom/DingTalk template --- .github/workflows/action.yaml | 50 +++++++++++++++++++++ .github/workflows/ci.yml | 2 +- Dockerfile.amd64 | 27 +++++++++++ Dockerfile.arm64 | 25 +++++++++++ config/burrow.toml | 1 + config/default-dingtalk-post.tmpl | 52 ++++++++++++++++++++++ config/default-wecom-post.tmpl | 52 ++++++++++++++++++++++ core/internal/helpers/coordinators.go | 6 +++ core/internal/httpserver/config.go | 2 + core/internal/httpserver/structs.go | 2 + core/internal/notifier/coordinator.go | 13 +++++- core/internal/notifier/coordinator_test.go | 52 +++++++++++++--------- core/internal/notifier/email.go | 6 +++ core/internal/notifier/helpers.go | 6 ++- core/internal/notifier/http.go | 5 +++ core/internal/notifier/null.go | 6 +++ 16 files changed, 282 insertions(+), 25 deletions(-) create mode 100644 .github/workflows/action.yaml create mode 100644 Dockerfile.amd64 create mode 100644 Dockerfile.arm64 create mode 100644 config/default-dingtalk-post.tmpl create mode 100644 config/default-wecom-post.tmpl diff --git a/.github/workflows/action.yaml b/.github/workflows/action.yaml new file mode 100644 index 00000000..36b3ed51 --- /dev/null +++ b/.github/workflows/action.yaml @@ -0,0 +1,50 @@ +env: + BASE64manifest: IyEvYmluL3NoCgpDTUQ9JChpZiBidWlsZGFoID4vZGV2L251bGw7IHRoZW4gZWNobyBidWlsZGFoOyBlbGlmIHNlYWxvcyA+L2Rldi9udWxsOyB0aGVuIGVjaG8gc2VhbG9zOyBmaSkKTUY9Im1mOiQoZGF0ZSArJUYpIgoKUkVQTz0iJHsxOi1kb2NrZXIuaW8vYml0bmFtaS9tZXRyaWNzLXNlcnZlcjpkb2NrZXIuaW8vbXVpY29kZXIvbWV0cmljcy1zZXJ2ZXJ9IgpUQUdTPSIkezI6LTAuNi4zfSIKVEFHPSIkezM6LSRUQUdTfSIKCmlmIFsgIiR7UkVQTyU6Kn0iICE9ICIkUkVQTyIgXTsgdGhlbgogIGlmIFsgIiR7VEFHUyUsKn0iICE9ICIkVEFHUyIgXTsgdGhlbgogICAgZWNobyAiJFRBR1MiIHwgc2VkICJzfix+XG5+ZyIgfCB3aGlsZSByZWFkIC1yIHRhZzsgZG8KICAgICAgZWNobyAiJHtSRVBPJToqfTokdGFnIgogICAgZG9uZSB8ICRDTUQgcHVsbAogIGVsc2UKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hbWQ2NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hbWQ2NCIKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hcm02NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hcm02NCIKICAgIFRBR1M9IiRUQUdTLWFtZDY0LCRUQUdTLWFybTY0IgogIGZpCmZpCgplY2hvICIkVEFHUyIgfCBzZWQgInN+LH5cbn5nIiB8IHdoaWxlIHJlYWQgLXIgdGFnOyBkbwogIGVjaG8gIiR7UkVQTyMqOn06JHRhZyIKZG9uZSB8IHhhcmdzICRDTUQgbWFuaWZlc3QgY3JlYXRlIC0tYWxsICIkTUYiCiRDTUQgbWFuaWZlc3QgcHVzaCAtLWFsbCAiJE1GIiAiZG9ja2VyOi8vJHtSRVBPIyo6fTokVEFHIgokQ01EIG1hbmlmZXN0IHJtICIkTUYiIHx8IHRydWUK +jobs: + aio-manifest: + needs: + - build + runs-on: ubuntu-latest + steps: + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + password: ${{ secrets.DOCKERHUB_PASSWORD }} + username: ${{ secrets.DOCKERHUB_USERNAME }} + - name: manifest + run: echo ${{ env.BASE64manifest }} | base64 -d | sh -s docker.io/${{ secrets.DOCKERHUB_USERNAME }}/burrow action-amd64,action-arm64 action + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + password: ${{ secrets.DOCKERHUB_PASSWORD }} + username: ${{ secrets.DOCKERHUB_USERNAME }} + - name: Build and push + uses: docker/build-push-action@v3 + with: + context: . + file: Dockerfile.${{ matrix.arch}} + platforms: ${{ matrix.os}}/${{ matrix.arch}} + provenance: false + pull: true + push: true + sbom: false + tags: ${{ secrets.DOCKERHUB_USERNAME }}/burrow:action-${{ matrix.arch}} + strategy: + matrix: + arch: + - arm64 + - amd64 + os: + - linux +name: build +on: + workflow_dispatch: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1440aff2..6142a2a8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,6 @@ name: CI -on: [push, pull_request] +on: [pull_request] jobs: test: diff --git a/Dockerfile.amd64 b/Dockerfile.amd64 new file mode 100644 index 00000000..1014c3e8 --- /dev/null +++ b/Dockerfile.amd64 @@ -0,0 +1,27 @@ +FROM quay.io/coreos/etcd:v3.3.27 as etcd +FROM quay.io/coreos/zetcd:v0.0.5 as zetcd +FROM edenhill/kcat:1.7.1 as kcat + +FROM golang:1.20-alpine as builder +ARG git_user=muicoder +ARG git_repo=Burrow +ARG git_branch=action +ENV CGO_ENABLED=0 +RUN set -ex && \ + wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \ + cd $git_repo-$git_branch && \ + go get -u all && go mod verify && go mod tidy && git diff && \ + go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \ + go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \ + go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \ + ls -lh $GOPATH/bin + +FROM alpine:edge as cached +COPY --from=etcd /usr/local/bin/etcd* /cached/ +COPY --from=zetcd /usr/local/bin/zetcd* /cached/ +COPY --from=builder /go/bin/* /cached/ +COPY --from=kcat /usr/bin/kcat /cached/ + +FROM alpine:3.18 +RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates +COPY --from=cached /cached /usr/local/bin/ diff --git a/Dockerfile.arm64 b/Dockerfile.arm64 new file mode 100644 index 00000000..0704a6e2 --- /dev/null +++ b/Dockerfile.arm64 @@ -0,0 +1,25 @@ +FROM quay.io/coreos/etcd:v3.3.27-arm64 as etcd +FROM kbzjung359/zetcd:v0.0.5-alpine-arm64 as zetcd + +FROM golang:1.20-alpine as builder +ARG git_user=muicoder +ARG git_repo=Burrow +ARG git_branch=action +ENV CGO_ENABLED=0 +RUN set -ex && \ + wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \ + cd $git_repo-$git_branch && \ + go get -u all && go mod verify && go mod tidy && git diff && \ + go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \ + go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \ + go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \ + ls -lh $GOPATH/bin + +FROM alpine:edge as cached +COPY --from=etcd /usr/local/bin/etcd* /cached/ +COPY --from=zetcd /usr/local/bin/zetcd* /cached/ +COPY --from=builder /go/bin/* /cached/ + +FROM alpine:3.18 +RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates +COPY --from=cached /cached /usr/local/bin/ diff --git a/config/burrow.toml b/config/burrow.toml index ea2dc4fc..de0cf7bd 100644 --- a/config/burrow.toml +++ b/config/burrow.toml @@ -58,6 +58,7 @@ min-distance=1 [notifier.default] class-name="http" +cluster="local" url-open="http://someservice.example.com:1467/v1/event" interval=60 timeout=5 diff --git a/config/default-dingtalk-post.tmpl b/config/default-dingtalk-post.tmpl new file mode 100644 index 00000000..216e5643 --- /dev/null +++ b/config/default-dingtalk-post.tmpl @@ -0,0 +1,52 @@ +{"msgtype": "markdown","markdown": {"title":"Kafka LagChecker", "text": " +{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}} +{{- $FormatString := "2006-01-02 15:04:05"}} +# Kafka: {{.Cluster}} +ConsumerGroup: {{.Group}}{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}normal{{end}} +{{- if eq . 2}}lagging{{end}} +{{- if eq . 3}}abnormal{{end}} +{{- end}} +**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}{{.}}{{end}} +{{- if eq . 2}}{{.}}{{end}} +{{- if eq . 3}}{{.}}{{end}} +{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}} +{{- if .Result.Maxlag|maxlag}} +**MaxLagDetails:** +{{- with .Result.Maxlag}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +{{- end}} +{{- end}} +{{- $TotalErrors := len .Result.Partitions}} +{{- if $TotalErrors}} +### {{$TotalErrors}} partitions have problems +>**CountPartitions:** +{{- range $k,$v := .Result.Partitions|partitioncounts}} +{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}} +{{- end}} +**TopicsByStatus:** +{{- range $k,$v := .Result.Partitions|topicsbystatus}} +\t{{$k}}={{$v}} +{{- end}} +**PartitionDetails:** +{{- range .Result.Partitions}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +\tStart={{formattimestamp .Start.Timestamp $FormatString}} +\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}} +\tEnd={{formattimestamp .End.Timestamp $FormatString}} +\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}} +{{- end}} +{{- end}} +" +}} diff --git a/config/default-wecom-post.tmpl b/config/default-wecom-post.tmpl new file mode 100644 index 00000000..cbba10a3 --- /dev/null +++ b/config/default-wecom-post.tmpl @@ -0,0 +1,52 @@ +{"msgtype": "markdown","markdown": {"content": " +{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}} +{{- $FormatString := "2006-01-02 15:04:05"}} +# Kafka: {{.Cluster}} +ConsumerGroup: {{.Group}}{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}normal{{end}} +{{- if eq . 2}}lagging{{end}} +{{- if eq . 3}}abnormal{{end}} +{{- end}} +**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}{{.}}{{end}} +{{- if eq . 2}}{{.}}{{end}} +{{- if eq . 3}}{{.}}{{end}} +{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}} +{{- if .Result.Maxlag|maxlag}} +**MaxLagDetails:** +{{- with .Result.Maxlag}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +{{- end}} +{{- end}} +{{- $TotalErrors := len .Result.Partitions}} +{{- if $TotalErrors}} +### {{$TotalErrors}} partitions have problems +>**CountPartitions:** +{{- range $k,$v := .Result.Partitions|partitioncounts}} +{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}} +{{- end}} +**TopicsByStatus:** +{{- range $k,$v := .Result.Partitions|topicsbystatus}} +\t{{$k}}={{$v}} +{{- end}} +**PartitionDetails:** +{{- range .Result.Partitions}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +\tStart={{formattimestamp .Start.Timestamp $FormatString}} +\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}} +\tEnd={{formattimestamp .End.Timestamp $FormatString}} +\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}} +{{- end}} +{{- end}} +" +}} diff --git a/core/internal/helpers/coordinators.go b/core/internal/helpers/coordinators.go index 520632c1..94e6cc10 100644 --- a/core/internal/helpers/coordinators.go +++ b/core/internal/helpers/coordinators.go @@ -74,6 +74,12 @@ func (m *MockModule) GetName() string { return args.String(0) } +// GetCluster mocks the notifier.Module GetCluster func +func (m *MockModule) GetCluster() string { + args := m.Called() + return args.String(0) +} + // GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func func (m *MockModule) GetGroupAllowlist() *regexp.Regexp { args := m.Called() diff --git a/core/internal/httpserver/config.go b/core/internal/httpserver/config.go index 92f04532..1d21639b 100644 --- a/core/internal/httpserver/config.go +++ b/core/internal/httpserver/config.go @@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request SendClose: viper.GetBool(configRoot + ".send-close"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) @@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques To: viper.GetString(configRoot + ".to"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go index b15ce9de..91d269bc 100644 --- a/core/internal/httpserver/structs.go +++ b/core/internal/httpserver/structs.go @@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct { SendClose bool `json:"send-close"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierSlack struct { @@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct { To string `json:"to"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierNull struct { diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index 5cab2906..53e81d0a 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -48,6 +48,7 @@ import ( type Module interface { protocol.Module GetName() string + GetCluster() string GetGroupAllowlist() *regexp.Regexp GetGroupDenylist() *regexp.Regexp GetLogger() *zap.Logger @@ -95,7 +96,7 @@ type Coordinator struct { // getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there // is any error, it will panic with an appropriate message describing the problem. -func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module { +func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module { logger := app.Logger.With( zap.String("type", "module"), zap.String("coordinator", "notifier"), @@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "email": return &EmailNotifier{ @@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "null": return &NullNotifier{ @@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } default: panic("Unknown notifier className provided: " + className) @@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() { groupAllowlist = re } + cluster := viper.GetString(configRoot + ".cluster") + // Compile the denylist for the consumer groups to not notify for var groupDenylist *regexp.Regexp denylist := viper.GetString(configRoot + ".group-denylist") @@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() { templateClose = tmpl.Templates()[0] } - module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose) + module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster) module.Configure(name, configRoot) nc.modules[name] = module interval := viper.GetInt64(configRoot + ".interval") @@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer for _, genericModule := range nc.modules { module := genericModule.(Module) + if module.GetCluster() != "" && response.Cluster != module.GetCluster() { + continue + } // No allowlist means everything passes groupAllowlist := module.GetGroupAllowlist() groupDenylist := module.GetGroupDenylist() diff --git a/core/internal/notifier/coordinator_test.go b/core/internal/notifier/coordinator_test.go index 03bb60cc..cb026d38 100644 --- a/core/internal/notifier/coordinator_test.go +++ b/core/internal/notifier/coordinator_test.go @@ -472,6 +472,7 @@ var notifyModuleTests = []struct { ExpectClose bool ExpectID bool SendOnce bool + Cluster string }{ // {1, 0, false, false, false, false, false}, // {2, 0, false, false, false, false, false}, @@ -479,27 +480,31 @@ var notifyModuleTests = []struct { // {1, 0, false, true, false, false, false}, // {1, 0, true, true, false, false, false}, - {1, 1, false, false, true, false, false, false}, - {1, 1, false, true, true, false, false, false}, - {1, 1, true, false, true, false, false, false}, - {1, 1, true, true, true, true, false, true}, - - {1, 2, false, false, true, false, true, false}, - {1, 2, false, true, true, false, true, false}, - {1, 2, true, false, true, false, true, false}, - {1, 2, true, true, true, false, true, false}, - {1, 2, true, true, false, false, true, true}, - {1, 2, false, true, true, false, true, true}, - - {3, 2, false, false, false, false, true, false}, - {3, 2, false, true, false, false, true, false}, - {3, 2, true, false, false, false, true, false}, - {3, 2, true, true, false, false, true, false}, - - {2, 1, false, false, false, false, false, false}, - {2, 1, false, true, false, false, false, false}, - {2, 1, true, false, false, false, false, false}, - {2, 1, true, true, true, true, false, false}, + {1, 1, false, false, true, false, false, false, ""}, + {1, 1, false, true, true, false, false, false, "testcluster"}, + {1, 1, true, false, true, false, false, false, "unmatchedCluster"}, + {1, 1, true, true, true, true, false, true, ""}, + + {1, 2, false, false, true, false, true, false, ""}, + {1, 2, false, true, true, false, true, false, ""}, + {1, 2, true, false, true, false, true, false, ""}, + {1, 2, true, true, true, false, true, false, ""}, + {1, 2, true, true, false, false, true, true, ""}, + {1, 2, false, true, true, false, true, true, ""}, + + {3, 2, false, false, false, false, true, false, ""}, + {3, 2, false, true, false, false, true, false, ""}, + {3, 2, true, false, false, false, true, false, ""}, + {3, 2, true, true, false, false, true, false, ""}, + + {2, 1, false, false, false, false, false, false, ""}, + {2, 1, false, true, false, false, false, false, ""}, + {2, 1, true, false, false, false, false, false, ""}, + {2, 1, true, true, true, true, false, false, ""}, +} + +func checkNotifierClusterMatch(cluster string) bool { + return cluster == "" || cluster == "testcluster" } func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { @@ -558,10 +563,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { // Set up the mock module and expected calls mockModule := &helpers.MockModule{} coordinator.modules["test"] = mockModule + mockModule.On("GetCluster").Return(testSet.Cluster) + + if checkNotifierClusterMatch(testSet.Cluster) { mockModule.On("GetName").Return("test") mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil)) mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil)) mockModule.On("AcceptConsumerGroup", response).Return(true) + } + if testSet.ExpectSend { mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return() } diff --git a/core/internal/notifier/email.go b/core/internal/notifier/email.go index c7e0f969..29e0fd96 100644 --- a/core/internal/notifier/email.go +++ b/core/internal/notifier/email.go @@ -38,6 +38,7 @@ type EmailNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -139,6 +140,11 @@ func (module *EmailNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *EmailNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist diff --git a/core/internal/notifier/helpers.go b/core/internal/notifier/helpers.go index 6eb6b8b6..6d3bfb6e 100644 --- a/core/internal/notifier/helpers.go +++ b/core/internal/notifier/helpers.go @@ -163,5 +163,9 @@ func maxLagHelper(a *protocol.PartitionStatus) uint64 { } func formatTimestamp(timestamp int64, formatString string) string { - return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString) + if timestamp > 0 { + return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString) + } else { + return time.Now().Format(formatString) + } } diff --git a/core/internal/notifier/http.go b/core/internal/notifier/http.go index d400a7f7..fedcf26c 100644 --- a/core/internal/notifier/http.go +++ b/core/internal/notifier/http.go @@ -39,6 +39,7 @@ type HTTPNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -124,6 +125,10 @@ func (module *HTTPNotifier) GetName() string { return module.name } +func (module *HTTPNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist diff --git a/core/internal/notifier/null.go b/core/internal/notifier/null.go index 834fd653..395a2e4d 100644 --- a/core/internal/notifier/null.go +++ b/core/internal/notifier/null.go @@ -30,6 +30,7 @@ type NullNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -75,6 +76,11 @@ func (module *NullNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *NullNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist