From e3d5e514be29089ee95a98e10a9bcaa862591b20 Mon Sep 17 00:00:00 2001 From: Miguel Molina Date: Thu, 13 Jun 2019 10:50:25 +0200 Subject: [PATCH] sql/analyzer: refactor pushdown rule Signed-off-by: Miguel Molina --- sql/analyzer/pushdown.go | 394 +++++++++++++++++++++------------------ 1 file changed, 215 insertions(+), 179 deletions(-) diff --git a/sql/analyzer/pushdown.go b/sql/analyzer/pushdown.go index 56bae25fc..55321f54d 100644 --- a/sql/analyzer/pushdown.go +++ b/sql/analyzer/pushdown.go @@ -24,64 +24,110 @@ func pushdown(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) { return n, nil } - var fieldsByTable = make(map[string][]string) - var exprsByTable = make(map[string][]sql.Expression) - type tableField struct { - table string - field string - } - var tableFields = make(map[tableField]struct{}) - a.Log("finding used columns in node") colSpan, _ := ctx.Span("find_pushdown_columns") // First step is to find all col exprs and group them by the table they mention. // Even if they appear multiple times, only the first one will be used. + fieldsByTable := findFieldsByTable(n) + + colSpan.Finish() + + a.Log("finding filters in node") + filters := findFilters(ctx, n) + + indexSpan, _ := ctx.Span("assign_indexes") + indexes, err := assignIndexes(a, n) + if err != nil { + return nil, err + } + indexSpan.Finish() + + a.Log("transforming nodes with pushdown of filters, projections and indexes") + + return transformPushdown(a, n, filters, indexes, fieldsByTable) +} + +// fixFieldIndexesOnExpressions executes fixFieldIndexes on a list of exprs. +func fixFieldIndexesOnExpressions(schema sql.Schema, expressions ...sql.Expression) ([]sql.Expression, error) { + var result = make([]sql.Expression, len(expressions)) + for i, e := range expressions { + var err error + result[i], err = fixFieldIndexes(schema, e) + if err != nil { + return nil, err + } + } + return result, nil +} + +// fixFieldIndexes transforms the given expression setting correct indexes +// for GetField expressions according to the schema of the row in the table +// and not the one where the filter came from. 
+func fixFieldIndexes(schema sql.Schema, exp sql.Expression) (sql.Expression, error) { + return exp.TransformUp(func(e sql.Expression) (sql.Expression, error) { + switch e := e.(type) { + case *expression.GetField: + // we need to rewrite the indexes for the table row + for i, col := range schema { + if e.Name() == col.Name && e.Table() == col.Source { + return expression.NewGetFieldWithTable( + i, + e.Type(), + e.Table(), + e.Name(), + e.IsNullable(), + ), nil + } + } + + return nil, ErrFieldMissing.New(e.Name()) + } + + return e, nil + }) +} + +func findFieldsByTable(n sql.Node) map[string][]string { + var fieldsByTable = make(map[string][]string) plan.InspectExpressions(n, func(e sql.Expression) bool { if gf, ok := e.(*expression.GetField); ok { - tf := tableField{gf.Table(), gf.Name()} - if _, ok := tableFields[tf]; !ok { - a.Log("found used column %s.%s", gf.Table(), gf.Name()) - tableFields[tf] = struct{}{} + if !stringContains(fieldsByTable[gf.Table()], gf.Name()) { fieldsByTable[gf.Table()] = append(fieldsByTable[gf.Table()], gf.Name()) - exprsByTable[gf.Table()] = append(exprsByTable[gf.Table()], gf) } } return true }) + return fieldsByTable +} - colSpan.Finish() - - a.Log("finding filters in node") - - filterSpan, _ := ctx.Span("find_pushdown_filters") +func findFilters(ctx *sql.Context, n sql.Node) filters { + span, _ := ctx.Span("find_pushdown_filters") + defer span.Finish() - // then find all filters, also by table. Note that filters that mention + // Find all filters, also by table. Note that filters that mention // more than one table will not be passed to neither. - filt := make(filters) + filters := make(filters) plan.Inspect(n, func(node sql.Node) bool { - a.Log("inspecting node of type: %T", node) switch node := node.(type) { case *plan.Filter: fs := exprToTableFilters(node.Expression) - a.Log("found filters for %d tables %s", len(fs), node.Expression) - filt.merge(fs) + filters.merge(fs) } return true }) - filterSpan.Finish() - - indexSpan, _ := ctx.Span("assign_indexes") - indexes, err := assignIndexes(a, n) - if err != nil { - return nil, err - } - indexSpan.Finish() - - a.Log("transforming nodes with pushdown of filters, projections and indexes") + return filters +} +func transformPushdown( + a *Analyzer, + n sql.Node, + filters filters, + indexes map[string]*indexLookup, + fieldsByTable map[string][]string, +) (sql.Node, error) { // Now all nodes can be transformed. Since traversal of the tree is done // from inner to outer the filters have to be processed first so they get // to the tables. 
@@ -92,185 +138,177 @@ func pushdown(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) { a.Log("transforming node of type: %T", node) switch node := node.(type) { case *plan.Filter: - if len(handledFilters) == 0 { - a.Log("no handled filters, leaving filter untouched") - return node, nil - } - - unhandled := getUnhandledFilters( - splitExpression(node.Expression), - handledFilters, + return pushdownFilter(a, node, handledFilters) + case *plan.ResolvedTable: + return pushdownTable( + a, + node, + filters, + &handledFilters, + &queryIndexes, + fieldsByTable, + indexes, ) + default: + return transformExpressioners(node) + } + }) - if len(unhandled) == 0 { - a.Log("filter node has no unhandled filters, so it will be removed") - return node.Child, nil - } + release := func() { + for _, idx := range queryIndexes { + a.Catalog.ReleaseIndex(idx) + } + } - a.Log( - "%d handled filters removed from filter node, filter has now %d filters", - len(handledFilters), - len(unhandled), - ) + if err != nil { + release() + return nil, err + } - return plan.NewFilter(expression.JoinAnd(unhandled...), node.Child), nil - case *plan.ResolvedTable: - var table = node.Table - - if ft, ok := table.(sql.FilteredTable); ok { - tableFilters := filt[node.Name()] - handled := ft.HandledFilters(tableFilters) - handledFilters = append(handledFilters, handled...) - schema := node.Schema() - handled, err = fixFieldIndexesOnExpressions(schema, handled...) - if err != nil { - return nil, err - } + if len(queryIndexes) > 0 { + return &releaser{node, release}, nil + } - table = ft.WithFilters(handled) - a.Log( - "table %q transformed with pushdown of filters, %d filters handled of %d", - node.Name(), - len(handled), - len(tableFilters), - ) - } + return node, nil +} - if pt, ok := table.(sql.ProjectedTable); ok { - table = pt.WithProjection(fieldsByTable[node.Name()]) - a.Log("table %q transformed with pushdown of projection", node.Name()) - } +func transformExpressioners(node sql.Node) (sql.Node, error) { + expressioner, ok := node.(sql.Expressioner) + if !ok { + return node, nil + } - if it, ok := table.(sql.IndexableTable); ok { - indexLookup, ok := indexes[node.Name()] - if ok { - queryIndexes = append(queryIndexes, indexLookup.indexes...) 
- table = it.WithIndexLookup(indexLookup.lookup) - a.Log("table %q transformed with pushdown of index", node.Name()) - } - } + var schemas []sql.Schema + for _, child := range node.Children() { + schemas = append(schemas, child.Schema()) + } - return plan.NewResolvedTable(table), nil - default: - expressioner, ok := node.(sql.Expressioner) - if !ok { - return node, nil - } + if len(schemas) < 1 { + return node, nil + } - var schemas []sql.Schema - for _, child := range node.Children() { - schemas = append(schemas, child.Schema()) + n, err := expressioner.TransformExpressions(func(e sql.Expression) (sql.Expression, error) { + for _, schema := range schemas { + fixed, err := fixFieldIndexes(schema, e) + if err == nil { + return fixed, nil } - if len(schemas) < 1 { - return node, nil + if ErrFieldMissing.Is(err) { + continue } - n, err := expressioner.TransformExpressions(func(e sql.Expression) (sql.Expression, error) { - for _, schema := range schemas { - fixed, err := fixFieldIndexes(schema, e) - if err == nil { - return fixed, nil - } - - if ErrFieldMissing.Is(err) { - continue - } + return nil, err + } - return nil, err - } + return e, nil + }) - return e, nil - }) + if err != nil { + return nil, err + } - if err != nil { - return nil, err - } + switch j := n.(type) { + case *plan.InnerJoin: + cond, err := fixFieldIndexes(j.Schema(), j.Cond) + if err != nil { + return nil, err + } - switch j := n.(type) { - case *plan.InnerJoin: - cond, err := fixFieldIndexes(j.Schema(), j.Cond) - if err != nil { - return nil, err - } + n = plan.NewInnerJoin(j.Left, j.Right, cond) + case *plan.RightJoin: + cond, err := fixFieldIndexes(j.Schema(), j.Cond) + if err != nil { + return nil, err + } - n = plan.NewInnerJoin(j.Left, j.Right, cond) - case *plan.RightJoin: - cond, err := fixFieldIndexes(j.Schema(), j.Cond) - if err != nil { - return nil, err - } + n = plan.NewRightJoin(j.Left, j.Right, cond) + case *plan.LeftJoin: + cond, err := fixFieldIndexes(j.Schema(), j.Cond) + if err != nil { + return nil, err + } - n = plan.NewRightJoin(j.Left, j.Right, cond) - case *plan.LeftJoin: - cond, err := fixFieldIndexes(j.Schema(), j.Cond) - if err != nil { - return nil, err - } + n = plan.NewLeftJoin(j.Left, j.Right, cond) + } - n = plan.NewLeftJoin(j.Left, j.Right, cond) - } + return n, nil +} - return n, nil +func pushdownTable( + a *Analyzer, + node *plan.ResolvedTable, + filters filters, + handledFilters *[]sql.Expression, + queryIndexes *[]sql.Index, + fieldsByTable map[string][]string, + indexes map[string]*indexLookup, +) (sql.Node, error) { + var table = node.Table + + if ft, ok := table.(sql.FilteredTable); ok { + tableFilters := filters[node.Name()] + handled := ft.HandledFilters(tableFilters) + *handledFilters = append(*handledFilters, handled...) + schema := node.Schema() + handled, err := fixFieldIndexesOnExpressions(schema, handled...) 
+ if err != nil { + return nil, err } - }) - release := func() { - for _, idx := range queryIndexes { - a.Catalog.ReleaseIndex(idx) - } + table = ft.WithFilters(handled) + a.Log( + "table %q transformed with pushdown of filters, %d filters handled of %d", + node.Name(), + len(handled), + len(tableFilters), + ) } - if err != nil { - release() - return nil, err + if pt, ok := table.(sql.ProjectedTable); ok { + table = pt.WithProjection(fieldsByTable[node.Name()]) + a.Log("table %q transformed with pushdown of projection", node.Name()) } - if len(queryIndexes) > 0 { - return &releaser{node, release}, nil + if it, ok := table.(sql.IndexableTable); ok { + indexLookup, ok := indexes[node.Name()] + if ok { + *queryIndexes = append(*queryIndexes, indexLookup.indexes...) + table = it.WithIndexLookup(indexLookup.lookup) + a.Log("table %q transformed with pushdown of index", node.Name()) + } } - return node, nil + return plan.NewResolvedTable(table), nil } -// fixFieldIndexesOnExpressions executes fixFieldIndexes on a list of exprs. -func fixFieldIndexesOnExpressions(schema sql.Schema, expressions ...sql.Expression) ([]sql.Expression, error) { - var result = make([]sql.Expression, len(expressions)) - for i, e := range expressions { - var err error - result[i], err = fixFieldIndexes(schema, e) - if err != nil { - return nil, err - } +func pushdownFilter( + a *Analyzer, + node *plan.Filter, + handledFilters []sql.Expression, +) (sql.Node, error) { + if len(handledFilters) == 0 { + a.Log("no handled filters, leaving filter untouched") + return node, nil } - return result, nil -} -// fixFieldIndexes transforms the given expression setting correct indexes -// for GetField expressions according to the schema of the row in the table -// and not the one where the filter came from. -func fixFieldIndexes(schema sql.Schema, exp sql.Expression) (sql.Expression, error) { - return exp.TransformUp(func(e sql.Expression) (sql.Expression, error) { - switch e := e.(type) { - case *expression.GetField: - // we need to rewrite the indexes for the table row - for i, col := range schema { - if e.Name() == col.Name && e.Table() == col.Source { - return expression.NewGetFieldWithTable( - i, - e.Type(), - e.Table(), - e.Name(), - e.IsNullable(), - ), nil - } - } + unhandled := getUnhandledFilters( + splitExpression(node.Expression), + handledFilters, + ) - return nil, ErrFieldMissing.New(e.Name()) - } + if len(unhandled) == 0 { + a.Log("filter node has no unhandled filters, so it will be removed") + return node.Child, nil + } - return e, nil - }) + a.Log( + "%d handled filters removed from filter node, filter has now %d filters", + len(handledFilters), + len(unhandled), + ) + + return plan.NewFilter(expression.JoinAnd(unhandled...), node.Child), nil } type releaser struct { @@ -278,8 +316,6 @@ type releaser struct { Release func() } -var _ sql.Node = (*releaser)(nil) - func (r *releaser) Resolved() bool { return r.Child.Resolved() }
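
Note for readers following the refactor: below is a minimal, self-contained sketch of the index-rewriting idea that this patch isolates in fixFieldIndexes. It intentionally uses simplified stand-in types rather than go-mysql-server's sql.Schema and expression.GetField, so the names here (column, getField, fixFieldIndex) are illustrative only and not part of the patched API.

package main

import "fmt"

// Simplified stand-ins for sql.Column and expression.GetField; the real
// types live in go-mysql-server's sql and sql/expression packages.
type column struct {
	Name   string
	Source string // table the column belongs to
}

type getField struct {
	Index int
	Table string
	Name  string
}

// fixFieldIndex mirrors the core loop of fixFieldIndexes: it re-binds a
// GetField to the schema of the table it will be evaluated against, so the
// index points into the table row rather than the row of the node where the
// expression was originally resolved (e.g. a join or a filter above it).
func fixFieldIndex(schema []column, f getField) (getField, error) {
	for i, col := range schema {
		if f.Name == col.Name && f.Table == col.Source {
			f.Index = i
			return f, nil
		}
	}
	return getField{}, fmt.Errorf("field %q not found in table schema", f.Name)
}

func main() {
	// Schema of table "t2" alone. In a join such as "t1 JOIN t2", the same
	// column may have been resolved with a higher index over the combined schema.
	t2 := []column{{"id", "t2"}, {"name", "t2"}}

	// t2.name as resolved against the join schema (index 3, say).
	f := getField{Index: 3, Table: "t2", Name: "name"}

	fixed, err := fixFieldIndex(t2, f)
	if err != nil {
		panic(err)
	}
	fmt.Printf("t2.name re-bound from index %d to index %d\n", f.Index, fixed.Index)
	// Output: t2.name re-bound from index 3 to index 1
}

In the rule itself this re-binding runs through exp.TransformUp over every GetField in an expression, and transformExpressioners relies on ErrFieldMissing to fall through to the next child schema when a column does not belong to the schema currently being tried.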