42 changes: 33 additions & 9 deletions dgraph/cmd/bulk/reduce.go
@@ -39,22 +39,38 @@ func (r *reducer) run() {
thr.Start()
NumReducers.Add(1)
NumQueuedReduceJobs.Add(-1)
r.writesThr.Start()
go func(job shuffleOutput) {
r.reduce(job)
thr.Done()
NumReducers.Add(-1)
}(reduceJob)
}
thr.Wait()
// Wait here for all pending transactions to commit. This function does not call
// r.writesThr.Start() itself; that happens inside r.reduce(job).
r.writesThr.Wait()
}
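
For context, run() relies on the Start/Done/Wait accounting of its throttles (thr and r.writesThr): Start reserves a slot and blocks once the limit is reached, Done releases the slot, and Wait blocks until every started job has called Done. A minimal stand-in for that pattern, purely an illustrative sketch and not dgraph's actual x.Throttle, might look like this:

package bulkdemo

import "sync"

// throttle is an illustrative analogue of the Start/Done/Wait pattern used in
// run(): Start blocks while max jobs are in flight, Done releases a slot, and
// Wait blocks until every started job has called Done.
type throttle struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newThrottle(max int) *throttle {
	return &throttle{slots: make(chan struct{}, max)}
}

func (t *throttle) Start() {
	t.slots <- struct{}{} // blocks when max jobs are already running
	t.wg.Add(1)
}

func (t *throttle) Done() {
	<-t.slots
	t.wg.Done()
}

func (t *throttle) Wait() {
	t.wg.Wait()
}

run() then maps onto this as: a Start() per job, a Done() when the goroutine finishes, and thr.Wait() plus r.writesThr.Wait() before returning.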

func (r *reducer) reduce(job shuffleOutput) {
var currentKey []byte
var uids []uint64
pl := new(pb.PostingList)
txn := job.db.NewTransactionAt(r.state.writeTs, true)

newTxn := func() *badger.Txn {
r.writesThr.Start()
return job.db.NewTransactionAt(r.state.writeTs, true)
}

commitTxn := func(txn *badger.Txn) {
NumBadgerWrites.Add(1)
x.Check(txn.CommitAt(r.state.writeTs, func(err error) {
x.Check(err)
NumBadgerWrites.Add(-1)
r.writesThr.Done()
}))
}

txn := newTxn()

outputPostingList := func() {
atomic.AddInt64(&r.prog.reduceKeyCount, 1)
@@ -68,7 +84,20 @@ func (r *reducer) reduce(job shuffleOutput) {
pl.Pack = codec.Encode(uids, 256)
val, err := pl.Marshal()
x.Check(err)
x.Check(txn.SetEntry(badger.NewEntry(currentKey, val).WithMeta(meta)))

// If SetEntry fails with badger.ErrTxnTooBig, commit the current txn and start a new one.
// If SetEntry fails with the same error on the fresh txn, the entry itself is larger than
// the maximum transaction size, so we panic.
e := badger.NewEntry(currentKey, val).WithMeta(meta)
err = txn.SetEntry(e)
if err == badger.ErrTxnTooBig {
commitTxn(txn)

[Review comment] This extra newline doesn't seem necessary

txn = newTxn()
x.Check(txn.SetEntry(e)) // We are not checking ErrTxnTooBig the second time.
} else {
x.Check(err)
}

uids = uids[:0]
pl.Reset()
@@ -96,10 +125,5 @@ func (r *reducer) reduce(job shuffleOutput) {
}
outputPostingList()

NumBadgerWrites.Add(1)
x.Check(txn.CommitAt(r.state.writeTs, func(err error) {
x.Check(err)
NumBadgerWrites.Add(-1)
r.writesThr.Done()
}))
commitTxn(txn)
}
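
The retry logic added above in reduce.go, and mirrored below in schema.go, follows a single pattern: call SetEntry, and if it fails with badger.ErrTxnTooBig, commit the full transaction and retry the entry once on a fresh one. A standalone sketch of that pattern against a managed badger DB follows; the helper name setEntryWithRetry is hypothetical and the import path may differ depending on the badger version in use:

package bulkdemo

import (
	badger "github.com/dgraph-io/badger/v2" // assumed import path; varies by badger version
)

// setEntryWithRetry sketches the pattern introduced in this PR: try SetEntry,
// and if the transaction is full (badger.ErrTxnTooBig), commit it and retry
// the same entry once on a fresh transaction. A second ErrTxnTooBig means the
// single entry exceeds the maximum transaction size; the callers above treat
// that as fatal via x.Check.
func setEntryWithRetry(db *badger.DB, txn *badger.Txn, e *badger.Entry,
	readTs, commitTs uint64) (*badger.Txn, error) {

	err := txn.SetEntry(e)
	if err != badger.ErrTxnTooBig {
		return txn, err // nil on success, or a non-retryable error
	}
	// The transaction is full: commit what we have and open a new one.
	if err := txn.CommitAt(commitTs, nil); err != nil {
		return txn, err
	}
	txn = db.NewTransactionAt(readTs, true)
	return txn, txn.SetEntry(e) // ErrTxnTooBig is deliberately not retried a second time
}

In reduce.go the equivalent call sites use r.state.writeTs for both the read and commit timestamps, while schema.go opens its transaction at math.MaxUint64 and commits at timestamp 1.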
14 changes: 13 additions & 1 deletion dgraph/cmd/bulk/schema.go
@@ -128,7 +128,19 @@ func (s *schemaStore) write(db *badger.DB, preds []string) {
k := x.SchemaKey(pred)
v, err := sch.Marshal()
x.Check(err)
x.Check(txn.SetEntry(badger.NewEntry(k, v).WithMeta(posting.BitCompletePosting)))

// If SetEntry fails with badger.ErrTxnTooBig, commit the current txn and start a new one.
// If SetEntry fails with the same error on the fresh txn, the entry itself is larger than
// the maximum transaction size, so we panic.
e := badger.NewEntry(k, v).WithMeta(posting.BitCompletePosting)
err = txn.SetEntry(e)
if err == badger.ErrTxnTooBig {
x.Check(txn.CommitAt(1, nil))
txn = db.NewTransactionAt(math.MaxUint64, true)
x.Check(txn.SetEntry(e)) // We are not checking ErrTxnTooBig the second time.
} else {
x.Check(err)
}
}

// Always write the schema at timestamp 1; s.state.writeTs may not be equal to 1.
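
On the timestamp-1 convention in the comment above: in badger's managed mode, a value committed at timestamp 1 is visible to any transaction opened at a higher read timestamp, so the schema written here can be read regardless of what s.state.writeTs happens to be. A small read-side sketch, assuming a managed *badger.DB; the helper readSchemaAt is hypothetical and not part of this PR:

package bulkdemo

import (
	badger "github.com/dgraph-io/badger/v2" // assumed import path; varies by badger version
)

// readSchemaAt illustrates why committing the schema at timestamp 1 works: a
// read-only managed transaction opened at any later readTs sees that version.
func readSchemaAt(db *badger.DB, key []byte, readTs uint64) ([]byte, error) {
	txn := db.NewTransactionAt(readTs, false) // false = read-only transaction
	defer txn.Discard()

	item, err := txn.Get(key)
	if err != nil {
		return nil, err
	}
	return item.ValueCopy(nil) // copy so the value outlives the transaction
}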