Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit c9ee09f

Browse files
authored
Merge pull request #644 from erizocosmico/feature/parallel-indexes
sql/index/pilosa: parallelize index creation
2 parents 7b6a0f6 + a9e32d0 commit c9ee09f

File tree

7 files changed

+200
-68
lines changed

7 files changed

+200
-68
lines changed

sql/index/pilosa/driver.go

Lines changed: 114 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import (
88
"io/ioutil"
99
"os"
1010
"path/filepath"
11+
"runtime"
12+
"strconv"
1113
"strings"
14+
"sync"
15+
"sync/atomic"
1216
"time"
1317

1418
opentracing "github.com/opentracing/opentracing-go"
@@ -49,6 +53,11 @@ var (
4953
errInvalidIndexType = errors.NewKind("expecting a pilosa index, instead got %T")
5054
)
5155

56+
// Names of the two settings indexThreads consults to decide how many
// partitions may be indexed concurrently.
const (
57+
// pilosaIndexThreadsKey is the environment variable looked up with
// os.LookupEnv when the session variable does not provide a value.
pilosaIndexThreadsKey = "PILOSA_INDEX_THREADS"
58+
// pilosaIndexThreadsVar is the session variable read with
// ctx.Session.Get; it is checked before the environment variable.
pilosaIndexThreadsVar = "pilosa_index_threads"
59+
)
60+
5261
type (
5362
bitBatch struct {
5463
size uint64
@@ -217,13 +226,13 @@ func (d *Driver) savePartition(
217226
kviter sql.IndexKeyValueIter,
218227
idx *pilosaIndex,
219228
pilosaIndex *concurrentPilosaIndex,
220-
offset uint64,
221229
b *batch,
222230
) (uint64, error) {
223231
var (
224232
colID uint64
225233
err error
226234
)
235+
227236
for i, e := range idx.Expressions() {
228237
name := fieldName(idx.ID(), e, p)
229238
pilosaIndex.DeleteField(name)
@@ -254,7 +263,7 @@ func (d *Driver) savePartition(
254263
kviter.Close()
255264
}()
256265

257-
for colID = offset; err == nil; colID++ {
266+
for colID = 0; err == nil; colID++ {
258267
// commit each batch of objects (pilosa and boltdb)
259268
if colID%sql.IndexBatchSize == 0 && colID != 0 {
260269
if err = d.saveBatch(ctx, idx.mapping, colID, b); err != nil {
@@ -265,28 +274,30 @@ func (d *Driver) savePartition(
265274
select {
266275
case <-ctx.Context.Done():
267276
return 0, ctx.Context.Err()
268-
269277
default:
270-
var (
271-
values []interface{}
272-
location []byte
273-
)
274-
if values, location, err = kviter.Next(); err != nil {
275-
break
276-
}
278+
}
277279

278-
for i, field := range b.fields {
279-
if values[i] == nil {
280-
continue
281-
}
280+
values, location, err := kviter.Next()
281+
if err != nil {
282+
break
283+
}
282284

283-
rowID, err := idx.mapping.getRowID(field.Name(), values[i])
284-
if err != nil {
285-
return 0, err
286-
}
287-
b.bitBatches[i].Add(rowID, colID)
285+
for i, field := range b.fields {
286+
if values[i] == nil {
287+
continue
288+
}
289+
290+
rowID, err := idx.mapping.getRowID(field.Name(), values[i])
291+
if err != nil {
292+
return 0, err
288293
}
289-
err = idx.mapping.putLocation(pilosaIndex.Name(), colID, location)
294+
295+
b.bitBatches[i].Add(rowID, colID)
296+
}
297+
298+
err = idx.mapping.putLocation(pilosaIndex.Name(), p, colID, location)
299+
if err != nil {
300+
return 0, err
290301
}
291302
}
292303

@@ -307,7 +318,7 @@ func (d *Driver) savePartition(
307318
}
308319
}
309320

310-
return colID - offset, err
321+
return colID, err
311322
}
312323

313324
// Save the given index (mapping and bitmap)
@@ -331,44 +342,86 @@ func (d *Driver) Save(
331342
idx.wg.Add(1)
332343
defer idx.wg.Done()
333344

334-
var b = batch{
335-
fields: make([]*pilosa.Field, len(idx.Expressions())),
336-
bitBatches: make([]*bitBatch, len(idx.Expressions())),
337-
}
338-
339345
ctx.Context, idx.cancel = context.WithCancel(ctx.Context)
340346
processingFile := d.processingFilePath(i.Database(), i.Table(), i.ID())
341-
if err := index.WriteProcessingFile(
347+
err = index.WriteProcessingFile(
342348
processingFile,
343349
[]byte{processingFileOnSave},
344-
); err != nil {
350+
)
351+
if err != nil {
345352
return err
346353
}
347354

348355
defer iter.Close()
349356
pilosaIndex := idx.index
350-
var rows uint64
357+
358+
var (
359+
rows, timePilosa, timeMapping uint64
360+
361+
wg sync.WaitGroup
362+
tokens = make(chan struct{}, indexThreads(ctx))
363+
364+
errors []error
365+
errmut sync.Mutex
366+
)
367+
351368
for {
369+
select {
370+
case <-ctx.Done():
371+
return
372+
default:
373+
}
374+
352375
p, kviter, err := iter.Next()
353376
if err != nil {
354377
if err == io.EOF {
355378
break
356379
}
357-
return err
358-
}
359380

360-
numRows, err := d.savePartition(ctx, p, kviter, idx, pilosaIndex, rows, &b)
361-
if err != nil {
381+
idx.cancel()
382+
wg.Wait()
362383
return err
363384
}
364385

365-
rows += numRows
386+
wg.Add(1)
387+
388+
go func() {
389+
defer func() {
390+
wg.Done()
391+
<-tokens
392+
}()
393+
394+
tokens <- struct{}{}
395+
396+
var b = &batch{
397+
fields: make([]*pilosa.Field, len(idx.Expressions())),
398+
bitBatches: make([]*bitBatch, len(idx.Expressions())),
399+
}
400+
401+
numRows, err := d.savePartition(ctx, p, kviter, idx, pilosaIndex, b)
402+
if err != nil {
403+
errmut.Lock()
404+
errors = append(errors, err)
405+
idx.cancel()
406+
errmut.Unlock()
407+
return
408+
}
409+
410+
atomic.AddUint64(&timeMapping, uint64(b.timeMapping))
411+
atomic.AddUint64(&timePilosa, uint64(b.timePilosa))
412+
atomic.AddUint64(&rows, numRows)
413+
}()
414+
}
415+
416+
wg.Wait()
417+
if len(errors) > 0 {
418+
return errors[0]
366419
}
367420

368421
logrus.WithFields(logrus.Fields{
369422
"duration": time.Since(start),
370-
"pilosa": b.timePilosa,
371-
"mapping": b.timeMapping,
423+
"pilosa": timePilosa,
424+
"mapping": timeMapping,
372425
"rows": rows,
373426
"id": i.ID(),
374427
}).Debugf("finished pilosa indexing")
@@ -421,18 +474,18 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
421474
return partitions.Close()
422475
}
423476

424-
func (d *Driver) saveBatch(ctx *sql.Context, m *mapping, colID uint64, b *batch) error {
425-
err := d.savePilosa(ctx, colID, b)
477+
func (d *Driver) saveBatch(ctx *sql.Context, m *mapping, cols uint64, b *batch) error {
478+
err := d.savePilosa(ctx, cols, b)
426479
if err != nil {
427480
return err
428481
}
429482

430-
return d.saveMapping(ctx, m, colID, true, b)
483+
return d.saveMapping(ctx, m, cols, true, b)
431484
}
432485

433-
func (d *Driver) savePilosa(ctx *sql.Context, colID uint64, b *batch) error {
486+
func (d *Driver) savePilosa(ctx *sql.Context, cols uint64, b *batch) error {
434487
span, _ := ctx.Span("pilosa.Save.bitBatch",
435-
opentracing.Tag{Key: "cols", Value: colID},
488+
opentracing.Tag{Key: "cols", Value: cols},
436489
opentracing.Tag{Key: "fields", Value: len(b.fields)},
437490
)
438491
defer span.Finish()
@@ -457,12 +510,12 @@ func (d *Driver) savePilosa(ctx *sql.Context, colID uint64, b *batch) error {
457510
func (d *Driver) saveMapping(
458511
ctx *sql.Context,
459512
m *mapping,
460-
colID uint64,
513+
cols uint64,
461514
cont bool,
462515
b *batch,
463516
) error {
464517
span, _ := ctx.Span("pilosa.Save.mapping",
465-
opentracing.Tag{Key: "cols", Value: colID},
518+
opentracing.Tag{Key: "cols", Value: cols},
466519
opentracing.Tag{Key: "continues", Value: cont},
467520
)
468521
defer span.Finish()
@@ -541,3 +594,21 @@ func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
541594
}
542595
return idx, nil
543596
}
597+
598+
func indexThreads(ctx *sql.Context) int {
599+
typ, val := ctx.Session.Get(pilosaIndexThreadsVar)
600+
if val != nil && typ == sql.Int64 {
601+
return int(val.(int64))
602+
}
603+
604+
var value int
605+
if v, ok := os.LookupEnv(pilosaIndexThreadsKey); ok {
606+
value, _ = strconv.Atoi(v)
607+
}
608+
609+
if value <= 0 {
610+
value = runtime.NumCPU()
611+
}
612+
613+
return value
614+
}

sql/index/pilosa/driver_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ func TestSaveAndLoad(t *testing.T) {
189189
require.Equal(1, len(indexes))
190190

191191
var locations = make([][]string, len(it.records))
192+
192193
for partition, records := range it.records {
193194
for _, r := range records {
194195
lookup, err := sqlIdx.Get(r.values...)
@@ -555,6 +556,7 @@ func TestIntersection(t *testing.T) {
555556

556557
lookupLang, err := sqlIdxLang.Get(itLang.records[0][0].values...)
557558
require.NoError(err)
559+
558560
lookupPath, err := sqlIdxPath.Get(itPath.records[0][itPath.total-1].values...)
559561
require.NoError(err)
560562

@@ -1291,10 +1293,10 @@ func (it *testIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
12911293
values[i] = e + "-" + loc + "-" + string(it.partition.Key())
12921294
}
12931295

1294-
*it.records = append(*it.records, testRecord{
1296+
(*it.records)[it.offset] = testRecord{
12951297
values,
12961298
[]byte(loc),
1297-
})
1299+
}
12981300
it.offset++
12991301

13001302
return values, []byte(loc), nil
@@ -1430,13 +1432,23 @@ type partitionKeyValueIter struct {
14301432
records [][]testRecord
14311433
}
14321434

1435+
func (i *partitionKeyValueIter) init() {
1436+
i.records = make([][]testRecord, i.partitions)
1437+
for j := 0; j < i.partitions; j++ {
1438+
i.records[j] = make([]testRecord, i.total)
1439+
}
1440+
}
1441+
14331442
func (i *partitionKeyValueIter) Next() (sql.Partition, sql.IndexKeyValueIter, error) {
14341443
if i.pos >= i.partitions {
14351444
return nil, nil, io.EOF
14361445
}
14371446

1447+
if i.pos == 0 {
1448+
i.init()
1449+
}
1450+
14381451
i.pos++
1439-
i.records = append(i.records, []testRecord{})
14401452
return testPartition(i.pos - 1), &testIndexKeyValueIter{
14411453
offset: i.offset,
14421454
total: i.total,

sql/index/pilosa/iterator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/sirupsen/logrus"
77
bolt "go.etcd.io/bbolt"
8+
"gopkg.in/src-d/go-mysql-server.v0/sql"
89
)
910

1011
type locationValueIter struct {
@@ -31,6 +32,7 @@ type indexValueIter struct {
3132
total uint64
3233
bits []uint64
3334
mapping *mapping
35+
partition sql.Partition
3436
indexName string
3537

3638
// share transaction and bucket on all getLocation calls
@@ -45,7 +47,7 @@ func (it *indexValueIter) Next() ([]byte, error) {
4547
return nil, err
4648
}
4749

48-
bucket, err := it.mapping.getBucket(it.indexName, false)
50+
bucket, err := it.mapping.getBucket(it.indexName, it.partition, false)
4951
if err != nil {
5052
_ = it.Close()
5153
return nil, err

0 commit comments

Comments
 (0)