diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod index 1a0f2990f006f..e234422d9239d 100644 --- a/pulsar-function-go/go.mod +++ b/pulsar-function-go/go.mod @@ -44,6 +44,7 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum index 2cadeb1331f30..bae60ec68ab8e 100644 --- a/pulsar-function-go/go.sum +++ b/pulsar-function-go/go.sum @@ -352,6 +352,7 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -360,6 +361,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 1064aece46fe8..7f5e652d6b745 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -161,7 +161,7 @@ CLOSE: gi.ackInputMessage(msgInput) } gi.stats.incrTotalReceived() - gi.addLogTopicHandler() + gi.flushLogsToTopicHandler() gi.stats.setLastInvocation() gi.stats.processTimeStart() @@ -476,15 +476,36 @@ func (gi *goInstance) setupLogHandler() error { gi.context.instanceConf.funcDetails.Namespace, gi.context.instanceConf.funcDetails.Name), ) - return gi.context.logAppender.Start() + if err := gi.context.logAppender.Start(); err != nil { + return err + } + gi.setupLogHandlerTicker() } return nil } -func (gi *goInstance) addLogTopicHandler() { +func (gi *goInstance) setupLogHandlerTicker() { + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + gi.flushLogsToTopicHandler() + case <-gi.context.logAppender.stopChan: + gi.flushLogsToTopicHandler() + return + } + } + }() +} + +func (gi *goInstance) flushLogsToTopicHandler() { // Clear StrEntry regardless gi.context.logAppender is set or not + gi.context.logAppender.mutex.Lock() defer func() { log.StrEntry = nil + gi.context.logAppender.mutex.Unlock() }() if gi.context.logAppender == nil { diff --git a/pulsar-function-go/pf/instance_test.go b/pulsar-function-go/pf/instance_test.go index bf45ae3a8917e..1fd5839864691 100644 --- a/pulsar-function-go/pf/instance_test.go +++ b/pulsar-function-go/pf/instance_test.go @@ -23,10 +23,14 @@ import ( "context" "fmt" "strconv" + "sync" "testing" "time" + "github.com/apache/pulsar-client-go/pulsar" + log "github.com/apache/pulsar/pulsar-function-go/logutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func testProcessSpawnerHealthCheckTimer( @@ -115,3 +119,29 @@ func Test_goInstance_handlerMsg(t *testing.T) { assert.Equal(t, "output", string(output)) assert.Equal(t, message, fc.record) } + +func Test_goInstance_setupLogHandlerTicker(t *testing.T) { + mockProd := &MockPulsarProducer{} + fc := NewFuncContext() + fc.logAppender = &LogAppender{ + producer: mockProd, + mutex: sync.Mutex{}, + stopChan: make(chan struct{}), + } + defer close(fc.logAppender.stopChan) + instance := &goInstance{ + context: fc, + } + + logString := "Hello from Test_goInstance_setupLogHandlerTicker" + log.Info(logString) + mockProd.On("SendAsync", context.Background(), mock.Anything, mock.Anything).Return() + instance.setupLogHandlerTicker() + time.Sleep(150 * time.Millisecond) + + // Check that the message argument from the last SendAsync call is as expected + mockProd.AssertExpectations(t) + pulsarMsg, ok := mockProd.Calls[len(mockProd.Calls)-1].Arguments[1].(*pulsar.ProducerMessage) + assert.True(t, ok) + assert.Regexp(t, fmt.Sprintf(".*%s", logString), string(pulsarMsg.Payload)) +} diff --git a/pulsar-function-go/pf/logAppender.go b/pulsar-function-go/pf/logAppender.go index 699f5514f7fa4..c503534e78546 100644 --- a/pulsar-function-go/pf/logAppender.go +++ b/pulsar-function-go/pf/logAppender.go @@ -21,6 +21,7 @@ package pf import ( "context" + "sync" "time" "github.com/apache/pulsar-client-go/pulsar" @@ -32,6 +33,8 @@ type LogAppender struct { logTopic string fqn string producer pulsar.Producer + mutex sync.Mutex + stopChan chan struct{} } func NewLogAppender(client pulsar.Client, logTopic, fqn string) *LogAppender { @@ -39,6 +42,8 @@ func NewLogAppender(client pulsar.Client, logTopic, fqn string) *LogAppender { pulsarClient: client, logTopic: logTopic, fqn: fqn, + mutex: sync.Mutex{}, + stopChan: make(chan struct{}), } return logAppender } @@ -77,6 +82,8 @@ func (la *LogAppender) GetName() string { } func (la *LogAppender) Stop() { + close(la.stopChan) + time.Sleep(10 * time.Millisecond) la.producer.Close() la.producer = nil } diff --git a/pulsar-function-go/pf/mockMessage_test.go b/pulsar-function-go/pf/mockMessage_test.go index 471a9c9ca252c..104f61a26a0b0 100644 --- a/pulsar-function-go/pf/mockMessage_test.go +++ b/pulsar-function-go/pf/mockMessage_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/stretchr/testify/mock" ) type MockMessage struct { @@ -110,7 +111,9 @@ func (m *MockMessageID) PartitionIdx() int32 { return 0 } -type MockPulsarProducer struct{} +type MockPulsarProducer struct { + mock.Mock +} func (producer *MockPulsarProducer) Topic() string { return "publish-topic" @@ -124,8 +127,9 @@ func (producer *MockPulsarProducer) Send(context.Context, *pulsar.ProducerMessag return nil, nil } -func (producer *MockPulsarProducer) SendAsync(context.Context, *pulsar.ProducerMessage, - func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { +func (producer *MockPulsarProducer) SendAsync(ctx context.Context, msg *pulsar.ProducerMessage, + cb func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { + producer.Called(ctx, msg, cb) } func (producer *MockPulsarProducer) LastSequenceID() int64 {