diff --git a/pkg/backfill/backfill.go b/pkg/backfill/backfill.go index b4d8e24d..3d45eb72 100644 --- a/pkg/backfill/backfill.go +++ b/pkg/backfill/backfill.go @@ -71,6 +71,7 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error { PrimaryKey: identityColumns, BatchSize: bf.batchSize, TransactionID: xid, + Schema: bf.schema, StateSchema: bf.stateSchema, }, } diff --git a/pkg/backfill/templates/build.go b/pkg/backfill/templates/build.go index ebc19cd5..04ff6846 100644 --- a/pkg/backfill/templates/build.go +++ b/pkg/backfill/templates/build.go @@ -16,6 +16,7 @@ type BatchConfig struct { LastValue []string BatchSize int TransactionID int64 + Schema string StateSchema string } diff --git a/pkg/backfill/templates/build_test.go b/pkg/backfill/templates/build_test.go index d5d4c185..e167437e 100644 --- a/pkg/backfill/templates/build_test.go +++ b/pkg/backfill/templates/build_test.go @@ -18,6 +18,7 @@ func TestBatchStatementBuilder(t *testing.T) { TableName: "table_name", PrimaryKey: []string{"id"}, BatchSize: 10, + Schema: "public", StateSchema: "pgroll", TransactionID: 1234, }, @@ -28,6 +29,7 @@ func TestBatchStatementBuilder(t *testing.T) { TableName: "table_name", PrimaryKey: []string{"id", "zip"}, BatchSize: 10, + Schema: "public", StateSchema: "pgroll", TransactionID: 1234, }, @@ -39,6 +41,7 @@ func TestBatchStatementBuilder(t *testing.T) { PrimaryKey: []string{"id"}, LastValue: []string{"1"}, BatchSize: 10, + Schema: "public", StateSchema: "pgroll", TransactionID: 1234, }, @@ -50,6 +53,7 @@ func TestBatchStatementBuilder(t *testing.T) { PrimaryKey: []string{"id", "zip"}, LastValue: []string{"1", "1234"}, BatchSize: 10, + Schema: "public", StateSchema: "pgroll", TransactionID: 1234, }, @@ -71,7 +75,10 @@ const expectSingleIDColumnNoLastValue = `WITH batch AS ( SELECT "id" FROM "table_name" - WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234) + WHERE ( + "pgroll".b_follows_a(xmin::text::bigint, 1234) OR + "pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint) + ) ORDER BY "id" LIMIT 10 FOR NO KEY UPDATE @@ -92,7 +99,10 @@ const multipleIDColumnsNoLastValue = `WITH batch AS ( SELECT "id", "zip" FROM "table_name" - WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234) + WHERE ( + "pgroll".b_follows_a(xmin::text::bigint, 1234) OR + "pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint) + ) ORDER BY "id", "zip" LIMIT 10 FOR NO KEY UPDATE @@ -113,7 +123,10 @@ const singleIDColumnWithLastValue = `WITH batch AS ( SELECT "id" FROM "table_name" - WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234) + WHERE ( + "pgroll".b_follows_a(xmin::text::bigint, 1234) OR + "pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint) + ) AND ("id") > ('1') ORDER BY "id" LIMIT 10 @@ -135,7 +148,10 @@ const multipleIDColumnsWithLastValue = `WITH batch AS ( SELECT "id", "zip" FROM "table_name" - WHERE "pgroll".b_follows_a(xmin::text::bigint, 1234) + WHERE ( + "pgroll".b_follows_a(xmin::text::bigint, 1234) OR + "pgroll".b_follows_a(xmin::text::bigint, "pgroll".frozen_xid('public', 'table_name')::text::bigint) + ) AND ("id", "zip") > ('1', '1234') ORDER BY "id", "zip" LIMIT 10 diff --git a/pkg/backfill/templates/sql.go b/pkg/backfill/templates/sql.go index 9eef62c3..9db50ebf 100644 --- a/pkg/backfill/templates/sql.go +++ b/pkg/backfill/templates/sql.go @@ -6,7 +6,10 @@ const SQL = `WITH batch AS ( SELECT {{ commaSeparate (quoteIdentifiers .PrimaryKey) }} FROM {{ .TableName | qi}} - WHERE {{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .TransactionID }}) + WHERE ( + {{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .TransactionID }}) OR + {{ .StateSchema | qi }}.b_follows_a(xmin::text::bigint, {{ .StateSchema | qi }}.frozen_xid({{ .Schema | ql }}, {{ .TableName | ql }})::text::bigint) + ) {{ if .LastValue -}} AND ({{ commaSeparate (quoteIdentifiers .PrimaryKey) }}) > ({{ commaSeparate (quoteLiterals .LastValue) }}) {{ end -}}