Skip to content

Commit f47ab0a

Browse files
zsfelfoldijakub-freebit
authored andcommitted
core: update blockProcFeed in insertChain (ethereum#31065)
This PR moves the updating of the `blockProcFeed` event feed from `InsertChain` to `insertChain` in order to ensure that the feed subscribers are notified whenever block processing happens. Note that this event is not subscribed to anywhere in our codebase at the moment, earlier it was used by the LES server to avoid slowing down block processing. Now I want to do the same with the log indexer, the problem is that back then every block insertion was done by `InsertChain`, now the regular payload insertion is done by `InsertBlockWithoutSetHead`. Both of these (and also `SetCanonical` if needed) calls `insertChain` so I moved the feed update there.
1 parent 2840719 commit f47ab0a

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

core/blockchain.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,15 @@ type BlockChain struct {
235235
statedb *state.CachingDB // State database to reuse between imports (contains state cache)
236236
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
237237

238-
hc *HeaderChain
239-
rmLogsFeed event.Feed
240-
chainFeed event.Feed
241-
chainHeadFeed event.Feed
242-
logsFeed event.Feed
243-
blockProcFeed event.Feed
244-
scope event.SubscriptionScope
245-
genesisBlock *types.Block
238+
hc *HeaderChain
239+
rmLogsFeed event.Feed
240+
chainFeed event.Feed
241+
chainHeadFeed event.Feed
242+
logsFeed event.Feed
243+
blockProcFeed event.Feed
244+
blockProcCounter int32
245+
scope event.SubscriptionScope
246+
genesisBlock *types.Block
246247

247248
// This mutex synchronizes chain write operations.
248249
// Readers don't need to take it, they can just read the database.
@@ -1574,8 +1575,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
15741575
if len(chain) == 0 {
15751576
return 0, nil
15761577
}
1577-
bc.blockProcFeed.Send(true)
1578-
defer bc.blockProcFeed.Send(false)
15791578

15801579
// Do a sanity check that the provided chain is actually ordered and linked.
15811580
for i := 1; i < len(chain); i++ {
@@ -1615,6 +1614,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
16151614
if bc.insertStopped() {
16161615
return nil, 0, nil
16171616
}
1617+
1618+
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1619+
bc.blockProcFeed.Send(true)
1620+
}
1621+
defer func() {
1622+
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
1623+
bc.blockProcFeed.Send(false)
1624+
}
1625+
}()
1626+
16181627
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
16191628
SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)
16201629

0 commit comments

Comments
 (0)