Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ type current struct {

type countIndexer struct {
*reducer
writer *badger.StreamWriter
cur current
counts map[int][]uint64
wg sync.WaitGroup
writer *badger.StreamWriter
splitWriter *badger.WriteBatch
tmpDb *badger.DB
cur current
counts map[int][]uint64
wg sync.WaitGroup
}

// addUid adds the uid from rawKey to a count index if a count index is
Expand Down
6 changes: 5 additions & 1 deletion dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ type state struct {
readerChunkCh chan *bytes.Buffer
mapFileId uint32 // Used atomically to name the output files of the mappers.
dbs []*badger.DB
writeTs uint64 // All badger writes use this timestamp
tmpDbs []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
writeTs uint64 // All badger writes use this timestamp
}

type loader struct {
Expand Down Expand Up @@ -331,5 +332,8 @@ func (ld *loader) cleanup() {
for _, db := range ld.dbs {
x.Check(db.Close())
}
for _, db := range ld.tmpDbs {
x.Check(db.Close())
}
ld.prog.endSummary()
}
140 changes: 126 additions & 14 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -52,6 +54,7 @@ type reducer struct {
}

const batchSize = 10000
const maxSplitBatchLen = 1000
const batchAlloc = batchSize * 11 / 10

func (r *reducer) run() error {
Expand All @@ -64,7 +67,7 @@ func (r *reducer) run() error {
if err := thr.Do(); err != nil {
return err
}
go func(shardId int, db *badger.DB) {
go func(shardId int, db *badger.DB, tmpDb *badger.DB) {
defer thr.Done(nil)

mapFiles := filenamesInTree(dirs[shardId])
Expand All @@ -78,8 +81,15 @@ func (r *reducer) run() error {

writer := db.NewStreamWriter()
x.Check(writer.Prepare())

ci := &countIndexer{reducer: r, writer: writer}
// Split lists are written to a separate DB first to avoid ordering issues.
splitWriter := tmpDb.NewManagedWriteBatch()

ci := &countIndexer{
reducer: r,
writer: writer,
splitWriter: splitWriter,
tmpDb: tmpDb,
}
sort.Slice(partitionKeys, func(i, j int) bool {
return bytes.Compare(partitionKeys[i], partitionKeys[j]) < 0
})
Expand All @@ -92,17 +102,21 @@ func (r *reducer) run() error {
ci.wait()

x.Check(writer.Flush())

// Write split lists back to the main DB.
r.writeSplitLists(db, tmpDb)

for _, itr := range mapItrs {
if err := itr.Close(); err != nil {
fmt.Printf("Error while closing iterator: %v", err)
}
}
}(i, r.createBadger(i))
}(i, r.createBadger(i), r.createTmpBadger())
}
return thr.Finish()
}

func (r *reducer) createBadger(i int) *badger.DB {
func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB {
if r.opt.BadgerKeyFile != "" {
// Need to set zero addr in WorkerConfig before checking the license.
x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr}
Expand All @@ -115,25 +129,43 @@ func (r *reducer) createBadger(i int) *badger.DB {
}
}

opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false).
opt := badger.DefaultOptions(dir).WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).WithMaxCacheSize(1 << 20).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile)).WithCompression(bo.None)

// Overwrite badger options based on the options provided by the user.
r.setBadgerOptions(&opt)
r.setBadgerOptions(&opt, compression)

db, err := badger.OpenManaged(opt)
x.Check(err)

// Zero out the key from memory.
opt.EncryptionKey = nil
return db
}

func (r *reducer) createBadger(i int) *badger.DB {
db := r.createBadgerInternal(r.opt.shardOutputDirs[i], true)
r.dbs = append(r.dbs, db)
return db
}

func (r *reducer) setBadgerOptions(opt *badger.Options) {
func (r *reducer) createTmpBadger() *badger.DB {
tmpDir, err := ioutil.TempDir(r.opt.TmpDir, "split")
x.Check(err)
// Do not enable compression in temporary badger to improve performance.
db := r.createBadgerInternal(tmpDir, false)
r.tmpDbs = append(r.tmpDbs, db)
return db
}

func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
if !compression {
opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
return
}
// Set the compression level.
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
if r.state.opt.BadgerCompressionLevel < 1 {
Expand Down Expand Up @@ -282,6 +314,7 @@ type encodeRequest struct {
countKeys []*countIndexEntry
wg *sync.WaitGroup
list *bpb.KVList
splitList *bpb.KVList
}

func (r *reducer) streamIdFor(pred string) uint32 {
Expand All @@ -306,24 +339,60 @@ func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
for req := range entryCh {

req.list = &bpb.KVList{}
countKeys := r.toList(req.entries, req.list)
req.splitList = &bpb.KVList{}
countKeys := r.toList(req.entries, req.list, req.splitList)
for _, kv := range req.list.Kv {
pk, err := x.Parse(kv.Key)
x.Check(err)
x.AssertTrue(len(pk.Attr) > 0)
kv.StreamId = r.streamIdFor(pk.Attr)
if pk.HasStartUid {
kv.StreamId |= 0x80000000
}
}
req.countKeys = countKeys
req.wg.Done()
atomic.AddInt32(&r.prog.numEncoding, -1)
}
}

func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *sync.WaitGroup) {
defer wg.Done()
splitBatchLen := 0

for kvs := range kvsCh {
if kvs == nil || len(kvs.Kv) == 0 {
continue
}

for i := 0; i < len(kvs.Kv); i += maxSplitBatchLen {
// Flush the write batch when the max batch length is reached to prevent the
// value log from growing over the allowed limit.
if splitBatchLen >= maxSplitBatchLen {
x.Check(ci.splitWriter.Flush())
ci.splitWriter = ci.tmpDb.NewManagedWriteBatch()
splitBatchLen = 0
}

batch := &bpb.KVList{}
if i+maxSplitBatchLen >= len(kvs.Kv) {
batch.Kv = kvs.Kv[i:]
} else {
batch.Kv = kvs.Kv[i : i+maxSplitBatchLen]
}
splitBatchLen += len(batch.Kv)
x.Check(ci.splitWriter.Write(batch))
}
}
x.Check(ci.splitWriter.Flush())
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
defer closer.Done()

// Concurrently write split lists to a temporary badger.
tmpWg := new(sync.WaitGroup)
tmpWg.Add(1)
splitCh := make(chan *bpb.KVList, 2*runtime.NumCPU())
go r.writeTmpSplits(ci, splitCh, tmpWg)

for req := range writerCh {
req.wg.Wait()

Expand All @@ -334,11 +403,53 @@ func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, c
start := time.Now()

x.Check(ci.writer.Write(req.list))
if req.splitList != nil && len(req.splitList.Kv) > 0 {
splitCh <- req.splitList
}

if dur := time.Since(start).Round(time.Millisecond); dur > time.Second {
fmt.Printf("writeCh: Time taken to write req: %v\n",
time.Since(start).Round(time.Millisecond))
}
}

// Wait for split lists to be written to the temporary badger.
close(splitCh)
tmpWg.Wait()
}

func (r *reducer) writeSplitLists(db, tmpDb *badger.DB) {
txn := tmpDb.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

writer := db.NewManagedWriteBatch()
splitBatchLen := 0

for itr.Rewind(); itr.Valid(); itr.Next() {
// Flush the write batch when the max batch length is reached to prevent the
// value log from growing over the allowed limit.
if splitBatchLen >= maxSplitBatchLen {
x.Check(writer.Flush())
writer = db.NewManagedWriteBatch()
splitBatchLen = 0
}
item := itr.Item()

valCopy, err := item.ValueCopy(nil)
x.Check(err)
kv := &bpb.KV{
Key: item.KeyCopy(nil),
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
}
x.Check(writer.Write(&bpb.KVList{Kv: []*bpb.KV{kv}}))
splitBatchLen += 1
}
x.Check(writer.Flush())
}

func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
Expand Down Expand Up @@ -395,7 +506,7 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
writerCloser.SignalAndWait()
}

func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEntry {
func (r *reducer) toList(bufEntries [][]byte, list, splitList *bpb.KVList) []*countIndexEntry {
sort.Slice(bufEntries, func(i, j int) bool {
lh, err := GetKeyForMapEntry(bufEntries[i])
x.Check(err)
Expand Down Expand Up @@ -479,7 +590,8 @@ func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEnt
l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs)
kvs, err := l.Rollup()
x.Check(err)
list.Kv = append(list.Kv, kvs...)
list.Kv = append(list.Kv, kvs[0])
splitList.Kv = append(splitList.Kv, kvs[1:]...)
} else {
val, err := pl.Marshal()
x.Check(err)
Expand Down