Skip to content

Commit b1a67da

Browse files
committed
experimental migration from mysql to postgres
1 parent 44cd51e commit b1a67da

File tree

1 file changed

+398
-0
lines changed

1 file changed

+398
-0
lines changed

distrib/postgresql/mysql2pg.go

Lines changed: 398 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,398 @@
1+
// migrate-mysql-to-postgres copies Ergo's history data from a MySQL database
2+
// to a PostgreSQL database, reading connection details from an Ergo config file
3+
// with both databases populated in `datastore` (neither database needs `enabled: true`
4+
// to work.) It creates the PostgreSQL schema from scratch, then copies all data.
5+
//
6+
// WARNINGS:
7+
// 1. This was tested with Ergo v2.18.0 only
8+
// 2. It will destroy any preexisting data on the PostgreSQL side
9+
// 3. It should not modify any data on the MySQL side, but make a backup first anyway
10+
// 4. For best results, quiesce MySQL by stopping Ergo before running the migration
11+
//
12+
// Usage:
13+
//
14+
// go run ./distrib/postgresql/mysql2pg -config /path/to/ircd.yaml
15+
package main
16+
17+
import (
18+
"database/sql"
19+
"flag"
20+
"fmt"
21+
"log"
22+
"net/url"
23+
"os"
24+
25+
_ "github.com/go-sql-driver/mysql"
26+
_ "github.com/jackc/pgx/v5/stdlib"
27+
"gopkg.in/yaml.v2"
28+
)
29+
30+
const (
31+
maxTargetLength = 64 // copied from irc/postgresql/config.go
32+
latestSchema = "2" // copied from irc/postgresql/history.go
33+
latestMinorSchema = "2"
34+
batchSize = 1000
35+
)
36+
37+
// Minimal config structs matching the ergo YAML structure.
38+
// Field names are lowercased by yaml.v2 when no tag is present.
39+
40+
type mysqlConfig struct {
41+
Host string
42+
Port int
43+
SocketPath string `yaml:"socket-path"`
44+
User string
45+
Password string
46+
HistoryDatabase string `yaml:"history-database"`
47+
}
48+
49+
type postgresConfig struct {
50+
Host string
51+
Port int
52+
SocketPath string `yaml:"socket-path"`
53+
User string
54+
Password string
55+
HistoryDatabase string `yaml:"history-database"`
56+
URI string `yaml:"uri"`
57+
SSLMode string `yaml:"ssl-mode"`
58+
}
59+
60+
type ergoConfig struct {
61+
Datastore struct {
62+
MySQL mysqlConfig `yaml:"mysql"`
63+
PostgreSQL postgresConfig `yaml:"postgresql"`
64+
}
65+
}
66+
67+
// mysqlDSN builds a go-sql-driver/mysql DSN from config.
68+
// Copied from irc/mysql/history.go (*MySQL).open().
69+
func mysqlDSN(c mysqlConfig) string {
70+
var address string
71+
if c.SocketPath != "" {
72+
address = fmt.Sprintf("unix(%s)", c.SocketPath)
73+
} else {
74+
port := c.Port
75+
if port == 0 {
76+
port = 3306
77+
}
78+
address = fmt.Sprintf("tcp(%s:%d)", c.Host, port)
79+
}
80+
return fmt.Sprintf("%s:%s@%s/%s", c.User, c.Password, address, c.HistoryDatabase)
81+
}
82+
83+
// postgresURI builds a libpq URI from config.
84+
// Copied from irc/postgresql/history.go (*Config).buildURI().
85+
func postgresURI(c postgresConfig) string {
86+
if c.URI != "" {
87+
return c.URI
88+
}
89+
u := &url.URL{
90+
Scheme: "postgresql",
91+
Path: "/" + c.HistoryDatabase,
92+
}
93+
q := url.Values{}
94+
if c.SocketPath != "" {
95+
q.Set("host", c.SocketPath)
96+
if c.User != "" || c.Password != "" {
97+
u.User = url.UserPassword(c.User, c.Password)
98+
}
99+
} else {
100+
port := c.Port
101+
if port == 0 {
102+
port = 5432
103+
}
104+
host := c.Host
105+
if host == "" {
106+
host = "localhost"
107+
}
108+
u.Host = fmt.Sprintf("%s:%d", host, port)
109+
if c.User != "" || c.Password != "" {
110+
u.User = url.UserPassword(c.User, c.Password)
111+
}
112+
}
113+
sslMode := c.SSLMode
114+
if sslMode == "" {
115+
sslMode = "disable"
116+
}
117+
q.Set("sslmode", sslMode)
118+
u.RawQuery = q.Encode()
119+
return u.String()
120+
}
121+
122+
func main() {
123+
configPath := flag.String("config", "ircd.yaml", "path to ergo config file")
124+
flag.Parse()
125+
126+
data, err := os.ReadFile(*configPath)
127+
if err != nil {
128+
log.Fatalf("read config: %v", err)
129+
}
130+
var config ergoConfig
131+
if err := yaml.Unmarshal(data, &config); err != nil {
132+
log.Fatalf("parse config: %v", err)
133+
}
134+
135+
mysqlDB, err := sql.Open("mysql", mysqlDSN(config.Datastore.MySQL))
136+
if err != nil {
137+
log.Fatalf("open mysql: %v", err)
138+
}
139+
defer mysqlDB.Close()
140+
if err := mysqlDB.Ping(); err != nil {
141+
log.Fatalf("ping mysql: %v", err)
142+
}
143+
log.Println("connected to MySQL")
144+
145+
pgDB, err := sql.Open("pgx", postgresURI(config.Datastore.PostgreSQL))
146+
if err != nil {
147+
log.Fatalf("open postgres: %v", err)
148+
}
149+
defer pgDB.Close()
150+
if err := pgDB.Ping(); err != nil {
151+
log.Fatalf("ping postgres: %v", err)
152+
}
153+
log.Println("connected to PostgreSQL")
154+
155+
log.Println("setting up PostgreSQL schema")
156+
if err := setupSchema(pgDB); err != nil {
157+
log.Fatalf("setup schema: %v", err)
158+
}
159+
160+
tables := []struct {
161+
name string
162+
fn func(*sql.DB, *sql.DB) (int, error)
163+
}{
164+
{"history", copyHistory},
165+
{"sequence", copySequence},
166+
{"conversations", copyConversations},
167+
{"correspondents", copyCorrespondents},
168+
{"account_messages", copyAccountMessages},
169+
{"forget", copyForget},
170+
}
171+
for _, t := range tables {
172+
log.Printf("copying %s...", t.name)
173+
n, err := t.fn(mysqlDB, pgDB)
174+
if err != nil {
175+
log.Fatalf("copy %s: %v", t.name, err)
176+
}
177+
log.Printf(" %d rows", n)
178+
}
179+
180+
log.Println("resetting sequences")
181+
if err := resetSequences(pgDB); err != nil {
182+
log.Fatalf("reset sequences: %v", err)
183+
}
184+
185+
log.Println("done")
186+
}
187+
188+
// setupSchema drops and recreates all tables and indexes.
189+
// Table definitions and indexes are copied from irc/postgresql/history.go.
190+
func setupSchema(db *sql.DB) error {
191+
drops := []string{
192+
"DROP TABLE IF EXISTS forget CASCADE",
193+
"DROP TABLE IF EXISTS account_messages CASCADE",
194+
"DROP TABLE IF EXISTS correspondents CASCADE",
195+
"DROP TABLE IF EXISTS conversations CASCADE",
196+
"DROP TABLE IF EXISTS sequence CASCADE",
197+
"DROP TABLE IF EXISTS history CASCADE",
198+
"DROP TABLE IF EXISTS metadata CASCADE",
199+
}
200+
for _, stmt := range drops {
201+
if _, err := db.Exec(stmt); err != nil {
202+
return fmt.Errorf("%s: %w", stmt, err)
203+
}
204+
}
205+
206+
stmts := []string{
207+
`CREATE TABLE metadata (
208+
key_name VARCHAR(32) PRIMARY KEY,
209+
value VARCHAR(32) NOT NULL
210+
)`,
211+
212+
`CREATE TABLE history (
213+
id BIGSERIAL PRIMARY KEY,
214+
data BYTEA NOT NULL,
215+
msgid BYTEA NOT NULL CHECK (octet_length(msgid) = 16)
216+
)`,
217+
`CREATE INDEX idx_history_msgid ON history (msgid)`,
218+
219+
fmt.Sprintf(`CREATE TABLE sequence (
220+
history_id BIGINT NOT NULL PRIMARY KEY,
221+
target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
222+
nanotime BIGINT NOT NULL CHECK (nanotime >= 0)
223+
)`, maxTargetLength),
224+
`CREATE INDEX idx_sequence_target_nanotime ON sequence (target, nanotime)`,
225+
226+
fmt.Sprintf(`CREATE TABLE conversations (
227+
id BIGSERIAL PRIMARY KEY,
228+
target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
229+
correspondent BYTEA NOT NULL CHECK (octet_length(correspondent) <= %d),
230+
nanotime BIGINT NOT NULL CHECK (nanotime >= 0),
231+
history_id BIGINT NOT NULL
232+
)`, maxTargetLength, maxTargetLength),
233+
`CREATE INDEX idx_conversations_target_correspondent_nanotime ON conversations (target, correspondent, nanotime)`,
234+
`CREATE INDEX idx_conversations_history_id ON conversations (history_id)`,
235+
236+
fmt.Sprintf(`CREATE TABLE correspondents (
237+
id BIGSERIAL PRIMARY KEY,
238+
target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
239+
correspondent BYTEA NOT NULL CHECK (octet_length(correspondent) <= %d),
240+
nanotime BIGINT NOT NULL CHECK (nanotime >= 0),
241+
UNIQUE (target, correspondent)
242+
)`, maxTargetLength, maxTargetLength),
243+
`CREATE INDEX idx_correspondents_target_nanotime ON correspondents (target, nanotime)`,
244+
`CREATE INDEX idx_correspondents_nanotime ON correspondents (nanotime)`,
245+
246+
fmt.Sprintf(`CREATE TABLE account_messages (
247+
history_id BIGINT NOT NULL PRIMARY KEY,
248+
account BYTEA NOT NULL CHECK (octet_length(account) <= %d)
249+
)`, maxTargetLength),
250+
`CREATE INDEX idx_account_messages_account_history_id ON account_messages (account, history_id)`,
251+
252+
fmt.Sprintf(`CREATE TABLE forget (
253+
id BIGSERIAL PRIMARY KEY,
254+
account BYTEA NOT NULL CHECK (octet_length(account) <= %d)
255+
)`, maxTargetLength),
256+
}
257+
for _, stmt := range stmts {
258+
if _, err := db.Exec(stmt); err != nil {
259+
return fmt.Errorf("exec: %w\nstatement: %s", err, stmt)
260+
}
261+
}
262+
263+
_, err := db.Exec(
264+
`INSERT INTO metadata (key_name, value) VALUES ($1, $2), ($3, $4)`,
265+
"db.version", latestSchema, "db.minorversion", latestMinorSchema,
266+
)
267+
return err
268+
}
269+
270+
func copyHistory(src, dst *sql.DB) (int, error) {
271+
return copyBatched(src, dst,
272+
"SELECT id, data, msgid FROM history WHERE id > ? ORDER BY id LIMIT ?",
273+
"INSERT INTO history (id, data, msgid) VALUES ($1, $2, $3)",
274+
3,
275+
)
276+
}
277+
278+
func copySequence(src, dst *sql.DB) (int, error) {
279+
return copyBatched(src, dst,
280+
"SELECT history_id, target, nanotime FROM sequence WHERE history_id > ? ORDER BY history_id LIMIT ?",
281+
"INSERT INTO sequence (history_id, target, nanotime) VALUES ($1, $2, $3)",
282+
3,
283+
)
284+
}
285+
286+
func copyConversations(src, dst *sql.DB) (int, error) {
287+
return copyBatched(src, dst,
288+
"SELECT id, target, correspondent, nanotime, history_id FROM conversations WHERE id > ? ORDER BY id LIMIT ?",
289+
"INSERT INTO conversations (id, target, correspondent, nanotime, history_id) VALUES ($1, $2, $3, $4, $5)",
290+
5,
291+
)
292+
}
293+
294+
func copyCorrespondents(src, dst *sql.DB) (int, error) {
295+
return copyBatched(src, dst,
296+
"SELECT id, target, correspondent, nanotime FROM correspondents WHERE id > ? ORDER BY id LIMIT ?",
297+
"INSERT INTO correspondents (id, target, correspondent, nanotime) VALUES ($1, $2, $3, $4)",
298+
4,
299+
)
300+
}
301+
302+
func copyAccountMessages(src, dst *sql.DB) (int, error) {
303+
return copyBatched(src, dst,
304+
"SELECT history_id, account FROM account_messages WHERE history_id > ? ORDER BY history_id LIMIT ?",
305+
"INSERT INTO account_messages (history_id, account) VALUES ($1, $2)",
306+
2,
307+
)
308+
}
309+
310+
func copyForget(src, dst *sql.DB) (int, error) {
311+
return copyBatched(src, dst,
312+
"SELECT id, account FROM forget WHERE id > ? ORDER BY id LIMIT ?",
313+
"INSERT INTO forget (id, account) VALUES ($1, $2)",
314+
2,
315+
)
316+
}
317+
318+
// copyBatched copies rows from src to dst in batches using keyset pagination on
319+
// the first (primary key) column. srcQuery must accept (lastID int64, limit int)
320+
// and return rows ordered by that column. One transaction is committed per batch.
321+
func copyBatched(src, dst *sql.DB, srcQuery, dstInsert string, ncols int) (int, error) {
322+
vals := make([]any, ncols)
323+
ptrs := make([]any, ncols)
324+
for i := range vals {
325+
ptrs[i] = &vals[i]
326+
}
327+
328+
var lastID int64
329+
total := 0
330+
for {
331+
rows, err := src.Query(srcQuery, lastID, batchSize)
332+
if err != nil {
333+
return total, fmt.Errorf("query: %w", err)
334+
}
335+
336+
tx, err := dst.Begin()
337+
if err != nil {
338+
rows.Close()
339+
return total, fmt.Errorf("begin tx: %w", err)
340+
}
341+
342+
batchCount := 0
343+
for rows.Next() {
344+
if err := rows.Scan(ptrs...); err != nil {
345+
rows.Close()
346+
tx.Rollback()
347+
return total, fmt.Errorf("scan: %w", err)
348+
}
349+
// MySQL returns BIGINT UNSIGNED as uint64; convert to int64 for PostgreSQL.
350+
for i, v := range vals {
351+
if u, ok := v.(uint64); ok {
352+
vals[i] = int64(u)
353+
}
354+
}
355+
if _, err := tx.Exec(dstInsert, vals...); err != nil {
356+
rows.Close()
357+
tx.Rollback()
358+
return total, fmt.Errorf("insert: %w", err)
359+
}
360+
if id, ok := vals[0].(int64); ok {
361+
lastID = id
362+
}
363+
batchCount++
364+
}
365+
rows.Close()
366+
if err := rows.Err(); err != nil {
367+
tx.Rollback()
368+
return total, fmt.Errorf("rows: %w", err)
369+
}
370+
if err := tx.Commit(); err != nil {
371+
return total, fmt.Errorf("commit: %w", err)
372+
}
373+
total += batchCount
374+
log.Printf(" %d rows copied so far", total)
375+
if batchCount < batchSize {
376+
break
377+
}
378+
}
379+
return total, nil
380+
}
381+
382+
func resetSequences(db *sql.DB) error {
383+
seqs := []struct{ seq, table, col string }{
384+
{"history_id_seq", "history", "id"},
385+
{"conversations_id_seq", "conversations", "id"},
386+
{"correspondents_id_seq", "correspondents", "id"},
387+
{"forget_id_seq", "forget", "id"},
388+
}
389+
for _, s := range seqs {
390+
if _, err := db.Exec(fmt.Sprintf(
391+
"SELECT setval('%s', COALESCE((SELECT MAX(%s) FROM %s), 1))",
392+
s.seq, s.col, s.table,
393+
)); err != nil {
394+
return fmt.Errorf("reset %s: %w", s.seq, err)
395+
}
396+
}
397+
return nil
398+
}

0 commit comments

Comments
 (0)