From e83a70a447141728e77dd91f0aa5a219eb3faa84 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Fri, 25 Oct 2024 14:47:04 +0100 Subject: [PATCH] Tweaks to test pubsub source issues --- VERSION | 2 +- .../sources/pubsub-full-example.hcl | 4 ++ cmd/constants.go | 2 +- pkg/source/pubsub/pubsub_source.go | 59 +++++++++++-------- pkg/source/pubsub/pubsub_source_test.go | 8 ++- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/VERSION b/VERSION index 8e8299dc..c701c239 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.4.2 +3.1.0-test1 diff --git a/assets/docs/configuration/sources/pubsub-full-example.hcl b/assets/docs/configuration/sources/pubsub-full-example.hcl index 4a00ac8e..988e7449 100644 --- a/assets/docs/configuration/sources/pubsub-full-example.hcl +++ b/assets/docs/configuration/sources/pubsub-full-example.hcl @@ -16,5 +16,9 @@ source { # Maximum size of unprocessed messages (default 1e9) max_outstanding_bytes = 2e9 + + min_extension_period_seconds = 10 + + streaming_pull_goroutines = 2 } } diff --git a/cmd/constants.go b/cmd/constants.go index 8c5f5869..640b352b 100644 --- a/cmd/constants.go +++ b/cmd/constants.go @@ -13,7 +13,7 @@ package cmd const ( // AppVersion is the current version of the app - AppVersion = "2.4.2" + AppVersion = "3.1.0-test1" // AppName is the name of the application to use in logging / places that require the artifact AppName = "snowbridge" diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index 76708b96..70cdd6d0 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -28,21 +28,25 @@ import ( // Configuration configures the source for records pulled type Configuration struct { - ProjectID string `hcl:"project_id"` - SubscriptionID string `hcl:"subscription_id"` - ConcurrentWrites int `hcl:"concurrent_writes,optional"` - MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"` - MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"` + ProjectID string `hcl:"project_id"` + SubscriptionID string `hcl:"subscription_id"` + ConcurrentWrites int `hcl:"concurrent_writes,optional"` + MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"` + MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"` + MinExtensionPeriodSeconds int `hcl:"min_extension_period_seconds,optional"` + StreamingPullGoRoutines int `hcl:"streaming_pull_goroutines,optional"` } // pubSubSource holds a new client for reading messages from PubSub type pubSubSource struct { - projectID string - client *pubsub.Client - subscriptionID string - concurrentWrites int - maxOutstandingMessages int - maxOutstandingBytes int + projectID string + client *pubsub.Client + subscriptionID string + concurrentWrites int + maxOutstandingMessages int + maxOutstandingBytes int + minExtensionPeriodSeconds int + streamingPullGoRoutines int log *log.Entry @@ -58,6 +62,8 @@ func configFunction(c *Configuration) (sourceiface.Source, error) { c.SubscriptionID, c.MaxOutstandingMessages, c.MaxOutstandingBytes, + c.MinExtensionPeriodSeconds, + c.StreamingPullGoRoutines, ) } @@ -74,9 +80,11 @@ func (f adapter) Create(i interface{}) (interface{}, error) { func (f adapter) ProvideDefault() (interface{}, error) { // Provide defaults cfg := &Configuration{ - ConcurrentWrites: 50, - MaxOutstandingMessages: 1000, - MaxOutstandingBytes: 1e9, + ConcurrentWrites: 50, + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + MinExtensionPeriodSeconds: 0, + StreamingPullGoRoutines: 1, } return cfg, nil @@ -101,7 +109,7 @@ var ConfigPair = config.ConfigurationPair{ } // newPubSubSource creates a new client for reading messages from PubSub -func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int) (*pubSubSource, error) { +func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int) (*pubSubSource, error) { ctx := context.Background() // Ensures as even as possible distribution of UUIDs @@ -113,13 +121,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri } return &pubSubSource{ - projectID: projectID, - client: client, - subscriptionID: subscriptionID, - concurrentWrites: concurrentWrites, - maxOutstandingMessages: maxOutstandingMessages, - maxOutstandingBytes: maxOutstandingBytes, - log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}), + projectID: projectID, + client: client, + subscriptionID: subscriptionID, + concurrentWrites: concurrentWrites, + maxOutstandingMessages: maxOutstandingMessages, + maxOutstandingBytes: maxOutstandingBytes, + minExtensionPeriodSeconds: minExtensionPeriodSeconds, // default to off to match client default + streamingPullGoRoutines: streamingPullGoRoutines, + log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}), }, nil } @@ -130,9 +140,10 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error { ps.log.Info("Reading messages from subscription ...") sub := ps.client.Subscription(ps.subscriptionID) - sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites - sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages + sub.ReceiveSettings.NumGoroutines = ps.streamingPullGoRoutines // This sets the number of goroutines that can open a streaming pull at once, not the concurrency + sub.ReceiveSettings.MaxOutstandingMessages = ps.concurrentWrites // maxOutstandingMessages limits concurrency sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes + sub.ReceiveSettings.MinExtensionPeriod = time.Duration(ps.minExtensionPeriodSeconds) * time.Second cctx, cancel := context.WithCancel(ctx) diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index db632d44..cc977ce9 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -13,6 +13,7 @@ package pubsubsource import ( "context" + "fmt" "os" "path/filepath" "sort" @@ -82,6 +83,9 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) { assert.NotNil(pubsubSource) assert.Nil(err) + if err != nil { + fmt.Println(err.Error()) + } assert.Equal("projects/project-test/subscriptions/test-sub", pubsubSource.GetID()) output := testutil.ReadAndReturnMessages(pubsubSource, 5*time.Second, testutil.DefaultTestWriteBuilder, nil) @@ -116,7 +120,7 @@ func TestNewPubSubSource_Success(t *testing.T) { testutil.InitMockPubsubServer(8010, nil, t) - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9) + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1) assert.Nil(err) assert.IsType(&pubSubSource{}, pubsubSource) // This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem. @@ -141,7 +145,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { } wg.Wait() - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9) + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1) assert.NotNil(pubsubSource) assert.Nil(err)