Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sql/analyzer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,14 @@ func isParallelizable(node sql.Node) bool {
// IndexedTablesAccess already uses an index for lookups, so parallelizing it won't help in most cases (and can
// blow up the query execution graph)
case *plan.IndexedTableAccess:
parallelizable = false
return false
// If this IndexedTableAccess received a process node, it is parallelizable
if _, ok := node.ResolvedTable.Table.(*plan.ProcessTable); !ok {
parallelizable = false
return false
}
parallelizable = true
lastWasTable = true
tableSeen = true
// Foreign keys expect specific nodes as children and face issues when they're swapped with Exchange nodes
case *plan.ForeignKeyHandler:
parallelizable = false
Expand Down
40 changes: 40 additions & 0 deletions sql/analyzer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,46 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel R
var seen = make(map[string]struct{})
n, _, err := transform.Node(n, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
switch n := n.(type) {
case *plan.IndexedTableAccess:
// Only add a process table if ResolvedTable implements ParallelizedIndexAddressableTable
parallelizedTable, ok := n.ResolvedTable.Table.(sql.ParallelizedIndexAddressableTable)
if !ok {
return n, transform.SameTree, nil
}

name := parallelizedTable.Name()
if _, ok := seen[name]; ok {
return n, transform.SameTree, nil
}

// TODO: what should total be?
processList.AddTableProgress(ctx.Pid(), name, -1)

seen[name] = struct{}{}

onPartitionDone := func(partitionName string) {
processList.UpdateTableProgress(ctx.Pid(), name, 1)
processList.RemovePartitionProgress(ctx.Pid(), name, partitionName)
}

onPartitionStart := func(partitionName string) {
processList.AddPartitionProgress(ctx.Pid(), name, partitionName, -1)
}

var onRowNext plan.NamedNotifyFunc
// TODO: coarser default for row updates (like updating every 100 rows) that doesn't kill performance
if updateQueryProgressEachRow {
onRowNext = func(partitionName string) {
processList.UpdatePartitionProgress(ctx.Pid(), name, partitionName, 1)
}
}

// Wrap with ProcessTable
t := plan.NewProcessTable(parallelizedTable, onPartitionDone, onPartitionStart, onRowNext)

// Replace child
n, err := n.WithTable(t)
return n, transform.NewTree, err
case *plan.ResolvedTable:
switch n.Table.(type) {
case *plan.ProcessTable, *plan.ProcessIndexableTable:
Expand Down
13 changes: 12 additions & 1 deletion sql/analyzer/pushdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,10 +680,21 @@ func pushdownIndexesToTable(a *Analyzer, tableNode NameableNode, indexes map[str
if table == nil {
return n, transform.SameTree, nil
}
if _, ok := table.(sql.IndexAddressableTable); ok {
if indexAddressableTable, ok := table.(sql.IndexAddressableTable); ok {
indexLookup, ok := indexes[tableNode.Name()]
if ok {
a.Log("table %q transformed with pushdown of index", tableNode.Name())

// Only pass lookup into resolved table if it's Parallelizable
if _, ok := indexAddressableTable.(sql.ParallelizedIndexAddressableTable); ok {
indexedTable := indexAddressableTable.WithIndexLookup(indexLookup.lookup)
newResolvedTable, err := n.WithTable(indexedTable)
if err != nil {
return nil, transform.SameTree, err
}
return plan.NewStaticIndexedTableAccess(newResolvedTable, indexLookup.lookup, indexLookup.indexes[0], indexLookup.fields), transform.NewTree, nil

}
return plan.NewStaticIndexedTableAccess(n, indexLookup.lookup, indexLookup.indexes[0], indexLookup.fields), transform.NewTree, nil
}
}
Expand Down
5 changes: 5 additions & 0 deletions sql/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ type IndexAddressableTable interface {
IndexAddressable
}

type ParallelizedIndexAddressableTable interface {
IndexAddressableTable
ShouldParallelizeAccess() bool
}

// IndexAlterableTable represents a table that supports index modification operations.
type IndexAlterableTable interface {
Table
Expand Down
28 changes: 26 additions & 2 deletions sql/plan/indexed_table_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ func (i *IndexedTableAccess) Index() sql.Index {
func (i *IndexedTableAccess) RowIter(ctx *sql.Context, row sql.Row) (sql.RowIter, error) {
span, ctx := ctx.Span("plan.IndexedTableAccess")

resolvedTable, ok := i.ResolvedTable.Table.(sql.IndexAddressableTable)
// child is ProcessTable, so get underlying
t := i.ResolvedTable.Table
if wrapperTable, ok := i.ResolvedTable.Table.(sql.TableWrapper); ok {
t = wrapperTable.Underlying()
}

indexAddressableTable, ok := t.(sql.IndexAddressableTable)
if !ok {
return nil, ErrNoIndexableTable.New(i.ResolvedTable)
}
Expand All @@ -112,7 +118,7 @@ func (i *IndexedTableAccess) RowIter(ctx *sql.Context, row sql.Row) (sql.RowIter
return nil, err
}

indexedTable := resolvedTable.WithIndexLookup(lookup)
indexedTable := indexAddressableTable.WithIndexLookup(lookup)
partIter, err := indexedTable.Partitions(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -293,6 +299,24 @@ func (i IndexedTableAccess) WithTable(table sql.Table) (*IndexedTableAccess, err
return &i, nil
}

// Partitions implements sql.Table
func (i *IndexedTableAccess) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
if i.lookup == nil {
return i.ResolvedTable.Partitions(ctx)
}

table := i.ResolvedTable.Table
if indexAddressableTable, ok := i.ResolvedTable.Table.(sql.IndexAddressable); ok {
table = indexAddressableTable.WithIndexLookup(i.lookup)
}
return table.Partitions(ctx)
}

// PartitionRows implements sql.Table
func (i *IndexedTableAccess) PartitionRows(ctx *sql.Context, partition sql.Partition) (sql.RowIter, error) {
return i.ResolvedTable.PartitionRows(ctx, partition)
}

// GetIndexLookup returns the sql.IndexLookup from an IndexedTableAccess.
// This method is exported for use in integration tests.
func GetIndexLookup(ita *IndexedTableAccess) sql.IndexLookup {
Expand Down