Skip to content

Commit 703f976

Browse files
authored
refactor: rework chainsync bulk mode (#455)
This will now cache pending block points and request the full blocks in a batch. This also removes the bulk-mode config option, since it's no longer optional and the original reasons for making this optional no longer apply. Fixes #412 Signed-off-by: Aurora Gaffney <[email protected]>
1 parent 1917b56 commit 703f976

File tree

4 files changed

+51
-68
lines changed

4 files changed

+51
-68
lines changed

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ linters:
4141
- zerologlint
4242
disable:
4343
- depguard
44+
- noctx
4445
exclusions:
4546
generated: lax
4647
presets:

input/chainsync/chainsync.go

Lines changed: 47 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ const (
4343
// Size of cache for recent chainsync cursors
4444
cursorCacheSize = 20
4545

46+
blockBatchSize = 500
47+
4648
maxAutoReconnectDelay = 60 * time.Second
4749
defaultKupoTimeout = 30 * time.Second
4850
)
@@ -55,7 +57,6 @@ type ChainSync struct {
5557
address string
5658
socketPath string
5759
ntcTcp bool
58-
bulkMode bool
5960
intersectTip bool
6061
intersectPoints []ocommon.Point
6162
includeCbor bool
@@ -65,15 +66,16 @@ type ChainSync struct {
6566
status *ChainSyncStatus
6667
errorChan chan error
6768
eventChan chan event.Event
68-
bulkRangeStart ocommon.Point
69-
bulkRangeEnd ocommon.Point
7069
cursorCache []ocommon.Point
7170
dialAddress string
7271
dialFamily string
7372
kupoUrl string
7473
kupoClient *kugo.Client
7574
delayConfirmations uint
7675
delayBuffer [][]event.Event
76+
pendingBlockPoints []ocommon.Point
77+
blockfetchDoneChan chan struct{}
78+
lastTip ochainsync.Tip
7779
}
7880

7981
type ChainSyncStatus struct {
@@ -111,41 +113,16 @@ func (c *ChainSync) Start() error {
111113
if c.oConn.BlockFetch() != nil {
112114
c.oConn.BlockFetch().Client.Start()
113115
}
114-
// TODO: remove me
115-
// Disable bulk mode until we can fix it
116-
// https://github.com/blinklabs-io/adder/issues/412
117-
c.bulkMode = false
118-
if c.bulkMode && !c.intersectTip && c.oConn.BlockFetch() != nil {
119-
// Get available block range between our intersect point(s) and the chain tip
120-
var err error
121-
c.bulkRangeStart, c.bulkRangeEnd, err = c.oConn.ChainSync().Client.GetAvailableBlockRange(
122-
c.intersectPoints,
123-
)
116+
c.pendingBlockPoints = make([]ocommon.Point, 0)
117+
if c.intersectTip {
118+
tip, err := c.oConn.ChainSync().Client.GetCurrentTip()
124119
if err != nil {
125120
return err
126121
}
127-
if c.bulkRangeStart.Slot == 0 || c.bulkRangeEnd.Slot == 0 {
128-
// We're already at chain tip, so start a normal sync
129-
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
130-
return err
131-
}
132-
} else {
133-
// Use BlockFetch to request the entire available block range at once
134-
if err := c.oConn.BlockFetch().Client.GetBlockRange(c.bulkRangeStart, c.bulkRangeEnd); err != nil {
135-
return err
136-
}
137-
}
138-
} else {
139-
if c.intersectTip {
140-
tip, err := c.oConn.ChainSync().Client.GetCurrentTip()
141-
if err != nil {
142-
return err
143-
}
144-
c.intersectPoints = []ocommon.Point{tip.Point}
145-
}
146-
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
147-
return err
148-
}
122+
c.intersectPoints = []ocommon.Point{tip.Point}
123+
}
124+
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
125+
return err
149126
}
150127
return nil
151128
}
@@ -228,6 +205,9 @@ func (c *ChainSync) setupConnection() error {
228205
ouroboros.WithBlockFetchConfig(
229206
blockfetch.NewConfig(
230207
blockfetch.WithBlockFunc(c.handleBlockFetchBlock),
208+
blockfetch.WithBatchDoneFunc(c.handleBlockFetchBatchDone),
209+
// Set the recv queue size to 2x our block batch size
210+
blockfetch.WithRecvQueueSize(1000),
231211
),
232212
),
233213
)
@@ -309,6 +289,7 @@ func (c *ChainSync) handleRollBackward(
309289
point ocommon.Point,
310290
tip ochainsync.Tip,
311291
) error {
292+
c.lastTip = tip
312293
evt := event.New(
313294
"chainsync.rollback",
314295
time.Now(),
@@ -350,21 +331,33 @@ func (c *ChainSync) handleRollForward(
350331
blockData any,
351332
tip ochainsync.Tip,
352333
) error {
334+
c.lastTip = tip
353335
var block ledger.Block
354-
var err error
355336
tmpEvents := make([]event.Event, 0, 20)
356337
switch v := blockData.(type) {
357338
case ledger.Block:
358339
block = v
359340
case ledger.BlockHeader:
360-
blockSlot := v.SlotNumber()
361-
block, err = c.oConn.BlockFetch().Client.GetBlock(ocommon.Point{Slot: blockSlot, Hash: v.Hash().Bytes()})
362-
if err != nil {
363-
return err
341+
c.pendingBlockPoints = append(
342+
c.pendingBlockPoints,
343+
ocommon.Point{
344+
Hash: v.Hash().Bytes(),
345+
Slot: v.SlotNumber(),
346+
},
347+
)
348+
// Don't fetch block unless we hit the batch size or are close to tip
349+
if v.SlotNumber() < (tip.Point.Slot-10000) && len(c.pendingBlockPoints) < blockBatchSize {
350+
return nil
364351
}
365-
if block == nil {
366-
return errors.New("blockfetch returned empty")
352+
// Request pending block range
353+
c.blockfetchDoneChan = make(chan struct{})
354+
if err := c.oConn.BlockFetch().Client.GetBlockRange(c.pendingBlockPoints[0], c.pendingBlockPoints[len(c.pendingBlockPoints)-1]); err != nil {
355+
return err
367356
}
357+
c.pendingBlockPoints = make([]ocommon.Point, 0)
358+
// Wait for block-fetch to finish
359+
<-c.blockfetchDoneChan
360+
return nil
368361
default:
369362
return errors.New("unknown type")
370363
}
@@ -462,15 +455,16 @@ func (c *ChainSync) handleBlockFetchBlock(
462455
block.SlotNumber(),
463456
block.BlockNumber(),
464457
block.Hash().String(),
465-
c.bulkRangeEnd.Slot,
466-
hex.EncodeToString(c.bulkRangeEnd.Hash),
458+
c.lastTip.Point.Slot,
459+
hex.EncodeToString(c.lastTip.Point.Hash),
467460
)
468-
// Start normal chain-sync if we've reached the last block of our bulk range
469-
if block.SlotNumber() == c.bulkRangeEnd.Slot {
470-
if err := c.oConn.ChainSync().Client.Sync([]ocommon.Point{c.bulkRangeEnd}); err != nil {
471-
return err
472-
}
473-
}
461+
return nil
462+
}
463+
464+
func (c *ChainSync) handleBlockFetchBatchDone(
465+
ctx blockfetch.CallbackContext,
466+
) error {
467+
close(c.blockfetchDoneChan)
474468
return nil
475469
}
476470

@@ -492,12 +486,9 @@ func (c *ChainSync) updateStatus(
492486
}
493487
// Determine if we've reached the chain tip
494488
if !c.status.TipReached {
495-
// Make sure we're past the end slot in any bulk range, since we don't update the tip during bulk sync
496-
if slotNumber > c.bulkRangeEnd.Slot {
497-
// Make sure our current slot is equal/higher than our last known tip slot
498-
if c.status.SlotNumber > 0 && slotNumber >= c.status.TipSlotNumber {
499-
c.status.TipReached = true
500-
}
489+
// Make sure our current slot is equal/higher than our last known tip slot
490+
if c.status.SlotNumber > 0 && slotNumber >= c.status.TipSlotNumber {
491+
c.status.TipReached = true
501492
}
502493
}
503494
c.status.SlotNumber = slotNumber

input/chainsync/options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ func WithStatusUpdateFunc(
102102
}
103103

104104
// WithBulkMode specifies whether to use the "bulk" sync mode with NtN (node-to-node). This should only be used against your own nodes for resource usage reasons
105+
//
106+
// Deprecated: this flag no longer does anything useful, as bulk mode is now the default (and only) mode of operation
105107
func WithBulkMode(bulkMode bool) ChainSyncOptionFunc {
106-
return func(c *ChainSync) {
107-
c.bulkMode = bulkMode
108-
}
108+
return func(c *ChainSync) {}
109109
}
110110

111111
// WithKupoUrl specifies the URL for a Kupo instance that will be queried for additional information

input/chainsync/plugin.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ var cmdlineOptions struct {
3131
address string
3232
socketPath string
3333
ntcTcp bool
34-
bulkMode bool
3534
intersectTip bool
3635
intersectPoint string
3736
includeCbor bool
@@ -85,13 +84,6 @@ func init() {
8584
DefaultValue: false,
8685
Dest: &(cmdlineOptions.ntcTcp),
8786
},
88-
{
89-
Name: "bulk-mode",
90-
Type: plugin.PluginOptionTypeBool,
91-
Description: "use the 'bulk' sync mode with NtN (node-to-node). This should only be used against your own nodes for resource usage reasons",
92-
DefaultValue: false,
93-
Dest: &(cmdlineOptions.bulkMode),
94-
},
9587
{
9688
Name: "intersect-tip",
9789
Type: plugin.PluginOptionTypeBool,
@@ -155,7 +147,6 @@ func NewFromCmdlineOptions() plugin.Plugin {
155147
WithAddress(cmdlineOptions.address),
156148
WithSocketPath(cmdlineOptions.socketPath),
157149
WithNtcTcp(cmdlineOptions.ntcTcp),
158-
WithBulkMode(cmdlineOptions.bulkMode),
159150
WithIncludeCbor(cmdlineOptions.includeCbor),
160151
WithAutoReconnect(cmdlineOptions.autoReconnect),
161152
WithKupoUrl(cmdlineOptions.kupoUrl),

0 commit comments

Comments
 (0)