Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions go/vt/mysqlctl/tmutils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ const (
TableView = "VIEW"
)

// TableDefinitionGetColumn looks up the column called name in td.Columns.
// It returns the position of the first match and true, or -1 and false
// when no column has that name.
func TableDefinitionGetColumn(td *tabletmanagerdatapb.TableDefinition, name string) (index int, ok bool) {
	for pos := 0; pos < len(td.Columns); pos++ {
		if td.Columns[pos] == name {
			return pos, true
		}
	}
	return -1, false
}

// TableDefinitions is a list of TableDefinition, for sorting
type TableDefinitions []*tabletmanagerdatapb.TableDefinition

Expand Down
28 changes: 19 additions & 9 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type VSchema struct {
Keyspaces map[string]*KeyspaceSchema
}

// Table represnts a table in VSchema.
// Table represents a table in VSchema.
type Table struct {
IsSequence bool
Name string
Expand Down Expand Up @@ -78,21 +78,31 @@ func BuildVSchema(source *VSchemaFormal) (vschema *VSchema, err error) {
return vschema, nil
}

// VSchemaFormalForKeyspace parses input as the JSON form of one keyspace
// and wraps it in a VSchemaFormal whose only entry is stored under name.
// A JSON decoding failure is reported as an error and no VSchemaFormal
// is returned.
func VSchemaFormalForKeyspace(input []byte, name string) (*VSchemaFormal, error) {
	var parsed KeyspaceFormal
	err := json.Unmarshal(input, &parsed)
	if err != nil {
		return nil, fmt.Errorf("Unmarshal failed: %v %s %v", parsed, input, err)
	}

	keyspaces := map[string]KeyspaceFormal{name: parsed}
	return &VSchemaFormal{Keyspaces: keyspaces}, nil
}

// ValidateVSchema ensures that the JSON representation
// of the keyspace vschema is valid.
// External references (like sequence) are not validated.
func ValidateVSchema(input []byte) error {
var ks KeyspaceFormal
if err := json.Unmarshal(input, &ks); err != nil {
return fmt.Errorf("Unmarshal failed: %v %s %v", ks, input, err)
formal, err := VSchemaFormalForKeyspace(input, "ks")
if err != nil {
return err
}
// We go through the motion of building the vschema,
// but just for this keyspace
formal := &VSchemaFormal{
Keyspaces: map[string]KeyspaceFormal{
"ks": ks,
},
}
vschema := &VSchema{
tables: make(map[string]*Table),
Keyspaces: make(map[string]*KeyspaceSchema),
Expand Down
123 changes: 88 additions & 35 deletions go/vt/worker/diff_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"golang.org/x/net/context"

"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/vtgate/vindexes"

querypb "github.com/youtube/vitess/go/vt/proto/query"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -71,6 +73,36 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts topo.Server, tabletAl
}, nil
}

// v3KeyRangeFilter is a sqltypes.ResultStream implementation that filters
// the underlying results to match the keyrange using a v3 resolver.
// Its Recv method forwards one result from input and keeps only the rows
// whose resolved keyspace id is contained in keyRange.
type v3KeyRangeFilter struct {
// input is the stream the unfiltered results are read from.
input sqltypes.ResultStream
// resolver maps each row to its keyspace id.
resolver *v3Resolver
// keyRange is the range a row's keyspace id must fall in to be kept.
keyRange *topodatapb.KeyRange
}

// Recv is part of sqltypes.ResultStream interface.
// It reads the next result from the wrapped stream and drops every row
// whose keyspace id falls outside the filter's key range. An error from
// the wrapped stream or from the resolver is returned unchanged, together
// with a nil result.
func (f *v3KeyRangeFilter) Recv() (*sqltypes.Result, error) {
	result, err := f.input.Recv()
	if err != nil {
		return nil, err
	}

	// Collect the surviving rows in a fresh slice sized for the worst case.
	kept := make([][]sqltypes.Value, 0, len(result.Rows))
	for i := range result.Rows {
		current := result.Rows[i]
		keyspaceID, resolveErr := f.resolver.keyspaceID(current)
		if resolveErr != nil {
			return nil, resolveErr
		}
		if !key.KeyRangeContains(f.keyRange, keyspaceID) {
			continue
		}
		kept = append(kept, current)
	}
	result.Rows = kept
	return result, nil
}

// orderedColumns returns the list of columns:
// - first the primary key columns in the right order
// - then the rest of the columns
Expand Down Expand Up @@ -112,46 +144,67 @@ func TableScan(ctx context.Context, log logutil.Logger, ts topo.Server, tabletAl
// rows from a table that match the supplied KeyRange, ordered by
// Primary Key. The returned columns are ordered with the Primary Key
// columns in front.
func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts topo.Server, tabletAlias *topodatapb.TabletAlias, tableDefinition *tabletmanagerdatapb.TableDefinition, keyRange *topodatapb.KeyRange, shardingColumnName string, shardingColumnType topodatapb.KeyspaceIdType) (*QueryResultReader, error) {
// If keyspaceSchema is passed in, we go into v3 mode, and we ask for all
// source data, and filter here. Otherwise we stick with v2 mode, where we can
// ask the source tablet to do the filtering.
func TableScanByKeyRange(ctx context.Context, log logutil.Logger, ts topo.Server, tabletAlias *topodatapb.TabletAlias, tableDefinition *tabletmanagerdatapb.TableDefinition, keyRange *topodatapb.KeyRange, keyspaceSchema *vindexes.KeyspaceSchema, shardingColumnName string, shardingColumnType topodatapb.KeyspaceIdType) (*QueryResultReader, error) {
if keyspaceSchema != nil {
// switch to v3 mode.
keyResolver, err := newV3ResolverFromColumnList(keyspaceSchema, tableDefinition.Name, orderedColumns(tableDefinition))
if err != nil {
return nil, fmt.Errorf("cannot resolve v3 sharding keys for table %v: %v", tableDefinition.Name, err)
}

// full table scan
scan, err := TableScan(ctx, log, ts, tabletAlias, tableDefinition)
if err != nil {
return nil, err
}

// with extra filter
scan.Output = &v3KeyRangeFilter{
input: scan.Output,
resolver: keyResolver.(*v3Resolver),
keyRange: keyRange,
}
return scan, nil
}

// in v2 mode, we can do the filtering at the source
where := ""
// TODO(aaijazi): this currently only works with V2 style sharding keys.
// We should add FilteredQueryResultReader that will do a full table scan, with client-side
// filtering to remove rows that don't map to the keyrange that we want.
if keyRange != nil {
switch shardingColumnType {
case topodatapb.KeyspaceIdType_UINT64:
if len(keyRange.Start) > 0 {
if len(keyRange.End) > 0 {
// have start & end
where = fmt.Sprintf("WHERE %v >= %v AND %v < %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.Start), shardingColumnName, uint64FromKeyspaceID(keyRange.End))
} else {
// have start only
where = fmt.Sprintf("WHERE %v >= %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.Start))
}
switch shardingColumnType {
case topodatapb.KeyspaceIdType_UINT64:
if len(keyRange.Start) > 0 {
if len(keyRange.End) > 0 {
// have start & end
where = fmt.Sprintf("WHERE %v >= %v AND %v < %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.Start), shardingColumnName, uint64FromKeyspaceID(keyRange.End))
} else {
if len(keyRange.End) > 0 {
// have end only
where = fmt.Sprintf("WHERE %v < %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.End))
}
// have start only
where = fmt.Sprintf("WHERE %v >= %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.Start))
}
} else {
if len(keyRange.End) > 0 {
// have end only
where = fmt.Sprintf("WHERE %v < %v ", shardingColumnName, uint64FromKeyspaceID(keyRange.End))
}
case topodatapb.KeyspaceIdType_BYTES:
if len(keyRange.Start) > 0 {
if len(keyRange.End) > 0 {
// have start & end
where = fmt.Sprintf("WHERE HEX(%v) >= '%v' AND HEX(%v) < '%v' ", shardingColumnName, hex.EncodeToString(keyRange.Start), shardingColumnName, hex.EncodeToString(keyRange.End))
} else {
// have start only
where = fmt.Sprintf("WHERE HEX(%v) >= '%v' ", shardingColumnName, hex.EncodeToString(keyRange.Start))
}
}
case topodatapb.KeyspaceIdType_BYTES:
if len(keyRange.Start) > 0 {
if len(keyRange.End) > 0 {
// have start & end
where = fmt.Sprintf("WHERE HEX(%v) >= '%v' AND HEX(%v) < '%v' ", shardingColumnName, hex.EncodeToString(keyRange.Start), shardingColumnName, hex.EncodeToString(keyRange.End))
} else {
if len(keyRange.End) > 0 {
// have end only
where = fmt.Sprintf("WHERE HEX(%v) < '%v' ", shardingColumnName, hex.EncodeToString(keyRange.End))
}
// have start only
where = fmt.Sprintf("WHERE HEX(%v) >= '%v' ", shardingColumnName, hex.EncodeToString(keyRange.Start))
}
} else {
if len(keyRange.End) > 0 {
// have end only
where = fmt.Sprintf("WHERE HEX(%v) < '%v' ", shardingColumnName, hex.EncodeToString(keyRange.End))
}
default:
return nil, fmt.Errorf("Unsupported ShardingColumnType: %v", shardingColumnType)
}
default:
return nil, fmt.Errorf("Unsupported ShardingColumnType: %v", shardingColumnType)
}

sql := fmt.Sprintf("SELECT %v FROM %v %vORDER BY %v", strings.Join(orderedColumns(tableDefinition), ", "), tableDefinition.Name, where, strings.Join(tableDefinition.PrimaryKeyColumns, ", "))
Expand Down Expand Up @@ -183,7 +236,7 @@ func NewRowReader(queryResultReader *QueryResultReader) *RowReader {
// (nil, nil) for EOF
// (nil, error) if an error occurred
func (rr *RowReader) Next() ([]sqltypes.Value, error) {
if rr.currentResult == nil || rr.currentIndex == len(rr.currentResult.Rows) {
for rr.currentResult == nil || rr.currentIndex == len(rr.currentResult.Rows) {
var err error
rr.currentResult, err = rr.queryResultReader.Output.Recv()
if err != nil {
Expand Down
107 changes: 98 additions & 9 deletions go/vt/worker/key_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate/vindexes"
)

// This file defines the interface and implementations of sharding key resolvers.
Expand Down Expand Up @@ -47,15 +48,10 @@ func newV2Resolver(keyspaceInfo *topo.KeyspaceInfo, td *tabletmanagerdatapb.Tabl
if td.Type != tmutils.TableBaseTable {
return nil, fmt.Errorf("a keyspaceID resolver can only be created for a base table, got %v", td.Type)
}

// Find the sharding key column index.
columnIndex := -1
for i, name := range td.Columns {
if name == keyspaceInfo.ShardingColumnName {
columnIndex = i
break
}
}
if columnIndex == -1 {
columnIndex, ok := tmutils.TableDefinitionGetColumn(td, keyspaceInfo.ShardingColumnName)
if !ok {
return nil, fmt.Errorf("table %v doesn't have a column named '%v'", td.Name, keyspaceInfo.ShardingColumnName)
}

Expand All @@ -79,4 +75,97 @@ func (r *v2Resolver) keyspaceID(row []sqltypes.Value) ([]byte, error) {
}
}

// TODO(sougou): implement a V3 keyspaceIDResolver, which takes advantage of a table's primary ColVindex.
// v3Resolver is the keyspace id resolver that is used by VTGate V3 deployments.
// In V3, we use the VSchema to find a Unique VIndex of cost 0 or 1 for each
// table.
type v3Resolver struct {
// shardingColumnIndex is the position, within a row, of the column whose
// value is handed to the vindex.
shardingColumnIndex int
// vindex is the unique vindex used to map that column's value to a
// keyspace id.
vindex vindexes.Unique
}

// newV3ResolverFromTableDefinition returns a keyspaceIDResolver for a v3 table.
// It looks td.Name up in keyspaceSchema.Tables and uses the table's first
// ColVindex, which must have a cost of at most 1 and implement
// vindexes.Unique. The column named by that ColVindex is located by name in
// td, and its index is stored in the returned resolver. An error is returned
// when td is not a base table or when any of these lookups or checks fails.
func newV3ResolverFromTableDefinition(keyspaceSchema *vindexes.KeyspaceSchema, td *tabletmanagerdatapb.TableDefinition) (keyspaceIDResolver, error) {
// Anything other than a base table is rejected up front.
if td.Type != tmutils.TableBaseTable {
return nil, fmt.Errorf("a keyspaceID resolver can only be created for a base table, got %v", td.Type)
}
// The table must be described in the keyspace's vschema.
tableSchema, ok := keyspaceSchema.Tables[td.Name]
if !ok {
return nil, fmt.Errorf("no vschema definition for table %v", td.Name)
}
// the primary vindex is most likely the sharding key, and has to
// be unique.
if len(tableSchema.ColVindexes) == 0 {
return nil, fmt.Errorf("no vindex definition for table %v", td.Name)
}
// Only the first ColVindex is considered, and only if its cost is 0 or 1.
colVindex := tableSchema.ColVindexes[0]
if colVindex.Vindex.Cost() > 1 {
return nil, fmt.Errorf("primary vindex cost is too high for table %v", td.Name)
}
// The type assertion both checks uniqueness and yields the Unique value
// stored in the resolver below.
unique, ok := colVindex.Vindex.(vindexes.Unique)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use vindexes.IsUnique instead.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh. Nvm. Looks like you need the Unique interface var also.

if !ok {
return nil, fmt.Errorf("primary vindex is not unique for table %v", td.Name)
}

// Find the sharding key column index.
columnIndex, ok := tmutils.TableDefinitionGetColumn(td, colVindex.Col)
if !ok {
return nil, fmt.Errorf("table %v has a Vindex on unknown column %v", td.Name, colVindex.Col)
}

return &v3Resolver{
shardingColumnIndex: columnIndex,
vindex: unique,
}, nil
}

// newV3ResolverFromColumnList returns a keyspaceIDResolver for a v3 table.
// The table is looked up by name in keyspaceSchema.Tables, and its first
// ColVindex is used: it must have a cost of at most 1 and implement
// vindexes.Unique. The resolver's column index is the position of that
// ColVindex's column within columns. An error is returned when any lookup
// or check fails.
func newV3ResolverFromColumnList(keyspaceSchema *vindexes.KeyspaceSchema, name string, columns []string) (keyspaceIDResolver, error) {
	tableSchema, found := keyspaceSchema.Tables[name]
	if !found {
		return nil, fmt.Errorf("no vschema definition for table %v", name)
	}

	// The first vindex is treated as the primary one: it is most likely
	// the sharding key, and it has to be cheap and unique.
	if len(tableSchema.ColVindexes) == 0 {
		return nil, fmt.Errorf("no vindex definition for table %v", name)
	}
	primary := tableSchema.ColVindexes[0]
	if primary.Vindex.Cost() > 1 {
		return nil, fmt.Errorf("primary vindex cost is too high for table %v", name)
	}
	uniqueVindex, isUnique := primary.Vindex.(vindexes.Unique)
	if !isUnique {
		return nil, fmt.Errorf("primary vindex is not unique for table %v", name)
	}

	// Build the resolver as soon as the sharding key column is located.
	for pos, column := range columns {
		if column == primary.Col {
			return &v3Resolver{
				shardingColumnIndex: pos,
				vindex:              uniqueVindex,
			}, nil
		}
	}
	return nil, fmt.Errorf("table %v has a Vindex on unknown column %v", name, primary.Col)
}

// keyspaceID implements the keyspaceIDResolver interface.
// It takes the value at shardingColumnIndex in row, passes it to the
// vindex's Map as a single-element id list, and returns the one keyspace id
// that comes back. An error from Map is returned unchanged; a result that
// does not contain exactly one keyspace id is reported as an error.
func (r *v3Resolver) keyspaceID(row []sqltypes.Value) ([]byte, error) {
// NOTE(review): row is indexed without a length check; confirm callers
// always pass rows with more than shardingColumnIndex columns.
v := row[r.shardingColumnIndex]
ids := []interface{}{v}
// Map is called with a nil first argument.
ksids, err := r.vindex.Map(nil, ids)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be some future use cases where someone may use a lookup based sharding key. If so, we may have to supply a VCursor that's capable of fetching data from db. Not an immediate requirement.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then performance would be really bad, having to do a lookup for each row. We would probably not implement it this way...

if err != nil {
return nil, err
}
// One id went in, so exactly one keyspace id is expected back.
if len(ksids) != 1 {
return nil, fmt.Errorf("maping row to keyspace id returned an invalid array of keyspace ids: %v", ksids)
}
return ksids[0], nil
}
Loading