10 changes: 6 additions & 4 deletions core/filtermaps/filtermaps.go
```diff
@@ -70,6 +70,7 @@ type FilterMaps struct {
 	indexLock    sync.RWMutex
 	indexedRange filterMapsRange
 	indexedView  *ChainView // always consistent with the log index
+	hasTempRange bool

 	// also accessed by indexer and matcher backend but no locking needed.
 	filterMapCache *lru.Cache[uint32, filterMap]
@@ -94,7 +95,7 @@ type FilterMaps struct {
 	ptrTailUnindexMap uint32

 	targetView            *ChainView
-	matcherSyncRequest    *FilterMapsMatcherBackend
+	matcherSyncRequests   []*FilterMapsMatcherBackend
 	historyCutoff         uint64
 	finalBlock, lastFinal uint64
 	lastFinalEpoch        uint32
@@ -330,7 +331,7 @@ func (f *FilterMaps) init() error {
 		fmr.blocks = common.NewRange(cp.BlockNumber+1, 0)
 		fmr.maps = common.NewRange(uint32(bestLen)<<f.logMapsPerEpoch, 0)
 	}
-	f.setRange(batch, f.targetView, fmr)
+	f.setRange(batch, f.targetView, fmr, false)
 	return batch.Write()
 }

@@ -373,9 +374,10 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
 // setRange updates the indexed chain view and covered range and also adds the
 // changes to the given batch.
 // Note that this function assumes that the index write lock is being held.
-func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange) {
+func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange, isTempRange bool) {
 	f.indexedView = newView
 	f.indexedRange = newRange
+	f.hasTempRange = isTempRange
 	f.updateMatchersValidRange()
 	if newRange.initialized {
 		rs := rawdb.FilterMapsRange{
@@ -666,7 +668,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) error {
 	} else {
 		return errors.New("invalid tail epoch number")
 	}
-	f.setRange(f.db, f.indexedView, fmr)
+	f.setRange(f.db, f.indexedView, fmr, false)
 	first := f.mapRowIndex(firstMap, 0)
 	count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
 	rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count))
```
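For readers following the change: the extra boolean passed to setRange is recorded in the new hasTempRange field, so the rest of the code can tell whether the currently installed range is only an intermediate state of a write in progress. A minimal, self-contained sketch of that pattern, with invented names (rangeState, setRange's value parameters) rather than the geth types:

```go
// A minimal sketch of the temp-range flag pattern; rangeState and its fields
// are invented for illustration and are not the geth types.
package main

import "fmt"

type rangeState struct {
	firstBlock, afterLastBlock uint64
	hasTempRange               bool // true while a partially written range is exposed
}

// setRange mirrors the updated call shape: callers now state whether the
// range they install is temporary (mid-write) or final.
func (s *rangeState) setRange(first, afterLast uint64, isTempRange bool) {
	s.firstBlock, s.afterLastBlock = first, afterLast
	s.hasTempRange = isTempRange
}

func main() {
	var s rangeState
	s.setRange(0, 90, true)   // intermediate state during a long write
	s.setRange(0, 100, false) // final, consistent state
	fmt.Println("temporary:", s.hasTempRange)
}
```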
14 changes: 9 additions & 5 deletions core/filtermaps/indexer.go
```diff
@@ -136,15 +136,18 @@ func (f *FilterMaps) processEvents() {
 // processSingleEvent processes a single event either in a blocking or
 // non-blocking manner.
 func (f *FilterMaps) processSingleEvent(blocking bool) bool {
-	if f.matcherSyncRequest != nil && f.targetHeadIndexed() {
-		f.matcherSyncRequest.synced()
-		f.matcherSyncRequest = nil
+	if !f.hasTempRange {
+		for _, mb := range f.matcherSyncRequests {
+			mb.synced()
+		}
+		f.matcherSyncRequests = nil
 	}
 	if blocking {
 		select {
 		case target := <-f.targetCh:
 			f.setTarget(target)
-		case f.matcherSyncRequest = <-f.matcherSyncCh:
+		case mb := <-f.matcherSyncCh:
+			f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
 		case f.blockProcessing = <-f.blockProcessingCh:
 		case <-f.closeCh:
 			f.stop = true
@@ -160,7 +163,8 @@ func (f *FilterMaps) processSingleEvent(blocking bool) bool {
 		select {
 		case target := <-f.targetCh:
 			f.setTarget(target)
-		case f.matcherSyncRequest = <-f.matcherSyncCh:
+		case mb := <-f.matcherSyncCh:
+			f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
 		case f.blockProcessing = <-f.blockProcessingCh:
 		case <-f.closeCh:
 			f.stop = true
```
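The single matcherSyncRequest pointer becomes a slice, so sync requests arriving while the indexer is busy are queued rather than overwritten, and they are only acknowledged while no temporary range is exposed. A rough, self-contained sketch of that queue-and-notify pattern, with types and names invented for illustration:

```go
// A rough sketch of the queue-and-notify pattern; syncRequest and indexer are
// invented stand-ins, not geth's FilterMaps/FilterMapsMatcherBackend types.
package main

import "fmt"

type syncRequest struct{ id int }

func (r *syncRequest) synced() { fmt.Println("acknowledged request", r.id) }

type indexer struct {
	syncCh       chan *syncRequest
	pending      []*syncRequest
	hasTempRange bool
}

// processOne first acknowledges every pending request if no temporary range
// is exposed, then queues at most one newly arrived request.
func (ix *indexer) processOne() {
	if !ix.hasTempRange {
		for _, r := range ix.pending {
			r.synced()
		}
		ix.pending = nil
	}
	select {
	case r := <-ix.syncCh:
		ix.pending = append(ix.pending, r) // queue instead of overwriting a single slot
	default:
	}
}

func main() {
	ix := &indexer{syncCh: make(chan *syncRequest, 2)}
	ix.syncCh <- &syncRequest{id: 1}
	ix.syncCh <- &syncRequest{id: 2}
	ix.processOne() // queues request 1
	ix.processOne() // acknowledges request 1, queues request 2
	ix.processOne() // acknowledges request 2
}
```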
8 changes: 6 additions & 2 deletions core/filtermaps/map_renderer.go
```diff
@@ -392,12 +392,16 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
 			}
 			// do not exit while in partially written state but do allow processing
 			// events and pausing while block processing is in progress
+			r.f.indexLock.Unlock()
 			pauseCb()
+			r.f.indexLock.Lock()
 			batch = r.f.db.NewBatch()
 		}
 	}

-	r.f.setRange(batch, r.f.indexedView, tempRange)
+	if tempRange != r.f.indexedRange {
+		r.f.setRange(batch, r.f.indexedView, tempRange, true)
+	}
 	// add or update filter rows
 	for rowIndex := uint32(0); rowIndex < r.f.mapHeight; rowIndex++ {
 		var (
@@ -469,7 +473,7 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
 	}
 	r.finishedMaps = make(map[uint32]*renderedMap)
 	r.finished.SetFirst(r.finished.AfterLast())
-	r.f.setRange(batch, renderedView, newRange)
+	r.f.setRange(batch, renderedView, newRange, false)
 	if err := batch.Write(); err != nil {
 		log.Crit("Error writing log index update batch", "error", err)
 	}
```
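writeFinishedMaps now releases the index lock around the pause callback, installs the partially written range with the temporary flag set (and only when it differs from the currently indexed range), and switches to a final range with the flag cleared once all maps are written. A simplified sketch of that two-phase pattern under invented names (store, writeMaps), not geth's mapRenderer:

```go
// A simplified sketch of the two-phase range update; store and writeMaps are
// invented names, not geth's mapRenderer/FilterMaps.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	lock         sync.Mutex
	indexedRange [2]uint64
	hasTempRange bool
}

func (s *store) setRange(r [2]uint64, isTempRange bool) {
	s.indexedRange = r
	s.hasTempRange = isTempRange
}

// writeMaps exposes a temporary range while the write is in progress, releases
// the lock around the pause callback, and installs the final range at the end.
func (s *store) writeMaps(tempRange, finalRange [2]uint64, pause func()) {
	s.lock.Lock()
	if tempRange != s.indexedRange {
		s.setRange(tempRange, true) // partially written state
	}
	s.lock.Unlock() // let other work run while paused
	pause()
	s.lock.Lock()
	s.setRange(finalRange, false) // final, fully written state
	s.lock.Unlock()
	fmt.Println("temporary after finish:", s.hasTempRange)
}

func main() {
	s := &store{}
	s.writeMaps([2]uint64{0, 90}, [2]uint64{0, 100}, func() {})
}
```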
2 changes: 1 addition & 1 deletion core/filtermaps/matcher_backend.go
```diff
@@ -125,7 +125,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
 		indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
 	}
 	fm.syncCh <- SyncRange{
-		HeadNumber:    fm.f.indexedView.headNumber,
+		HeadNumber:    fm.f.targetView.headNumber,
 		ValidBlocks:   fm.validBlocks,
 		IndexedBlocks: indexedBlocks,
 	}
```