Skip to content

add DeleteRange feature #2191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ func (t *table) Delete(key []byte) error {
return t.db.Delete(append([]byte(t.prefix), key...))
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
func (t *table) DeleteRange(start, end []byte) error {
return t.db.DeleteRange(append([]byte(t.prefix), start...), append([]byte(t.prefix), end...))
}

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
Expand Down
9 changes: 9 additions & 0 deletions ctxcdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ type KeyValueWriter interface {
Delete(key []byte) error
}

// KeyValueRangeDeleter wraps the DeleteRange method of a backing data store.
type KeyValueRangeDeleter interface {
// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
DeleteRange(start, end []byte) error
}

// KeyValueStater wraps the Stat method of a backing data store.
type KeyValueStater interface {
// Stat returns a particular internal stat of the database.
Expand All @@ -61,6 +68,7 @@ type KeyValueStore interface {
KeyValueReader
KeyValueWriter
KeyValueStater
KeyValueRangeDeleter
Batcher
Iteratee
Compacter
Expand Down Expand Up @@ -154,6 +162,7 @@ type Reader interface {
// immutable ancient data.
type Writer interface {
KeyValueWriter
KeyValueRangeDeleter
AncientWriter
}

Expand Down
82 changes: 82 additions & 0 deletions ctxcdb/dbtest/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"reflect"
"sort"
"strconv"
"testing"

"github.com/CortexFoundation/CortexTheseus/ctxcdb"
Expand Down Expand Up @@ -403,6 +404,64 @@ func TestDatabaseSuite(t *testing.T, New func() ctxcdb.KeyValueStore) {
t.Fatalf("expected error on batch.Write after Close")
}
})

t.Run("DeleteRange", func(t *testing.T) {
db := New()
defer db.Close()

addRange := func(start, stop int) {
for i := start; i <= stop; i++ {
db.Put([]byte(strconv.Itoa(i)), nil)
}
}

checkRange := func(start, stop int, exp bool) {
for i := start; i <= stop; i++ {
has, _ := db.Has([]byte(strconv.Itoa(i)))
if has && !exp {
t.Fatalf("unexpected key %d", i)
}
if !has && exp {
t.Fatalf("missing expected key %d", i)
}
}
}

addRange(1, 9)
db.DeleteRange([]byte("9"), []byte("1"))
checkRange(1, 9, true)
db.DeleteRange([]byte("5"), []byte("5"))
checkRange(1, 9, true)
db.DeleteRange([]byte("5"), []byte("50"))
checkRange(1, 4, true)
checkRange(5, 5, false)
checkRange(6, 9, true)
db.DeleteRange([]byte(""), []byte("a"))
checkRange(1, 9, false)

addRange(1, 999)
db.DeleteRange([]byte("12345"), []byte("54321"))
checkRange(1, 1, true)
checkRange(2, 5, false)
checkRange(6, 12, true)
checkRange(13, 54, false)
checkRange(55, 123, true)
checkRange(124, 543, false)
checkRange(544, 999, true)

addRange(1, 999)
db.DeleteRange([]byte("3"), []byte("7"))
checkRange(1, 2, true)
checkRange(3, 6, false)
checkRange(7, 29, true)
checkRange(30, 69, false)
checkRange(70, 299, true)
checkRange(300, 699, false)
checkRange(700, 999, true)

db.DeleteRange([]byte(""), []byte("a"))
checkRange(1, 999, false)
})
}

// BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database
Expand Down Expand Up @@ -498,6 +557,29 @@ func BenchDatabaseSuite(b *testing.B, New func() ctxcdb.KeyValueStore) {
benchBatchWrite(b, keys, vals)
})
})
b.Run("DeleteRange", func(b *testing.B) {
benchDeleteRange := func(b *testing.B, count int) {
db := New()
defer db.Close()

for i := 0; i < count; i++ {
db.Put([]byte(strconv.Itoa(i)), nil)
}
b.ResetTimer()
b.ReportAllocs()

db.DeleteRange([]byte("0"), []byte("999999999"))
}
b.Run("DeleteRange100", func(b *testing.B) {
benchDeleteRange(b, 100)
})
b.Run("DeleteRange1k", func(b *testing.B) {
benchDeleteRange(b, 1000)
})
b.Run("DeleteRange10k", func(b *testing.B) {
benchDeleteRange(b, 10000)
})
})
}

func iterateKeys(it ctxcdb.Iterator) []string {
Expand Down
31 changes: 31 additions & 0 deletions ctxcdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package leveldb

import (
"bytes"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -207,6 +208,36 @@ func (db *Database) Delete(key []byte) error {
return db.db.Delete(key, nil)
}

var ErrTooManyKeys = errors.New("too many keys in deleted range")

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
// Note that this is a fallback implementation as leveldb does not natively
// support range deletion. It can be slow and therefore the number of deleted
// keys is limited in order to avoid blocking for a very long time.
// ErrTooManyKeys is returned if the range has only been partially deleted.
// In this case the caller can repeat the call until it finally succeeds.
func (db *Database) DeleteRange(start, end []byte) error {
batch := db.NewBatch()
it := db.NewIterator(nil, start)
defer it.Release()

var count int
for it.Next() && bytes.Compare(end, it.Key()) > 0 {
count++
if count > 10000 { // should not block for more than a second
if err := batch.Write(); err != nil {
return err
}
return ErrTooManyKeys
}
if err := batch.Delete(it.Key()); err != nil {
return err
}
}
return batch.Write()
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ctxcdb.Batch {
Expand Down
15 changes: 15 additions & 0 deletions ctxcdb/memorydb/memorydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package memorydb

import (
"bytes"
"errors"
"sort"
"strings"
Expand Down Expand Up @@ -125,6 +126,20 @@ func (db *Database) Delete(key []byte) error {
return nil
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
func (db *Database) DeleteRange(start, end []byte) error {
it := db.NewIterator(nil, start)
defer it.Release()

for it.Next() && bytes.Compare(end, it.Key()) > 0 {
if err := db.Delete(it.Key()); err != nil {
return err
}
}
return nil
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ctxcdb.Batch {
Expand Down
13 changes: 12 additions & 1 deletion ctxcdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,18 @@ func (d *Database) Delete(key []byte) error {
if d.closed {
return pebble.ErrClosed
}
return d.db.Delete(key, nil)
return d.db.Delete(key, d.writeOptions)
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
func (d *Database) DeleteRange(start, end []byte) error {
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
return pebble.ErrClosed
}
return d.db.DeleteRange(start, end, d.writeOptions)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
Expand Down
4 changes: 4 additions & 0 deletions ctxcdb/remotedb/remotedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (db *Database) Delete(key []byte) error {
panic("not supported")
}

func (db *Database) DeleteRange(start, end []byte) error {
panic("not supported")
}

func (db *Database) ModifyAncients(f func(ctxcdb.AncientWriteOp) error) (int64, error) {
panic("not supported")
}
Expand Down
1 change: 1 addition & 0 deletions tests/fuzzers/stacktrie/trie_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type spongeDb struct {
func (s *spongeDb) Has(key []byte) (bool, error) { panic("implement me") }
func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, errors.New("no such elem") }
func (s *spongeDb) Delete(key []byte) error { panic("implement me") }
func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") }
func (s *spongeDb) NewBatch() ctxcdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewBatchWithSize(size int) ctxcdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewSnapshot() (ctxcdb.Snapshot, error) { panic("implement me") }
Expand Down
2 changes: 1 addition & 1 deletion trie/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (l *loggingDb) Put(key []byte, value []byte) error {
func (l *loggingDb) Delete(key []byte) error {
return l.backend.Delete(key)
}

func (s *loggingDb) DeleteRange(start, end []byte) error { panic("implement me") }
func (l *loggingDb) NewBatch() ctxcdb.Batch {
return l.backend.NewBatch()
}
Expand Down
1 change: 1 addition & 0 deletions trie/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ func (s *spongeDb) NewBatch() ctxcdb.Batch { return &spongeBat
func (s *spongeDb) NewBatchWithSize(size int) ctxcdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewSnapshot() (ctxcdb.Snapshot, error) { panic("implement me") }
func (s *spongeDb) Stat(property string) (string, error) { panic("implement me") }
func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") }
func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") }
func (s *spongeDb) Close() error { return nil }
func (s *spongeDb) Put(key []byte, value []byte) error {
Expand Down