Skip to content

Commit

Permalink
Tweaks to test pubsub source issues
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Oct 30, 2024
1 parent e5780ea commit e83a70a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 28 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.4.2
3.1.0-test1
4 changes: 4 additions & 0 deletions assets/docs/configuration/sources/pubsub-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ source {

# Maximum size of unprocessed messages (default 1e9)
max_outstanding_bytes = 2e9

min_extension_period_seconds = 10

streaming_pull_goroutines = 2
}
}
2 changes: 1 addition & 1 deletion cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package cmd

const (
// AppVersion is the current version of the app
AppVersion = "2.4.2"
AppVersion = "3.1.0-test1"

// AppName is the name of the application to use in logging / places that require the artifact
AppName = "snowbridge"
Expand Down
59 changes: 35 additions & 24 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,25 @@ import (

// Configuration configures the source for records pulled
type Configuration struct {
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"`
MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"`
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"`
MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"`
MinExtensionPeriodSeconds int `hcl:"min_extension_period_seconds,optional"`
StreamingPullGoRoutines int `hcl:"streaming_pull_goroutines,optional"`
}

// pubSubSource holds a new client for reading messages from PubSub
type pubSubSource struct {
projectID string
client *pubsub.Client
subscriptionID string
concurrentWrites int
maxOutstandingMessages int
maxOutstandingBytes int
projectID string
client *pubsub.Client
subscriptionID string
concurrentWrites int
maxOutstandingMessages int
maxOutstandingBytes int
minExtensionPeriodSeconds int
streamingPullGoRoutines int

log *log.Entry

Expand All @@ -58,6 +62,8 @@ func configFunction(c *Configuration) (sourceiface.Source, error) {
c.SubscriptionID,
c.MaxOutstandingMessages,
c.MaxOutstandingBytes,
c.MinExtensionPeriodSeconds,
c.StreamingPullGoRoutines,
)
}

Expand All @@ -74,9 +80,11 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &Configuration{
ConcurrentWrites: 50,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
ConcurrentWrites: 50,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
MinExtensionPeriodSeconds: 0,
StreamingPullGoRoutines: 1,
}

return cfg, nil
Expand All @@ -101,7 +109,7 @@ var ConfigPair = config.ConfigurationPair{
}

// newPubSubSource creates a new client for reading messages from PubSub
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int) (*pubSubSource, error) {
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int) (*pubSubSource, error) {
ctx := context.Background()

// Ensures as even as possible distribution of UUIDs
Expand All @@ -113,13 +121,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
}

return &pubSubSource{
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
concurrentWrites: concurrentWrites,
maxOutstandingMessages: maxOutstandingMessages,
maxOutstandingBytes: maxOutstandingBytes,
log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}),
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
concurrentWrites: concurrentWrites,
maxOutstandingMessages: maxOutstandingMessages,
maxOutstandingBytes: maxOutstandingBytes,
minExtensionPeriodSeconds: minExtensionPeriodSeconds, // default to off to match client default
streamingPullGoRoutines: streamingPullGoRoutines,
log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}),
}, nil
}

Expand All @@ -130,9 +140,10 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error {
ps.log.Info("Reading messages from subscription ...")

sub := ps.client.Subscription(ps.subscriptionID)
sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites
sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages
sub.ReceiveSettings.NumGoroutines = ps.streamingPullGoRoutines // This sets the number of goroutines that can open a streaming pull at once, not the concurrency
sub.ReceiveSettings.MaxOutstandingMessages = ps.concurrentWrites // maxOutstandingMessages limits concurrency
sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes
sub.ReceiveSettings.MinExtensionPeriod = time.Duration(ps.minExtensionPeriodSeconds) * time.Second

cctx, cancel := context.WithCancel(ctx)

Expand Down
8 changes: 6 additions & 2 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package pubsubsource

import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -82,6 +83,9 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) {

assert.NotNil(pubsubSource)
assert.Nil(err)
if err != nil {
fmt.Println(err.Error())
}
assert.Equal("projects/project-test/subscriptions/test-sub", pubsubSource.GetID())

output := testutil.ReadAndReturnMessages(pubsubSource, 5*time.Second, testutil.DefaultTestWriteBuilder, nil)
Expand Down Expand Up @@ -116,7 +120,7 @@ func TestNewPubSubSource_Success(t *testing.T) {

testutil.InitMockPubsubServer(8010, nil, t)

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9)
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1)
assert.Nil(err)
assert.IsType(&pubSubSource{}, pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
Expand All @@ -141,7 +145,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
}
wg.Wait()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9)
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1)

assert.NotNil(pubsubSource)
assert.Nil(err)
Expand Down

0 comments on commit e83a70a

Please sign in to comment.