Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: clean imports #796

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions sqlcon/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,14 @@
package sqlcon

import (
"fmt"
"net/url"
"runtime"
"strings"
)

func cleanURLQuery(in url.Values) (out url.Values) {
out, _ = url.ParseQuery(in.Encode())
out.Del("max_conns")
out.Del("max_idle_conns")
out.Del("max_conn_lifetime")
out.Del("max_idle_conn_time")
out.Del("parseTime")
return out
}

// GetDriverName returns the driver name of a given DSN.
func GetDriverName(dsn string) string {
return strings.Split(dsn, "://")[0]
}

func classifyDSN(dsn string) string {
scheme := strings.Split(dsn, "://")[0]
parts := strings.Split(dsn, "@")
host := parts[len(parts)-1]
return fmt.Sprintf("%s://*:*@%s", scheme, host)
}

func maxParallelism() int {
maxProcs := runtime.GOMAXPROCS(0)
numCPU := runtime.NumCPU()
Expand Down
42 changes: 0 additions & 42 deletions sqlcon/connector_test.go

This file was deleted.

15 changes: 10 additions & 5 deletions watcherx/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"strings"
"time"

// Import driver
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
Expand All @@ -27,6 +25,13 @@ type row struct {
value string
}

// NewChangeFeedConnection opens a new connection to the database and enables the CHANGEFEED feature.
//
// The caller is responsible for closing the connection when done.
//
// You must register the `pgx` driver before calling this function:
//
// import _ "github.com/jackc/pgx/v4/stdlib"
func NewChangeFeedConnection(ctx context.Context, l *logrusx.Logger, dsn string) (*sqlx.DB, error) {
if !strings.HasPrefix(dsn, "cockroach://") {
return nil, errors.Errorf("DSN value must be prefixed with a cockroach URI schema")
Expand Down Expand Up @@ -75,7 +80,7 @@ const heartBeatInterval = time.Second
// This function spawns the necessary go-routines to process the change-feed events and deduplicate them.
func WatchChangeFeed(ctx context.Context, cx *sqlx.DB, tableName string, out EventChannel, cursor time.Time) (_ Watcher, err error) {
c := make(EventChannel)
deduplicate(ctx, c, out, 100)
InternalDeduplicate(ctx, c, out, 100)

var rows *sql.Rows
if cursor.IsZero() {
Expand Down Expand Up @@ -213,13 +218,13 @@ func WatchChangeFeed(ctx context.Context, cx *sqlx.DB, tableName string, out Eve
return d, nil
}

// deduplicate sents events from `events` to the `deduplicated` channel, but
// InternalDeduplicate sents events from `events` to the `deduplicated` channel, but
// deduplicates events that are sent multiple times. This is necessary, because
// the CochroachDB changefeed has a atleast-once guarantee for change events,
// meaning that events could be sent multiple times.
//
// For deduplication, the last x `pastEvents` are considered.
func deduplicate(ctx context.Context, in <-chan Event, out chan<- Event, pastEvents int) {
func InternalDeduplicate(ctx context.Context, in <-chan Event, out chan<- Event, pastEvents int) {
go func() {
previous := newRingBuffer(pastEvents)

Expand Down
13 changes: 6 additions & 7 deletions watcherx/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © 2023 Ory Corp
// SPDX-License-Identifier: Apache-2.0

package watcherx
package watcherx_test

import (
"context"
Expand All @@ -18,6 +18,7 @@ import (
"github.com/tidwall/gjson"

"github.com/ory/x/logrusx"
. "github.com/ory/x/watcherx"
)

func TestWatchChangeFeed(t *testing.T) {
Expand Down Expand Up @@ -177,9 +178,7 @@ func Test_deduplicate(t *testing.T) {

events := make([]Event, 3)
for i := range events {
events[i] = &ErrorEvent{
source: source(fmt.Sprintf("Event %d", i)),
}
events[i] = NewErrorEvent(nil, fmt.Sprintf("Event %d", i))
}

t.Run("case=proxies", func(t *testing.T) {
Expand All @@ -188,7 +187,7 @@ func Test_deduplicate(t *testing.T) {
eventCh := make(EventChannel)
deduplicatedEvents := make(EventChannel)

deduplicate(childCtx, eventCh, deduplicatedEvents, len(events))
InternalDeduplicate(childCtx, eventCh, deduplicatedEvents, len(events))
go send(childCtx, eventCh, events)
received := recv(ctx, deduplicatedEvents)

Expand All @@ -203,7 +202,7 @@ func Test_deduplicate(t *testing.T) {

duplicateEvents := append(events, events...)

deduplicate(childCtx, eventCh, deduplicatedEvents, len(events))
InternalDeduplicate(childCtx, eventCh, deduplicatedEvents, len(events))
go send(childCtx, eventCh, duplicateEvents)
received := recv(ctx, deduplicatedEvents)

Expand All @@ -220,7 +219,7 @@ func Test_deduplicate(t *testing.T) {
duplicateEvents = append(duplicateEvents, events[0])
expectedEvents := append(events, events[0])

deduplicate(childCtx, eventCh, deduplicatedEvents, len(events)-1)
InternalDeduplicate(childCtx, eventCh, deduplicatedEvents, len(events)-1)
go send(childCtx, eventCh, duplicateEvents)
received := recv(ctx, deduplicatedEvents)

Expand Down
7 changes: 7 additions & 0 deletions watcherx/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type (
}
)

func NewErrorEvent(err error, source_ string) *ErrorEvent {
return &ErrorEvent{
error: err,
source: source(source_),
}
}

const (
serialTypeChange serialEventType = "change"
serialTypeRemove serialEventType = "remove"
Expand Down
Loading