Skip to content

Commit 070e388

Browse files
authored
Retry creating unique index (#613)
Introduce a retry mechanism for creating unique indexes. First, create unique index concurrently. Then wait until Postgres is done with the index creation, periodically checking the view `pg_stat_progress_create_index`. Once it is done, lookup pg_index to see if the index is validated or not. If it is, we are good to go. If not, drop the index and try again. Since we are running select queries on `pg_stat_progress_create_index` and `pg_index` and we are expecting a real output to see the progress, this approach doesn't work with fake db. I have added hardcoded responses for the fake db scenario, so that we can safely process migrations to update virtual schema as well. Not sure if this is a good solution or not. Open for discussions. Tested manually with high load, while 10m rows being inserted. But I wonder if there's a way to add a test for the "high load" scenario.
1 parent c09619a commit 070e388

File tree

5 files changed

+181
-91
lines changed

5 files changed

+181
-91
lines changed

pkg/migrations/duplicate.go

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ const (
4040
dataTypeMismatchErrorCode pq.ErrorCode = "42804"
4141
undefinedFunctionErrorCode pq.ErrorCode = "42883"
4242

43-
cCreateUniqueIndexSQL = `CREATE UNIQUE INDEX CONCURRENTLY %s ON %s (%s)`
4443
cSetDefaultSQL = `ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`
4544
cAlterTableAddCheckConstraintSQL = `ALTER TABLE %s ADD CONSTRAINT %s %s NOT VALID`
4645
cAlterTableAddForeignKeySQL = `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s) ON DELETE %s`
@@ -134,12 +133,16 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
134133
}
135134
}
136135

137-
// Generate SQL to duplicate any unique constraints on the columns
138-
// The constraint is duplicated by adding a unique index on the column concurrently.
136+
// Create indexes for unique constraints on the columns concurrently.
139137
// The index is converted into a unique constraint on migration completion.
140-
for _, sql := range d.stmtBuilder.duplicateUniqueConstraints(d.withoutConstraint, colNames...) {
141-
if _, err := d.conn.ExecContext(ctx, sql); err != nil {
142-
return err
138+
for _, uc := range d.stmtBuilder.table.UniqueConstraints {
139+
if slices.Contains(d.withoutConstraint, uc.Name) {
140+
continue
141+
}
142+
if duplicatedMember, constraintColumns := d.stmtBuilder.allConstraintColumns(uc.Columns, colNames...); duplicatedMember {
143+
if err := createUniqueIndexConcurrently(ctx, d.conn, "", DuplicationName(uc.Name), d.stmtBuilder.table.Name, constraintColumns); err != nil {
144+
return err
145+
}
143146
}
144147
}
145148

@@ -180,23 +183,6 @@ func (d *duplicatorStmtBuilder) duplicateCheckConstraints(withoutConstraint []st
180183
return stmts
181184
}
182185

183-
func (d *duplicatorStmtBuilder) duplicateUniqueConstraints(withoutConstraint []string, colNames ...string) []string {
184-
stmts := make([]string, 0, len(d.table.UniqueConstraints))
185-
for _, uc := range d.table.UniqueConstraints {
186-
if slices.Contains(withoutConstraint, uc.Name) {
187-
continue
188-
}
189-
if duplicatedMember, constraintColumns := d.allConstraintColumns(uc.Columns, colNames...); duplicatedMember {
190-
stmts = append(stmts, fmt.Sprintf(cCreateUniqueIndexSQL,
191-
pq.QuoteIdentifier(DuplicationName(uc.Name)),
192-
pq.QuoteIdentifier(d.table.Name),
193-
strings.Join(quoteColumnNames(constraintColumns), ", "),
194-
))
195-
}
196-
}
197-
return stmts
198-
}
199-
200186
func (d *duplicatorStmtBuilder) duplicateForeignKeyConstraints(withoutConstraint []string, colNames ...string) []string {
201187
stmts := make([]string, 0, len(d.table.ForeignKeys))
202188
for _, fk := range d.table.ForeignKeys {

pkg/migrations/duplicate_test.go

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -102,47 +102,6 @@ func TestDuplicateStmtBuilderCheckConstraints(t *testing.T) {
102102
}
103103
}
104104

105-
func TestDuplicateStmtBuilderUniqueConstraints(t *testing.T) {
106-
d := &duplicatorStmtBuilder{table}
107-
for name, testCases := range map[string]struct {
108-
columns []string
109-
expectedStmts []string
110-
}{
111-
"single column duplicated": {
112-
columns: []string{"city"},
113-
expectedStmts: []string{},
114-
},
115-
"single-column constraint with single column duplicated": {
116-
columns: []string{"email"},
117-
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
118-
},
119-
"single-column constraint with multiple column duplicated": {
120-
columns: []string{"email", "description"},
121-
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
122-
},
123-
"multi-column constraint with single column duplicated": {
124-
columns: []string{"name"},
125-
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
126-
},
127-
"multi-column constraint with multiple unrelated column duplicated": {
128-
columns: []string{"name", "description"},
129-
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
130-
},
131-
"multi-column constraint with multiple columns": {
132-
columns: []string{"name", "nick"},
133-
expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "_pgroll_new_nick")`},
134-
},
135-
} {
136-
t.Run(name, func(t *testing.T) {
137-
stmts := d.duplicateUniqueConstraints(nil, testCases.columns...)
138-
assert.Equal(t, len(testCases.expectedStmts), len(stmts))
139-
for _, stmt := range stmts {
140-
assert.True(t, slices.Contains(testCases.expectedStmts, stmt))
141-
}
142-
})
143-
}
144-
}
145-
146105
func TestDuplicateStmtBuilderForeignKeyConstraints(t *testing.T) {
147106
d := &duplicatorStmtBuilder{table}
148107
for name, testCases := range map[string]struct {
@@ -233,4 +192,46 @@ func TestDuplicateStmtBuilderIndexes(t *testing.T) {
233192
}
234193
}
235194

195+
func TestCreateIndexConcurrentlySqlGeneration(t *testing.T) {
196+
for name, testCases := range map[string]struct {
197+
indexName string
198+
schemaName string
199+
tableName string
200+
columns []string
201+
expectedStmt string
202+
}{
203+
"single column with schemaname": {
204+
indexName: "idx_email",
205+
schemaName: "test_sch",
206+
tableName: "test_table",
207+
columns: []string{"email"},
208+
expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_sch"."test_table" ("email")`,
209+
},
210+
"single column with no schema name": {
211+
indexName: "idx_email",
212+
tableName: "test_table",
213+
columns: []string{"email"},
214+
expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_table" ("email")`,
215+
},
216+
"multi-column with no schema name": {
217+
indexName: "idx_name_city",
218+
tableName: "test_table",
219+
columns: []string{"name", "city"},
220+
expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_table" ("name", "city")`,
221+
},
222+
"multi-column with schema name": {
223+
indexName: "idx_name_city",
224+
schemaName: "test_sch",
225+
tableName: "test_table",
226+
columns: []string{"id", "name", "city"},
227+
expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_sch"."test_table" ("id", "name", "city")`,
228+
},
229+
} {
230+
t.Run(name, func(t *testing.T) {
231+
stmt := getCreateUniqueIndexConcurrentlySQL(testCases.indexName, testCases.schemaName, testCases.tableName, testCases.columns)
232+
assert.Equal(t, testCases.expectedStmt, stmt)
233+
})
234+
}
235+
}
236+
236237
func ptr[T any](x T) *T { return &x }

pkg/migrations/index.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package migrations
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"strings"
9+
"time"
10+
11+
"github.com/lib/pq"
12+
"github.com/xataio/pgroll/pkg/db"
13+
)
14+
15+
func createUniqueIndexConcurrently(ctx context.Context, conn db.DB, schemaName string, indexName string, tableName string, columnNames []string) error {
16+
quotedQualifiedIndexName := pq.QuoteIdentifier(indexName)
17+
if schemaName != "" {
18+
quotedQualifiedIndexName = fmt.Sprintf("%s.%s", pq.QuoteIdentifier(schemaName), pq.QuoteIdentifier(indexName))
19+
}
20+
for retryCount := 0; retryCount < 5; retryCount++ {
21+
// Add a unique index to the new column
22+
// Indexes are created in the same schema with the table automatically. Instead of the qualified one, just pass the index name.
23+
createIndexSQL := getCreateUniqueIndexConcurrentlySQL(indexName, schemaName, tableName, columnNames)
24+
if _, err := conn.ExecContext(ctx, createIndexSQL); err != nil {
25+
return fmt.Errorf("failed to add unique index %q: %w", indexName, err)
26+
}
27+
28+
// Make sure Postgres is done creating the index
29+
isInProgress, err := isIndexInProgress(ctx, conn, quotedQualifiedIndexName)
30+
if err != nil {
31+
return err
32+
}
33+
34+
ticker := time.NewTicker(500 * time.Millisecond)
35+
defer ticker.Stop()
36+
for isInProgress {
37+
<-ticker.C
38+
isInProgress, err = isIndexInProgress(ctx, conn, quotedQualifiedIndexName)
39+
if err != nil {
40+
return err
41+
}
42+
}
43+
44+
// Check pg_index to see if it's valid or not. Break if it's valid.
45+
isValid, err := isIndexValid(ctx, conn, quotedQualifiedIndexName)
46+
if err != nil {
47+
return err
48+
}
49+
50+
if isValid {
51+
// success
52+
return nil
53+
}
54+
55+
// If not valid, since Postgres has already given up validating the index,
56+
// it will remain invalid forever. Drop it and try again.
57+
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", quotedQualifiedIndexName))
58+
if err != nil {
59+
return fmt.Errorf("failed to drop index: %w", err)
60+
}
61+
}
62+
63+
// ran out of retries, return an error
64+
return fmt.Errorf("failed to create unique index %q", indexName)
65+
}
66+
67+
func getCreateUniqueIndexConcurrentlySQL(indexName string, schemaName string, tableName string, columnNames []string) string {
68+
// create unique index concurrently
69+
qualifiedTableName := pq.QuoteIdentifier(tableName)
70+
if schemaName != "" {
71+
qualifiedTableName = fmt.Sprintf("%s.%s", pq.QuoteIdentifier(schemaName), pq.QuoteIdentifier(tableName))
72+
}
73+
74+
indexQuery := fmt.Sprintf(
75+
"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
76+
pq.QuoteIdentifier(indexName),
77+
qualifiedTableName,
78+
strings.Join(quoteColumnNames(columnNames), ", "),
79+
)
80+
81+
return indexQuery
82+
}
83+
84+
func isIndexInProgress(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
85+
rows, err := conn.QueryContext(ctx, `SELECT EXISTS(
86+
SELECT * FROM pg_catalog.pg_stat_progress_create_index
87+
WHERE index_relid = $1::regclass
88+
)`, quotedQualifiedIndexName)
89+
if err != nil {
90+
return false, fmt.Errorf("getting index in progress with name %q: %w", quotedQualifiedIndexName, err)
91+
}
92+
if rows == nil {
93+
// if rows == nil && err != nil, then it means we have queried a `FakeDB`.
94+
// In that case, we can safely return false.
95+
return false, nil
96+
}
97+
var isInProgress bool
98+
if err := db.ScanFirstValue(rows, &isInProgress); err != nil {
99+
return false, fmt.Errorf("scanning index in progress with name %q: %w", quotedQualifiedIndexName, err)
100+
}
101+
102+
return isInProgress, nil
103+
}
104+
105+
func isIndexValid(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
106+
rows, err := conn.QueryContext(ctx, `SELECT indisvalid
107+
FROM pg_catalog.pg_index
108+
WHERE indexrelid = $1::regclass`,
109+
quotedQualifiedIndexName)
110+
if err != nil {
111+
return false, fmt.Errorf("getting index with name %q: %w", quotedQualifiedIndexName, err)
112+
}
113+
if rows == nil {
114+
// if rows == nil && err != nil, then it means we have queried a fake db.
115+
// In that case, we can safely return true.
116+
return true, nil
117+
}
118+
var isValid bool
119+
if err := db.ScanFirstValue(rows, &isValid); err != nil {
120+
return false, fmt.Errorf("scanning index with name %q: %w", quotedQualifiedIndexName, err)
121+
}
122+
123+
return isValid, nil
124+
}

pkg/migrations/op_create_constraint.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ func (o *OpCreateConstraint) Start(ctx context.Context, conn db.DB, latestSchema
6767

6868
switch o.Type {
6969
case OpCreateConstraintTypeUnique:
70-
return table, o.addUniqueIndex(ctx, conn)
70+
temporaryColumnNames := make([]string, len(o.Columns))
71+
for i, col := range o.Columns {
72+
temporaryColumnNames[i] = TemporaryName(col)
73+
}
74+
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, o.Table, temporaryColumnNames)
7175
case OpCreateConstraintTypeCheck:
7276
return table, o.addCheckConstraint(ctx, conn)
7377
case OpCreateConstraintTypeForeignKey:
@@ -232,16 +236,6 @@ func (o *OpCreateConstraint) Validate(ctx context.Context, s *schema.Schema) err
232236
return nil
233237
}
234238

235-
func (o *OpCreateConstraint) addUniqueIndex(ctx context.Context, conn db.DB) error {
236-
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
237-
pq.QuoteIdentifier(o.Name),
238-
pq.QuoteIdentifier(o.Table),
239-
strings.Join(quotedTemporaryNames(o.Columns), ", "),
240-
))
241-
242-
return err
243-
}
244-
245239
func (o *OpCreateConstraint) addCheckConstraint(ctx context.Context, conn db.DB) error {
246240
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s ADD CONSTRAINT %s CHECK (%s) NOT VALID",
247241
pq.QuoteIdentifier(o.Table),

pkg/migrations/op_set_unique.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,7 @@ func (o *OpSetUnique) Start(ctx context.Context, conn db.DB, latestSchema string
2525
table := s.GetTable(o.Table)
2626
column := table.GetColumn(o.Column)
2727

28-
// Add a unique index to the new column
29-
if err := addUniqueIndex(ctx, conn, table.Name, column.Name, o.Name); err != nil {
30-
return nil, fmt.Errorf("failed to add unique index: %w", err)
31-
}
32-
33-
return table, nil
28+
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, table.Name, []string{column.Name})
3429
}
3530

3631
func (o *OpSetUnique) Complete(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error {
@@ -73,13 +68,3 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error {
7368

7469
return nil
7570
}
76-
77-
func addUniqueIndex(ctx context.Context, conn db.DB, table, column, name string) error {
78-
// create unique index concurrently
79-
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
80-
pq.QuoteIdentifier(name),
81-
pq.QuoteIdentifier(table),
82-
pq.QuoteIdentifier(column)))
83-
84-
return err
85-
}

0 commit comments

Comments
 (0)