Skip to content

Commit 9956f87

Browse files
dnovitski and Copilot
authored and committed
Fix OOM when allEventsUpToLockProcessed buffer equals MaxRetries()
PR github#1637 buffered allEventsUpToLockProcessed to MaxRetries() to prevent a goroutine deadlock when waitForEventsUpToLock times out during cutover. However, when --default-retries is set to a very large value (e.g. 9999999999999), Go tries to allocate a channel with trillions of buffer slots, causing an immediate OOM crash before the migration even starts.

Replace the MaxRetries()-sized buffer with a buffer of 1 and overwrite-oldest (latest-wins) send semantics. When the buffer is full (because the receiver timed out on a previous attempt), the stale message is drained before the current sentinel is sent. This guarantees:

- The current sentinel is always delivered (no message loss).
- The executeWriteFuncs worker is never blocked (no deadlock).
- Memory usage is constant regardless of MaxRetries() (no OOM).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4deeadf commit 9956f87

3 files changed

Lines changed: 90 additions & 7 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/.vendor/
55
.idea/
66
*.tmp
7+
gh-ost

go/logic/migrator.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,10 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
111111
ghostTableMigrated: make(chan bool),
112112
firstThrottlingCollected: make(chan bool, 3),
113113
rowCopyComplete: make(chan error),
114-
// Buffered to MaxRetries() to prevent a deadlock when waitForEventsUpToLock times out.
115-
// (see https://github.com/github/gh-ost/pull/1637)
116-
allEventsUpToLockProcessed: make(chan *lockProcessedStruct, context.MaxRetries()),
114+
// Buffered with capacity 1; the send uses overwrite-oldest semantics
115+
// to prevent both deadlock (see https://github.com/github/gh-ost/pull/1637)
116+
// and OOM when MaxRetries() is extremely large.
117+
allEventsUpToLockProcessed: make(chan *lockProcessedStruct, 1),
117118

118119
copyRowsQueue: make(chan tableWriteFunc),
119120
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
@@ -276,11 +277,32 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e
276277
// Use helper to prevent deadlock if migration aborts before receiver is ready
277278
_ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true)
278279
case AllEventsUpToLockProcessed:
280+
lps := &lockProcessedStruct{
281+
state: changelogStateString,
282+
coords: dmlEntry.Coordinates.Clone(),
283+
}
279284
var applyEventFunc tableWriteFunc = func() error {
280-
return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{
281-
state: changelogStateString,
282-
coords: dmlEntry.Coordinates.Clone(),
283-
})
285+
// Non-blocking send with overwrite-oldest semantics: if the buffer is
286+
// full (receiver timed out on a previous attempt), drain the stale
287+
// message first so the current sentinel is always delivered. This
288+
// prevents both goroutine leaks (the original PR #1637 issue) and OOM
289+
// when MaxRetries() is very large.
290+
select {
291+
case this.allEventsUpToLockProcessed <- lps:
292+
default:
293+
// Buffer full — drain the stale value, then send the current one.
294+
select {
295+
case <-this.allEventsUpToLockProcessed:
296+
default:
297+
}
298+
select {
299+
case this.allEventsUpToLockProcessed <- lps:
300+
default:
301+
// Concurrent drain by another goroutine or receiver; the current
302+
// value is no longer needed since a newer sentinel will follow.
303+
}
304+
}
305+
return nil
284306
}
285307
// at this point we know all events up to lock have been read from the streamer,
286308
// because the streamer works sequentially. So those events are either already handled,

go/logic/migrator_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,66 @@ func TestMigratorOnChangelogEvent(t *testing.T) {
8181
wg.Wait()
8282
})
8383

84+
t.Run("state-AllEventsUpToLockProcessed-overwrite-oldest", func(t *testing.T) {
85+
// Simulate the scenario where the receiver (waitForEventsUpToLock) timed out
86+
// and a stale message sits in the channel buffer. The next sentinel must
87+
// overwrite the stale one so the current attempt's message is delivered.
88+
m := NewMigrator(base.NewMigrationContext(), "test")
89+
m.applier = NewApplier(m.migrationContext)
90+
91+
sendChangelogEvent := func(challenge string) {
92+
columnValues := sql.ToColumnValues([]interface{}{
93+
123,
94+
time.Now().Unix(),
95+
"state",
96+
challenge,
97+
})
98+
require.NoError(t, m.onChangelogEvent(&binlog.BinlogEntry{
99+
DmlEvent: &binlog.BinlogDMLEvent{
100+
DatabaseName: "test",
101+
DML: binlog.InsertDML,
102+
NewColumnValues: columnValues},
103+
Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)),
104+
}))
105+
}
106+
107+
executeWriteFunc := func() {
108+
es := <-m.applyEventsQueue
109+
require.NotNil(t, es.writeFunc)
110+
require.NoError(t, (*es.writeFunc)())
111+
}
112+
113+
// Attempt 1: send sentinel and execute the writeFunc to deliver it
114+
sendChangelogEvent("AllEventsUpToLockProcessed:attempt1")
115+
executeWriteFunc()
116+
117+
// The message sits unconsumed in allEventsUpToLockProcessed (simulating a timeout)
118+
require.Len(t, m.allEventsUpToLockProcessed, 1)
119+
120+
// Attempt 2: send a new sentinel — must overwrite the stale one
121+
sendChangelogEvent("AllEventsUpToLockProcessed:attempt2")
122+
executeWriteFunc()
123+
124+
// The channel should contain exactly the latest message
125+
require.Len(t, m.allEventsUpToLockProcessed, 1)
126+
msg := <-m.allEventsUpToLockProcessed
127+
require.Equal(t, "AllEventsUpToLockProcessed:attempt2", msg.state)
128+
})
129+
130+
t.Run("NewMigrator-with-extreme-MaxRetries", func(t *testing.T) {
131+
// Regression test: an extremely large --default-retries value must not
132+
// cause an OOM when creating the migrator. Before the fix,
133+
// allEventsUpToLockProcessed was buffered to MaxRetries(), which tried
134+
// to allocate a ~10 trillion element channel.
135+
ctx := base.NewMigrationContext()
136+
ctx.SetDefaultNumRetries(9999999999999)
137+
require.Equal(t, int64(9999999999999), ctx.MaxRetries())
138+
139+
m := NewMigrator(ctx, "test")
140+
require.NotNil(t, m)
141+
require.Equal(t, 1, cap(m.allEventsUpToLockProcessed))
142+
})
143+
84144
t.Run("state-GhostTableMigrated", func(t *testing.T) {
85145
go func() {
86146
require.True(t, <-migrator.ghostTableMigrated)

0 commit comments

Comments
 (0)