Skip to content

Commit 436a1bb

Browse files
damz authored and Ibrahim Jarif committed
Rework DB.DropPrefix (#1381)
Fixes three issues with the current implementation: - It can generate compaction requests that break the invariant that bottom tables need to be consecutive (issue #1380). See #1380 (comment). - It performs the same-level compactions in increasing order of levels (starting from L0), which leads to old versions of keys for the prefix resurfacing to active transactions. - When you have to drop multiple prefixes, the API forces you to drop one prefix at a time and go through the whole expensive table rewriting multiple times. Fixes #1381 Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io> (cherry picked from commit e013bfd)
1 parent afd9eb5 commit 436a1bb

4 files changed

Lines changed: 143 additions & 59 deletions

File tree

db.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ func writeLevel0Table(ft flushTask, f io.Writer) error {
856856
b := table.NewTableBuilder()
857857
defer b.Close()
858858
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
859-
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
859+
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
860860
continue
861861
}
862862
b.Add(iter.Key(), iter.Value())
@@ -866,9 +866,9 @@ func writeLevel0Table(ft flushTask, f io.Writer) error {
866866
}
867867

868868
type flushTask struct {
869-
mt *skl.Skiplist
870-
vptr valuePointer
871-
dropPrefix []byte
869+
mt *skl.Skiplist
870+
vptr valuePointer
871+
dropPrefixes [][]byte
872872
}
873873

874874
// handleFlushTask must be run serially.
@@ -1452,7 +1452,8 @@ func (db *DB) dropAll() (func(), error) {
14521452
// - Compact L0->L1, skipping over Kp.
14531453
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
14541454
// - Resume memtable flushes, compactions and writes.
1455-
func (db *DB) DropPrefix(prefix []byte) error {
1455+
func (db *DB) DropPrefix(prefixes ...[]byte) error {
1456+
db.opt.Infof("DropPrefix Called")
14561457
f := db.prepareToDrop()
14571458
defer f()
14581459
// Block all foreign interactions with memory tables.
@@ -1468,8 +1469,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
14681469
task := flushTask{
14691470
mt: memtable,
14701471
// Ensure that the head of value log gets persisted to disk.
1471-
vptr: db.vhead,
1472-
dropPrefix: prefix,
1472+
vptr: db.vhead,
1473+
dropPrefixes: prefixes,
14731474
}
14741475
db.opt.Debugf("Flushing memtable")
14751476
if err := db.handleFlushTask(task); err != nil {
@@ -1484,7 +1485,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
14841485
db.mt = skl.NewSkiplist(arenaSize(db.opt))
14851486

14861487
// Drop prefixes from the levels.
1487-
if err := db.lc.dropPrefix(prefix); err != nil {
1488+
if err := db.lc.dropPrefixes(prefixes); err != nil {
14881489
return err
14891490
}
14901491
db.opt.Infof("DropPrefix done")

levels.go

Lines changed: 92 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,25 @@ func (s *levelsController) dropTree() (int, error) {
262262
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
263263
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
264264
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
265-
func (s *levelsController) dropPrefix(prefix []byte) error {
265+
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
266+
// Internal move keys related to the given prefix should also be skipped.
267+
for _, prefix := range prefixes {
268+
key := make([]byte, 0, len(badgerMove)+len(prefix))
269+
key = append(key, badgerMove...)
270+
key = append(key, prefix...)
271+
prefixes = append(prefixes, key)
272+
}
273+
266274
opt := s.kv.opt
267-
for _, l := range s.levels {
275+
// Iterate levels in the reverse order because if we were to iterate from
276+
// lower level (say level 0) to a higher level (say level 3) we could have
277+
// a state in which level 0 is compacted and an older version of a key exists in a higher level.
278+
// At this point, if someone creates an iterator, they would see an old
279+
// value for a key from the higher levels. Iterating in reverse order ensures we
280+
// drop the oldest data first so that lookups never return stale data.
281+
for i := len(s.levels) - 1; i >= 0; i-- {
282+
l := s.levels[i]
283+
268284
l.RLock()
269285
if l.level == 0 {
270286
size := len(l.tables)
@@ -276,7 +292,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
276292
score: 1.74,
277293
// A unique number greater than 1.0 does two things. Helps identify this
278294
// function in logs, and forces a compaction.
279-
dropPrefix: prefix,
295+
dropPrefixes: prefixes,
280296
}
281297
if err := s.doCompact(cp); err != nil {
282298
opt.Warningf("While compacting level 0: %v", err)
@@ -286,39 +302,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
286302
continue
287303
}
288304

289-
var tables []*table.Table
290-
// Internal move keys related to the given prefix should also be skipped.
291-
moveKeyForPrefix := append(badgerMove, prefix...)
292-
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
293-
for _, table := range l.tables {
294-
var absent bool
295-
switch {
296-
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
297-
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
298-
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
299-
default:
300-
absent = true
305+
// Build a list of compaction tableGroups affecting all the prefixes we
306+
// need to drop. We need to build tableGroups that satisfy the invariant that
307+
// bottom tables are consecutive.
308+
// tableGroups contains groups of consecutive tables.
309+
var tableGroups [][]*table.Table
310+
var tableGroup []*table.Table
311+
312+
finishGroup := func() {
313+
if len(tableGroup) > 0 {
314+
tableGroups = append(tableGroups, tableGroup)
315+
tableGroup = nil
301316
}
302-
if !absent {
303-
tables = append(tables, table)
317+
}
318+
319+
for _, table := range l.tables {
320+
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
321+
tableGroup = append(tableGroup, table)
322+
} else {
323+
finishGroup()
304324
}
305325
}
326+
finishGroup()
327+
306328
l.RUnlock()
307-
if len(tables) == 0 {
329+
330+
if len(tableGroups) == 0 {
308331
continue
309332
}
310333

311-
cd := compactDef{
312-
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
313-
thisLevel: l,
314-
nextLevel: l,
315-
top: []*table.Table{},
316-
bot: tables,
317-
dropPrefix: prefix,
318-
}
319-
if err := s.runCompactDef(l.level, cd); err != nil {
320-
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
321-
return err
334+
opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
335+
for _, operation := range tableGroups {
336+
cd := compactDef{
337+
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
338+
thisLevel: l,
339+
nextLevel: l,
340+
top: nil,
341+
bot: operation,
342+
dropPrefixes: prefixes,
343+
}
344+
if err := s.runCompactDef(l.level, cd); err != nil {
345+
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
346+
return err
347+
}
322348
}
323349
}
324350
return nil
@@ -380,9 +406,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
380406
}
381407

382408
type compactionPriority struct {
383-
level int
384-
score float64
385-
dropPrefix []byte
409+
level int
410+
score float64
411+
dropPrefixes [][]byte
386412
}
387413

388414
// pickCompactLevel determines which level to compact.
@@ -470,13 +496,19 @@ func (s *levelsController) compactBuildTables(
470496

471497
// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
472498
var valid []*table.Table
499+
500+
nextTable:
473501
for _, table := range botTables {
474-
if len(cd.dropPrefix) > 0 &&
475-
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
476-
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
477-
// All the keys in this table have the dropPrefix. So, this table does not need to be
478-
// in the iterator and can be dropped immediately.
479-
continue
502+
if len(cd.dropPrefixes) > 0 {
503+
for _, prefix := range cd.dropPrefixes {
504+
if bytes.HasPrefix(table.Smallest(), prefix) &&
505+
bytes.HasPrefix(table.Biggest(), prefix) {
506+
// All the keys in this table have the dropPrefix. So, this
507+
// table does not need to be in the iterator and can be
508+
// dropped immediately.
509+
continue nextTable
510+
}
511+
}
480512
}
481513
valid = append(valid, table)
482514
}
@@ -503,12 +535,9 @@ func (s *levelsController) compactBuildTables(
503535
timeStart := time.Now()
504536
builder := table.NewTableBuilder()
505537
var numKeys, numSkips uint64
506-
// Internal move keys related to the given prefix should also be skipped.
507-
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
508-
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
509538
for ; it.Valid(); it.Next() {
510539
// See if we need to skip the prefix.
511-
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
540+
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
512541
numSkips++
513542
updateStats(it.Value())
514543
continue
@@ -672,10 +701,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
672701
return false
673702
}
674703

704+
func containsPrefix(smallValue, largeValue, prefix []byte) bool {
705+
if bytes.HasPrefix(smallValue, prefix) {
706+
return true
707+
}
708+
if bytes.HasPrefix(largeValue, prefix) {
709+
return true
710+
}
711+
if bytes.Compare(prefix, smallValue) > 0 &&
712+
bytes.Compare(prefix, largeValue) < 0 {
713+
return true
714+
}
715+
716+
return false
717+
}
718+
675719
func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
676720
for _, prefix := range listOfPrefixes {
677-
if bytes.Compare(prefix, smallValue) > 0 &&
678-
bytes.Compare(prefix, largeValue) < 0 {
721+
if containsPrefix(smallValue, largeValue, prefix) {
679722
return true
680723
}
681724
}
@@ -697,7 +740,7 @@ type compactDef struct {
697740

698741
thisSize int64
699742

700-
dropPrefix []byte
743+
dropPrefixes [][]byte
701744
}
702745

703746
func (cd *compactDef) lockLevels() {
@@ -859,10 +902,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
859902
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
860903

861904
cd := compactDef{
862-
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
863-
thisLevel: s.levels[l],
864-
nextLevel: s.levels[l+1],
865-
dropPrefix: p.dropPrefix,
905+
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
906+
thisLevel: s.levels[l],
907+
nextLevel: s.levels[l+1],
908+
dropPrefixes: p.dropPrefixes,
866909
}
867910
cd.elog.SetMaxEvents(100)
868911
defer cd.elog.Finish()

levels_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,43 @@ func TestDiscardFirstVersion(t *testing.T) {
638638
getAllAndCheck(t, db, ExpectedKeys)
639639
})
640640
}
641+
642+
// Regression test for https://github.com/dgraph-io/dgraph/issues/5573
643+
func TestDropPrefixMoveBug(t *testing.T) {
644+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
645+
// l1 is used to verify that drop prefix actually drops move keys from all the levels.
646+
l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}}
647+
createAndOpen(db, l1, 1)
648+
649+
// Mutiple levels can have the exact same move key with version.
650+
l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}}
651+
l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}}
652+
l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}}
653+
654+
// Level 2 has all the tables.
655+
createAndOpen(db, l2, 2)
656+
createAndOpen(db, l21, 2)
657+
createAndOpen(db, l22, 2)
658+
659+
require.NoError(t, db.lc.validate())
660+
require.NoError(t, db.DropPrefix([]byte("F")))
661+
662+
db.View(func(txn *Txn) error {
663+
iopt := DefaultIteratorOptions
664+
iopt.AllVersions = true
665+
666+
it := txn.NewIterator(iopt)
667+
defer it.Close()
668+
669+
specialKey := []byte("F")
670+
droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)}
671+
for it.Rewind(); it.Valid(); it.Next() {
672+
key := it.Item().Key()
673+
// Ensure we don't have any "F" or "!badger!move!F" left
674+
require.False(t, hasAnyPrefixes(key, droppedPrefixes))
675+
}
676+
return nil
677+
})
678+
require.NoError(t, db.lc.validate())
679+
})
680+
}

util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ func (s *levelHandler) validate() error {
6060

6161
if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 {
6262
return errors.Errorf(
63-
"Intra: %q vs %q: level=%d j=%d numTables=%d",
64-
s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables)
63+
"Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d",
64+
hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables)
6565
}
6666
}
6767
return nil

0 commit comments

Comments
 (0)