Skip to content

Commit

Permalink
Test for frozen transaction id in each batch
Browse files Browse the repository at this point in the history
Ensure rows logically preceding the frozen transaction id for the table
are included in the batch of rows to be backfilled.
  • Loading branch information
andrew-farries committed Jan 29, 2025
1 parent 1ed7f7c commit 4bed314
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
TransactionID: xid,
Schema: bf.schema,
StateSchema: bf.stateSchema,
},
}
Expand Down
1 change: 1 addition & 0 deletions pkg/backfill/templates/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type BatchConfig struct {
LastValue []string
BatchSize int
TransactionID int64
Schema string
StateSchema string
}

Expand Down
24 changes: 20 additions & 4 deletions pkg/backfill/templates/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestBatchStatementBuilder(t *testing.T) {
TableName: "table_name",
PrimaryKey: []string{"id"},
BatchSize: 10,
Schema: "public",
StateSchema: "pgroll",
TransactionID: 1234,
},
Expand All @@ -28,6 +29,7 @@ func TestBatchStatementBuilder(t *testing.T) {
TableName: "table_name",
PrimaryKey: []string{"id", "zip"},
BatchSize: 10,
Schema: "public",
StateSchema: "pgroll",
TransactionID: 1234,
},
Expand All @@ -39,6 +41,7 @@ func TestBatchStatementBuilder(t *testing.T) {
PrimaryKey: []string{"id"},
LastValue: []string{"1"},
BatchSize: 10,
Schema: "public",
StateSchema: "pgroll",
TransactionID: 1234,
},
Expand All @@ -50,6 +53,7 @@ func TestBatchStatementBuilder(t *testing.T) {
PrimaryKey: []string{"id", "zip"},
LastValue: []string{"1", "1234"},
BatchSize: 10,
Schema: "public",
StateSchema: "pgroll",
TransactionID: 1234,
},
Expand All @@ -71,7 +75,10 @@ const expectSingleIDColumnNoLastValue = `WITH batch AS
(
SELECT "id"
FROM "table_name"
WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234)
WHERE (
"pgroll".b_follows_a(xmin::text::bigint, 1234) OR
"pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint)
)
ORDER BY "id"
LIMIT 10
FOR NO KEY UPDATE
Expand All @@ -92,7 +99,10 @@ const multipleIDColumnsNoLastValue = `WITH batch AS
(
SELECT "id", "zip"
FROM "table_name"
WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234)
WHERE (
"pgroll".b_follows_a(xmin::text::bigint, 1234) OR
"pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint)
)
ORDER BY "id", "zip"
LIMIT 10
FOR NO KEY UPDATE
Expand All @@ -113,7 +123,10 @@ const singleIDColumnWithLastValue = `WITH batch AS
(
SELECT "id"
FROM "table_name"
WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234)
WHERE (
"pgroll".b_follows_a(xmin::text::bigint, 1234) OR
"pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint)
)
AND ("id") > ('1')
ORDER BY "id"
LIMIT 10
Expand All @@ -135,7 +148,10 @@ const multipleIDColumnsWithLastValue = `WITH batch AS
(
SELECT "id", "zip"
FROM "table_name"
WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234)
WHERE (
"pgroll".b_follows_a(xmin::text::bigint, 1234) OR
"pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint)
)
AND ("id", "zip") > ('1', '1234')
ORDER BY "id", "zip"
LIMIT 10
Expand Down
5 changes: 4 additions & 1 deletion pkg/backfill/templates/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ const SQL = `WITH batch AS
(
SELECT {{ commaSeparate (quoteIdentifiers .PrimaryKey) }}
FROM {{ .TableName | qi}}
WHERE {{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .TransactionID }})
WHERE (
{{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .TransactionID }}) OR
{{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .StateSchema | qi }}.frozen_xid({{ .Schema | ql }}, {{ .TableName | ql }})::text::bigint)
)
{{ if .LastValue -}}
AND ({{ commaSeparate (quoteIdentifiers .PrimaryKey) }}) > ({{ commaSeparate (quoteLiterals .LastValue) }})
{{ end -}}
Expand Down

0 comments on commit 4bed314

Please sign in to comment.