Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry creating unique index #613

Merged
merged 11 commits into from
Jan 24, 2025
32 changes: 9 additions & 23 deletions pkg/migrations/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
dataTypeMismatchErrorCode pq.ErrorCode = "42804"
undefinedFunctionErrorCode pq.ErrorCode = "42883"

cCreateUniqueIndexSQL = `CREATE UNIQUE INDEX CONCURRENTLY %s ON %s (%s)`
cSetDefaultSQL = `ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`
cAlterTableAddCheckConstraintSQL = `ALTER TABLE %s ADD CONSTRAINT %s %s NOT VALID`
cAlterTableAddForeignKeySQL = `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s) ON DELETE %s`
Expand Down Expand Up @@ -134,12 +133,16 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
}
}

// Generate SQL to duplicate any unique constraints on the columns
// The constraint is duplicated by adding a unique index on the column concurrently.
// Create indexes for unique constraints on the columns concurrently.
// The index is converted into a unique constraint on migration completion.
for _, sql := range d.stmtBuilder.duplicateUniqueConstraints(d.withoutConstraint, colNames...) {
if _, err := d.conn.ExecContext(ctx, sql); err != nil {
return err
for _, uc := range d.stmtBuilder.table.UniqueConstraints {
if slices.Contains(d.withoutConstraint, uc.Name) {
continue
}
if duplicatedMember, constraintColumns := d.stmtBuilder.allConstraintColumns(uc.Columns, colNames...); duplicatedMember {
if err := createUniqueIndexConcurrently(ctx, d.conn, "", DuplicationName(uc.Name), d.stmtBuilder.table.Name, constraintColumns); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

andrew-farries marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
}

Expand Down Expand Up @@ -180,23 +183,6 @@ func (d *duplicatorStmtBuilder) duplicateCheckConstraints(withoutConstraint []st
return stmts
}

// duplicateUniqueConstraints builds the statements that duplicate the table's
// unique constraints as concurrently-created unique indexes.
//
// A constraint is skipped when its name appears in withoutConstraint, or when
// allConstraintColumns reports that none of its columns is a duplicated
// member for colNames. For every remaining constraint one statement is
// generated from cCreateUniqueIndexSQL, using the duplication name of the
// constraint as the index name and the column list returned by
// allConstraintColumns.
func (d *duplicatorStmtBuilder) duplicateUniqueConstraints(withoutConstraint []string, colNames ...string) []string {
	constraints := d.table.UniqueConstraints
	statements := make([]string, 0, len(constraints))

	for _, constraint := range constraints {
		if slices.Contains(withoutConstraint, constraint.Name) {
			continue
		}

		affected, indexColumns := d.allConstraintColumns(constraint.Columns, colNames...)
		if !affected {
			continue
		}

		indexName := pq.QuoteIdentifier(DuplicationName(constraint.Name))
		tableName := pq.QuoteIdentifier(d.table.Name)
		columnList := strings.Join(quoteColumnNames(indexColumns), ", ")

		statements = append(statements, fmt.Sprintf(cCreateUniqueIndexSQL, indexName, tableName, columnList))
	}

	return statements
}

func (d *duplicatorStmtBuilder) duplicateForeignKeyConstraints(withoutConstraint []string, colNames ...string) []string {
stmts := make([]string, 0, len(d.table.ForeignKeys))
for _, fk := range d.table.ForeignKeys {
Expand Down
83 changes: 42 additions & 41 deletions pkg/migrations/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,47 +102,6 @@ func TestDuplicateStmtBuilderCheckConstraints(t *testing.T) {
}
}

func TestDuplicateStmtBuilderUniqueConstraints(t *testing.T) {
andrew-farries marked this conversation as resolved.
Show resolved Hide resolved
d := &duplicatorStmtBuilder{table}
for name, testCases := range map[string]struct {
columns []string
expectedStmts []string
}{
"single column duplicated": {
columns: []string{"city"},
expectedStmts: []string{},
},
"single-column constraint with single column duplicated": {
columns: []string{"email"},
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
},
"single-column constraint with multiple column duplicated": {
columns: []string{"email", "description"},
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
},
"multi-column constraint with single column duplicated": {
columns: []string{"name"},
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
},
"multi-column constraint with multiple unrelated column duplicated": {
columns: []string{"name", "description"},
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
},
"multi-column constraint with multiple columns": {
columns: []string{"name", "nick"},
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "_pgroll_new_nick")`},
},
} {
t.Run(name, func(t *testing.T) {
stmts := d.duplicateUniqueConstraints(nil, testCases.columns...)
assert.Equal(t, len(testCases.expectedStmts), len(stmts))
for _, stmt := range stmts {
assert.True(t, slices.Contains(testCases.expectedStmts, stmt))
}
})
}
}

func TestDuplicateStmtBuilderForeignKeyConstraints(t *testing.T) {
d := &duplicatorStmtBuilder{table}
for name, testCases := range map[string]struct {
Expand Down Expand Up @@ -233,4 +192,46 @@ func TestDuplicateStmtBuilderIndexes(t *testing.T) {
}
}

// TestCreateIndexConcurrentlySqlGeneration verifies the statement produced by
// getCreateUniqueIndexConcurrentlySQL for single- and multi-column indexes,
// both with and without a schema name.
func TestCreateIndexConcurrentlySqlGeneration(t *testing.T) {
	type testCase struct {
		name         string
		indexName    string
		schemaName   string
		tableName    string
		columns      []string
		expectedStmt string
	}

	cases := []testCase{
		{
			name:         "single column with schemaname",
			indexName:    "idx_email",
			schemaName:   "test_sch",
			tableName:    "test_table",
			columns:      []string{"email"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_sch"."test_table" ("email")`,
		},
		{
			name:         "single column with no schema name",
			indexName:    "idx_email",
			tableName:    "test_table",
			columns:      []string{"email"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_table" ("email")`,
		},
		{
			name:         "multi-column with no schema name",
			indexName:    "idx_name_city",
			tableName:    "test_table",
			columns:      []string{"name", "city"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_table" ("name", "city")`,
		},
		{
			name:         "multi-column with schema name",
			indexName:    "idx_name_city",
			schemaName:   "test_sch",
			tableName:    "test_table",
			columns:      []string{"id", "name", "city"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_sch"."test_table" ("id", "name", "city")`,
		},
	}

	for _, tc := range cases {
		// The subtest runs synchronously, so capturing tc is safe on any Go version.
		t.Run(tc.name, func(t *testing.T) {
			got := getCreateUniqueIndexConcurrentlySQL(tc.indexName, tc.schemaName, tc.tableName, tc.columns)
			assert.Equal(t, tc.expectedStmt, got)
		})
	}
}

// ptr returns a pointer to a freshly allocated copy of x. Each call yields a
// distinct pointer.
func ptr[T any](x T) *T {
	p := new(T)
	*p = x
	return p
}
124 changes: 124 additions & 0 deletions pkg/migrations/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: Apache-2.0

package migrations

import (
"context"
"fmt"
"strings"
"time"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
)

// createUniqueIndexConcurrently creates a unique index named indexName on
// tableName over columnNames using CREATE UNIQUE INDEX CONCURRENTLY, and
// makes sure the resulting index is valid before returning.
//
// schemaName may be empty; when set it qualifies both the table in the
// CREATE statement and the index name used for the catalog lookups and the
// DROP statement.
//
// Each attempt issues the CREATE statement, waits until the index build is
// no longer reported as in progress, and then checks whether the index is
// valid. An invalid index is dropped and the creation is retried, up to five
// attempts in total. An error is returned when any statement or lookup
// fails, when ctx is cancelled while waiting, or when the attempts are
// exhausted.
func createUniqueIndexConcurrently(ctx context.Context, conn db.DB, schemaName string, indexName string, tableName string, columnNames []string) error {
	const maxAttempts = 5

	quotedQualifiedIndexName := pq.QuoteIdentifier(indexName)
	if schemaName != "" {
		quotedQualifiedIndexName = fmt.Sprintf("%s.%s", pq.QuoteIdentifier(schemaName), pq.QuoteIdentifier(indexName))
	}

	// Indexes are created in the same schema as the table automatically, so
	// the CREATE statement takes the bare index name rather than the
	// qualified one. The statement is identical on every attempt; build it
	// once.
	createIndexSQL := getCreateUniqueIndexConcurrentlySQL(indexName, schemaName, tableName, columnNames)

	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Add a unique index to the new column.
		if _, err := conn.ExecContext(ctx, createIndexSQL); err != nil {
			return fmt.Errorf("failed to add unique index %q: %w", indexName, err)
		}

		// Make sure Postgres is done creating the index.
		if err := waitForIndexCreation(ctx, conn, quotedQualifiedIndexName); err != nil {
			return err
		}

		// Check pg_index to see whether the index is valid.
		isValid, err := isIndexValid(ctx, conn, quotedQualifiedIndexName)
		if err != nil {
			return err
		}
		if isValid {
			return nil
		}

		// If not valid, since Postgres has already given up validating the
		// index, it will remain invalid forever. Drop it and try again.
		if _, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", quotedQualifiedIndexName)); err != nil {
			return fmt.Errorf("failed to drop index: %w", err)
		}
	}

	// Ran out of retries.
	return fmt.Errorf("failed to create unique index %q", indexName)
}

// waitForIndexCreation polls every 500ms until isIndexInProgress no longer
// reports the index as being built. It returns early with an error when a
// lookup fails or when ctx is done.
func waitForIndexCreation(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) error {
	inProgress, err := isIndexInProgress(ctx, conn, quotedQualifiedIndexName)
	if err != nil {
		return err
	}
	if !inProgress {
		return nil
	}

	// The ticker lives in this helper so it is stopped as soon as the wait
	// ends, instead of piling up deferred Stop calls across retry attempts.
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for inProgress {
		select {
		case <-ctx.Done():
			return fmt.Errorf("waiting for index %s to be created: %w", quotedQualifiedIndexName, ctx.Err())
		case <-ticker.C:
		}

		inProgress, err = isIndexInProgress(ctx, conn, quotedQualifiedIndexName)
		if err != nil {
			return err
		}
	}

	return nil
}

// getCreateUniqueIndexConcurrentlySQL returns a
// CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS statement for indexName on
// tableName over columnNames. All identifiers are quoted. When schemaName is
// non-empty the table is schema-qualified; the index name is never
// qualified.
func getCreateUniqueIndexConcurrentlySQL(indexName string, schemaName string, tableName string, columnNames []string) string {
	target := pq.QuoteIdentifier(tableName)
	if schemaName != "" {
		target = fmt.Sprintf("%s.%s", pq.QuoteIdentifier(schemaName), target)
	}

	columnList := strings.Join(quoteColumnNames(columnNames), ", ")

	return fmt.Sprintf(
		"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(indexName),
		target,
		columnList,
	)
}

// isIndexInProgress reports whether pg_catalog.pg_stat_progress_create_index
// contains a row for the index identified by quotedQualifiedIndexName, which
// is passed as a query parameter and cast to regclass.
//
// It returns false with a nil error when the query yields nil rows, and a
// wrapped error when the query or the scan fails.
func isIndexInProgress(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
	// The continuation lines start at column 0 to keep the literal's bytes
	// unchanged.
	const query = `SELECT EXISTS(
SELECT * FROM pg_catalog.pg_stat_progress_create_index
WHERE index_relid = $1::regclass
)`

	rows, err := conn.QueryContext(ctx, query, quotedQualifiedIndexName)
	switch {
	case err != nil:
		return false, fmt.Errorf("getting index in progress with name %q: %w", quotedQualifiedIndexName, err)
	case rows == nil:
		// rows == nil with err == nil means a `FakeDB` was queried; in that
		// case nothing can be in progress.
		return false, nil
	}

	var inProgress bool
	if scanErr := db.ScanFirstValue(rows, &inProgress); scanErr != nil {
		return false, fmt.Errorf("scanning index in progress with name %q: %w", quotedQualifiedIndexName, scanErr)
	}
	return inProgress, nil
}

// isIndexValid reads indisvalid from pg_catalog.pg_index for the index
// identified by quotedQualifiedIndexName, which is passed as a query
// parameter and cast to regclass.
//
// It returns true with a nil error when the query yields nil rows, and a
// wrapped error when the query or the scan fails.
func isIndexValid(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
	// The continuation lines start at column 0 to keep the literal's bytes
	// unchanged.
	const query = `SELECT indisvalid
FROM pg_catalog.pg_index
WHERE indexrelid = $1::regclass`

	rows, err := conn.QueryContext(ctx, query, quotedQualifiedIndexName)
	switch {
	case err != nil:
		return false, fmt.Errorf("getting index with name %q: %w", quotedQualifiedIndexName, err)
	case rows == nil:
		// rows == nil with err == nil means a fake db was queried; in that
		// case the index is treated as valid.
		return true, nil
	}

	var valid bool
	if scanErr := db.ScanFirstValue(rows, &valid); scanErr != nil {
		return false, fmt.Errorf("scanning index with name %q: %w", quotedQualifiedIndexName, scanErr)
	}
	return valid, nil
}
16 changes: 5 additions & 11 deletions pkg/migrations/op_create_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func (o *OpCreateConstraint) Start(ctx context.Context, conn db.DB, latestSchema

switch o.Type {
case OpCreateConstraintTypeUnique:
return table, o.addUniqueIndex(ctx, conn)
temporaryColumnNames := make([]string, len(o.Columns))
for i, col := range o.Columns {
temporaryColumnNames[i] = TemporaryName(col)
}
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, o.Table, temporaryColumnNames)
case OpCreateConstraintTypeCheck:
return table, o.addCheckConstraint(ctx, conn)
case OpCreateConstraintTypeForeignKey:
Expand Down Expand Up @@ -232,16 +236,6 @@ func (o *OpCreateConstraint) Validate(ctx context.Context, s *schema.Schema) err
return nil
}

// addUniqueIndex executes a CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS
// statement for an index named o.Name on o.Table, covering the quoted
// temporary names of o.Columns. It returns the error from executing the
// statement, if any.
func (o *OpCreateConstraint) addUniqueIndex(ctx context.Context, conn db.DB) error {
	columnList := strings.Join(quotedTemporaryNames(o.Columns), ", ")
	stmt := fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(o.Name),
		pq.QuoteIdentifier(o.Table),
		columnList,
	)

	if _, err := conn.ExecContext(ctx, stmt); err != nil {
		return err
	}
	return nil
}

func (o *OpCreateConstraint) addCheckConstraint(ctx context.Context, conn db.DB) error {
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s ADD CONSTRAINT %s CHECK (%s) NOT VALID",
pq.QuoteIdentifier(o.Table),
Expand Down
17 changes: 1 addition & 16 deletions pkg/migrations/op_set_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ func (o *OpSetUnique) Start(ctx context.Context, conn db.DB, latestSchema string
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// Add a unique index to the new column
if err := addUniqueIndex(ctx, conn, table.Name, column.Name, o.Name); err != nil {
return nil, fmt.Errorf("failed to add unique index: %w", err)
}

return table, nil
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, table.Name, []string{column.Name})
}

func (o *OpSetUnique) Complete(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error {
Expand Down Expand Up @@ -73,13 +68,3 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error {

return nil
}

// addUniqueIndex executes a CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS
// statement for an index called name on the given column of table. All three
// identifiers are quoted. It returns the error from executing the statement,
// if any.
func addUniqueIndex(ctx context.Context, conn db.DB, table, column, name string) error {
	stmt := fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(name),
		pq.QuoteIdentifier(table),
		pq.QuoteIdentifier(column))

	if _, err := conn.ExecContext(ctx, stmt); err != nil {
		return err
	}
	return nil
}