Signed-off-by: balaji <rbalajis25@gmail.com>
…ures that we don't have too many pending requests.
…encoder. We need a way to parse key easily for comparisons though.
…aji/new_reduce Signed-off-by: balaji <rbalajis25@gmail.com>
manishrjain
left a comment
Looks alright to me. Have some comments. Will defer to @animesh2049 to do the final review and LGTM.
Reviewed 1 of 5 files at r2, 1 of 3 files at r3, 7 of 8 files at r5.
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @balajijinnah)
dgraph/cmd/bulk/key.go, line 1 at r5 (raw file):
package bulk
Add a license, but use the license that protobuf generated code has (or protobuf has).
dgraph/cmd/bulk/mapper.go, line 129 at r5 (raw file):
	PartitionKeys: [][]byte{},
}
shardPartioionNo := len(entries) / partitionKeyShard
spelling mistake
dgraph/cmd/bulk/mapper.go, line 132 at r5 (raw file):
for i := range entries {
	if shardPartioionNo == 0 {
		// we have only less entries so no need for partition keys.
we have very few ...
dgraph/cmd/bulk/mapper.go, line 144 at r5 (raw file):
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
_, err = w.Write(lenBuf)
x.Check2(...)
dgraph/cmd/bulk/mapper.go, line 146 at r5 (raw file):
_, err = w.Write(lenBuf)
x.Check(err)
_, err = w.Write(headerBuf)
x.Check2(...)
dgraph/cmd/bulk/reduce.go, line 89 at r5 (raw file):
})
// Start bactching for the given keys
spelling
dgraph/cmd/bulk/reduce.go, line 181 at r5 (raw file):
// readKey reads the next map entry key.
readKey := func() {
readKey := func() error {
...
return io.EOF
}
Don't need eof variable.
Also, rename to readMapEntry
dgraph/cmd/bulk/reduce.go, line 185 at r5 (raw file):
	return
}
if !prevKeyExist {
if prevKeyExist { return }
dgraph/cmd/bulk/reduce.go, line 202 at r5 (raw file):
x.Check2(io.ReadFull(r, eBuf))
key, err = GetKeyForMapEntry(eBuf)
return key, err
key, err = readMapEntry(...)
dgraph/cmd/bulk/reduce.go, line 217 at r5 (raw file):
ie.partitionKey = pKey
for {
	readKey()
if err := readKey(); err == io.EOF { break }
dgraph/cmd/bulk/reduce.go, line 231 at r5 (raw file):
}
mi.batchCh <- ie
i++
Doesn't look like it's used.
dgraph/cmd/bulk/reduce.go, line 237 at r5 (raw file):
batch := make([][]byte, 0, batchAlloc)
for {
	readKey()
fix this up as well.
dgraph/cmd/bulk/reduce.go, line 257 at r5 (raw file):
}
func (mi *mapIterator) Next() bool {
Next() *iteratorEntry
Remove current(). If nothing is there, then return nil.
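A minimal sketch of the suggested Next() *iteratorEntry, assuming the iterator hands out batches through batchCh (as in the quoted "mi.batchCh <- ie" line) and that the producer closes the channel once it is done. The types are simplified, hypothetical stand-ins for the PR's. No current field or current() method is kept, and a nil return means the iterator is exhausted.

```go
package main

import "fmt"

// iteratorEntry is a simplified stand-in; its fields are assumptions.
type iteratorEntry struct {
	partitionKey []byte
	batch        [][]byte
}

// mapIterator models only the batch channel; there is no current field.
type mapIterator struct {
	batchCh chan *iteratorEntry
}

// Next returns the next batch, or nil once the producer has closed batchCh.
func (mi *mapIterator) Next() *iteratorEntry {
	ie, ok := <-mi.batchCh
	if !ok {
		return nil
	}
	return ie
}

func main() {
	mi := &mapIterator{batchCh: make(chan *iteratorEntry, 2)}
	go func() {
		mi.batchCh <- &iteratorEntry{partitionKey: []byte("p1"), batch: [][]byte{[]byte("k1")}}
		mi.batchCh <- &iteratorEntry{partitionKey: []byte("p2")}
		close(mi.batchCh)
	}()
	for ie := mi.Next(); ie != nil; ie = mi.Next() {
		fmt.Printf("%s: %d keys\n", ie.partitionKey, len(ie.batch))
	}
}
```

This relies on the producer never sending a nil entry; otherwise a nil batch would be indistinguishable from the end of the stream.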
dgraph/cmd/bulk/reduce.go, line 374 at r5 (raw file):
	go r.encode(encoderCh, encoderCloser)
}
// Start lisenting to write the badger list.
spelling mistake.
dgraph/cmd/bulk/reduce.go, line 380 at r5 (raw file):
for i := 0; i < len(partitionKeys); i++ {
	batch := make([][]byte, 0, batchAlloc)
	for _, itr := range mapItrs {
Let's add the assert back.
poonai
left a comment
Reviewable status: 6 of 9 files reviewed, 15 unresolved discussions (waiting on @manishrjain)
dgraph/cmd/bulk/key.go, line 1 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Add a license, but use the license that protobuf generated code has (or protobuf has).
Done.
dgraph/cmd/bulk/mapper.go, line 129 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling mistake
Done.
dgraph/cmd/bulk/mapper.go, line 132 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
we have very few ...
Done.
dgraph/cmd/bulk/mapper.go, line 144 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
x.Check2(...)
Done.
dgraph/cmd/bulk/mapper.go, line 146 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
x.Check2(...)
Done.
dgraph/cmd/bulk/reduce.go, line 89 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling
Done.
dgraph/cmd/bulk/reduce.go, line 181 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
readKey := func() error {
...
return io.EOF
}
Don't need eof variable.
Also, rename to readMapEntry
Done.
dgraph/cmd/bulk/reduce.go, line 185 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
if prevKeyExist { return }
Done.
dgraph/cmd/bulk/reduce.go, line 202 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
return key, err
key, err = readMapEntry(...)
Done.
dgraph/cmd/bulk/reduce.go, line 217 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
if err := readKey(); err == io.EOF { break }
Done.
dgraph/cmd/bulk/reduce.go, line 231 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Doesn't look like it's used.
Done.
dgraph/cmd/bulk/reduce.go, line 237 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
fix this up as well.
Done.
dgraph/cmd/bulk/reduce.go, line 257 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Next() *iteratorEntry
Remove current(). If nothing is there, then return nil.
Done.
dgraph/cmd/bulk/reduce.go, line 374 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
spelling mistake.
Done.
dgraph/cmd/bulk/reduce.go, line 380 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Let's add the assert back.
Done.
	tmpBuf  []byte
	fd      *os.File
	reader  *bufio.Reader
	current *iteratorEntry
field current is unused (from unused)
martinmr
left a comment
Reviewed 1 of 5 files at r2, 3 of 8 files at r5, 3 of 3 files at r6.
Reviewable status: all files reviewed, 21 unresolved discussions (waiting on @balajijinnah, @golangcibot, and @manishrjain)
dgraph/cmd/bulk/loader.go, line 235 at r6 (raw file):
r := reducer{
	state: ld.state,
	mu:    new(sync.RWMutex),
If you make mu not a pointer you won't have to worry about initializing it.
Also, maybe add a method NewReducer to encapsulate the initialization. Not needed if it's only happening once but makes the code a bit cleaner.
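Taken together, the two suggestions look roughly like the sketch below. A sync.RWMutex held by value is ready to use at its zero value, so the constructor only sets the fields that actually need values. loaderState, count, incr and get are hypothetical; NewReducer is the name the reviewer proposed, and its signature here is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// loaderState is a hypothetical placeholder for the loader state (ld.state).
type loaderState struct{ numShards int }

type reducer struct {
	state *loaderState
	mu    sync.RWMutex // zero value is an unlocked mutex; no new() needed
	count int
}

// NewReducer keeps initialization in one place; mu needs no explicit setup.
func NewReducer(s *loaderState) *reducer {
	return &reducer{state: s}
}

func (r *reducer) incr() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
}

func (r *reducer) get() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.count
}

func main() {
	r := NewReducer(&loaderState{numShards: 1})
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.incr()
		}()
	}
	wg.Wait()
	fmt.Println(r.get()) // prints 10
}
```

Because the struct now contains the mutex itself, a reducer must not be copied after first use; returning *reducer from the constructor keeps callers on the pointer.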
dgraph/cmd/bulk/reduce.go, line 154 at r6 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
field current is unused (from unused)
I think you stopped using current but didn't remove the definition.
dgraph/cmd/bulk/reduce.go, line 172 at r6 (raw file):
}
func (mi *mapIterator) startBatchingForKeys(partitionsKeys [][]byte) {
Does renaming this to startBatching still convey the same idea? The current name is a bit too long.
startBatchingKeys could also work.
dgraph/cmd/bulk/reduce.go, line 221 at r6 (raw file):
so,
can remove the comma here.
dgraph/cmd/bulk/reduce.go, line 244 at r6 (raw file):
	}
}
func (mi *mapIterator) Close() error {
add a blank line above
dgraph/cmd/bulk/reduce.go, line 315 at r6 (raw file):
req.list = &bpb.KVList{}
countKeys := r.toList(req.entries, req.list)
// r.toList(req) // contains entries, list and freelist struct.
why is this commented out?
animesh2049
left a comment
Reviewable status: all files reviewed, 26 unresolved discussions (waiting on @balajijinnah, @golangcibot, and @manishrjain)
dgraph/cmd/bulk/mapper.go, line 135 at r6 (raw file):
	break
}
if i%shardPartitionNo == 0 {
I think it will be better if we do (i+1)%shardPartitionNo, otherwise the first entry will always become a partition key.
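A simplified sketch of partition-key selection with the suggested (i+1) offset, using strings instead of []byte keys. It assumes partitionKeyShard is the desired number of partitions, so n plays the role of shardPartitionNo (entries per partition). partitionKeys and numPartitions are hypothetical names. With i%n, index 0 always qualifies, so keys[0] is always picked; with (i+1)%n the picks are keys[n-1], keys[2n-1], and so on.

```go
package main

import "fmt"

// partitionKeys picks boundary keys from a sorted slice; n is the number of
// entries per partition (assumed to match shardPartitionNo in the PR).
func partitionKeys(keys []string, numPartitions int) []string {
	n := len(keys) / numPartitions
	if n == 0 {
		// We have very few entries, so no partition keys are needed.
		return nil
	}
	var out []string
	for i := range keys {
		// (i+1)%n picks keys[n-1], keys[2n-1], ...; i%n would always pick keys[0].
		if (i+1)%n == 0 {
			out = append(out, keys[i])
		}
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	fmt.Println(partitionKeys(keys, 3)) // prints [c f i]
}
```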
dgraph/cmd/bulk/reduce.go, line 90 at r6 (raw file):
// Start batching for the given keys
fmt.Printf("Num map iterators: %d\n", len(mapItrs))
Do you want to keep this?
dgraph/cmd/bulk/reduce.go, line 252 at r6 (raw file):
}
func (r *reducer) newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
Why is this a method on reducer?
dgraph/cmd/bulk/reduce.go, line 348 at r6 (raw file):
idx++
if idx%100 == 0 {
	fmt.Printf("Wrote req: %d\n", idx)
Do you want to keep this?
dgraph/cmd/bulk/reduce.go, line 359 at r6 (raw file):
writerCh := make(chan *encodeRequest, 2*cpu)
encoderCloser := y.NewCloser(cpu)
for i := 0; i < runtime.NumCPU(); i++ {
use cpu variable.
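A sketch of the suggested fix: read runtime.NumCPU() once into cpu and use that value both for the channel capacity and for the loop bound, so the two cannot drift apart. sync.WaitGroup is swapped in here for the y.Closer used in the PR. encodeRequest and runEncoders are simplified hypothetical stand-ins that only sum numbers.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// encodeRequest is a simplified stand-in for the PR's request type.
type encodeRequest struct{ n int }

// runEncoders starts one worker per CPU and sums the request values.
func runEncoders(reqs []int) int {
	cpu := runtime.NumCPU()
	encoderCh := make(chan *encodeRequest, 2*cpu)

	var mu sync.Mutex
	var wg sync.WaitGroup
	total := 0
	for i := 0; i < cpu; i++ { // reuse cpu instead of calling runtime.NumCPU() again
		wg.Add(1)
		go func() {
			defer wg.Done()
			for req := range encoderCh {
				mu.Lock()
				total += req.n
				mu.Unlock()
			}
		}()
	}
	for _, n := range reqs {
		encoderCh <- &encodeRequest{n: n}
	}
	close(encoderCh) // lets every worker's range loop finish
	wg.Wait()
	return total
}

func main() {
	fmt.Println(runEncoders([]int{1, 2, 3, 4})) // prints 10
}
```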
poonai
left a comment
Reviewable status: 6 of 9 files reviewed, 27 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, and @martinmr)
dgraph/cmd/bulk/loader.go, line 235 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
If you make mu not a pointer you won't have to worry about initializing it.
Also, maybe add a method NewReducer to encapsulate the initialization. Not needed if it's only happening once but makes the code a bit cleaner.
Done.
dgraph/cmd/bulk/mapper.go, line 135 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
I think it will be better if we do (i+1)%shardPartitionNo, otherwise the first entry will always become a partition key.
Done.
dgraph/cmd/bulk/reduce.go, line 90 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Do you want to keep this?
Done.
dgraph/cmd/bulk/reduce.go, line 154 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
I think you stopped using current but didn't remove the definition.
Done.
dgraph/cmd/bulk/reduce.go, line 172 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
Does renaming this to startBatching still convey the same idea? The current name is a bit too long.
startBatchingKeys could also work.
Done.
dgraph/cmd/bulk/reduce.go, line 221 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
so,
can remove the comma here.
Done.
dgraph/cmd/bulk/reduce.go, line 244 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
add a blank line above
Done.
dgraph/cmd/bulk/reduce.go, line 252 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Why is this a method on reducer?
Done.
dgraph/cmd/bulk/reduce.go, line 315 at r6 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
why is this commented out?
Done.
dgraph/cmd/bulk/reduce.go, line 348 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Do you want to keep this?
Done.
dgraph/cmd/bulk/reduce.go, line 359 at r6 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
use cpu variable.
Done.
animesh2049
left a comment
Dismissed @balajijinnah from a discussion.
Reviewable status: 6 of 9 files reviewed, 22 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, and @martinmr)
This PR removes the heap-based sorting. Instead, we batch all keys in memory, grouped by partition, and sort each batch.