Skip to content

Commit 158b52a

Browse files
committed
Performance Fixes for Vitess 18 (#14383)
(backport) Signed-off-by: Vicent Marti <vmg@strn.cat>
1 parent 1cc5683 commit 158b52a

8 files changed

Lines changed: 283 additions & 38 deletions

File tree

go/vt/vtgate/endtoend/main_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,19 @@ var (
153153
Name: "hash",
154154
}},
155155
},
156+
"oltp_test": {
157+
ColumnVindexes: []*vschemapb.ColumnVindex{{
158+
Column: "id",
159+
Name: "hash",
160+
}},
161+
Columns: []*vschemapb.Column{{
162+
Name: "c",
163+
Type: sqltypes.Char,
164+
}, {
165+
Name: "pad",
166+
Type: sqltypes.Char,
167+
}},
168+
},
156169
},
157170
}
158171

go/vt/vtgate/endtoend/oltp_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package endtoend
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"math/rand"
8+
"sync"
9+
"testing"
10+
11+
"vitess.io/vitess/go/mysql"
12+
)
13+
14+
// cValueTemplate is the template for values stored in the `c` column of
// oltp_test: 10 groups of eleven '#' placeholders joined by '-', 119
// characters in total. sysbenchRandom replaces each '#' with a random digit.
15+
const cValueTemplate = "###########-###########-###########-" +
16+
"###########-###########-###########-" +
17+
"###########-###########-###########-" +
18+
"###########"
19+
20+
// padValueTemplate is the template for values stored in the `pad` column of
// oltp_test: 5 groups of eleven '#' placeholders joined by '-', 59 characters
// in total. sysbenchRandom replaces each '#' with a random digit.
21+
const padValueTemplate = "###########-###########-###########-" +
22+
"###########-###########"
23+
24+
// sysbenchRandom expands template into a freshly allocated byte slice:
// every '#' byte is replaced with a random decimal digit drawn from rng and
// every other byte is copied through unchanged.
//
// The template is walked byte by byte (not rune by rune) so that multi-byte
// UTF-8 sequences are copied intact and the result always has exactly
// len(template) bytes. rng is only consulted for '#' placeholders, one draw
// per placeholder, in left-to-right order.
func sysbenchRandom(rng *rand.Rand, template string) []byte {
	out := make([]byte, len(template))
	for i := 0; i < len(template); i++ {
		if c := template[i]; c == '#' {
			out[i] = '0' + byte(rng.Intn(10))
		} else {
			out[i] = c
		}
	}
	return out
}
36+
37+
// oltpInitOnce guards the seeding of the oltp_test table in BenchmarkOLTP so
// that it is performed at most once per process.
var oltpInitOnce sync.Once
38+
39+
func BenchmarkOLTP(b *testing.B) {
40+
const MaxRows = 10000
41+
const RangeSize = 100
42+
43+
rng := rand.New(rand.NewSource(1234))
44+
45+
ctx := context.Background()
46+
conn, err := mysql.Connect(ctx, &vtParams)
47+
if err != nil {
48+
b.Fatal(err)
49+
}
50+
defer conn.Close()
51+
52+
var query bytes.Buffer
53+
54+
oltpInitOnce.Do(func() {
55+
b.Logf("seeding database for benchmark...")
56+
57+
var rows int = 1
58+
for i := 0; i < MaxRows/10; i++ {
59+
query.Reset()
60+
query.WriteString("insert into oltp_test(id, k, c, pad) values ")
61+
for j := 0; j < 10; j++ {
62+
if j > 0 {
63+
query.WriteString(", ")
64+
}
65+
_, _ = fmt.Fprintf(&query, "(%d, %d, '%s', '%s')", rows, rng.Int31n(0xFFFF), sysbenchRandom(rng, cValueTemplate), sysbenchRandom(rng, padValueTemplate))
66+
rows++
67+
}
68+
69+
_, err = conn.ExecuteFetch(query.String(), -1, false)
70+
if err != nil {
71+
b.Fatal(err)
72+
}
73+
}
74+
b.Logf("finshed (inserted %d rows)", rows)
75+
})
76+
77+
b.Run("SimpleRanges", func(b *testing.B) {
78+
b.ResetTimer()
79+
for i := 0; i < b.N; i++ {
80+
id := rng.Intn(MaxRows)
81+
82+
query.Reset()
83+
_, _ = fmt.Fprintf(&query, "SELECT c FROM oltp_test WHERE id BETWEEN %d AND %d", id, id+rng.Intn(RangeSize)-1)
84+
_, err := conn.ExecuteFetch(query.String(), 1000, false)
85+
if err != nil {
86+
b.Error(err)
87+
}
88+
}
89+
})
90+
91+
b.Run("SumRanges", func(b *testing.B) {
92+
b.ResetTimer()
93+
for i := 0; i < b.N; i++ {
94+
id := rng.Intn(MaxRows)
95+
96+
query.Reset()
97+
_, _ = fmt.Fprintf(&query, "SELECT SUM(k) FROM oltp_test WHERE id BETWEEN %d AND %d", id, id+rng.Intn(RangeSize)-1)
98+
_, err := conn.ExecuteFetch(query.String(), 1000, false)
99+
if err != nil {
100+
b.Error(err)
101+
}
102+
}
103+
})
104+
105+
b.Run("OrderRanges", func(b *testing.B) {
106+
b.ResetTimer()
107+
for i := 0; i < b.N; i++ {
108+
id := rng.Intn(MaxRows)
109+
110+
query.Reset()
111+
_, _ = fmt.Fprintf(&query, "SELECT c FROM oltp_test WHERE id BETWEEN %d AND %d ORDER BY c", id, id+rng.Intn(RangeSize)-1)
112+
_, err := conn.ExecuteFetch(query.String(), 1000, false)
113+
if err != nil {
114+
b.Error(err)
115+
}
116+
}
117+
})
118+
119+
b.Run("DistinctRanges", func(b *testing.B) {
120+
b.ResetTimer()
121+
for i := 0; i < b.N; i++ {
122+
id := rng.Intn(MaxRows)
123+
124+
query.Reset()
125+
_, _ = fmt.Fprintf(&query, "SELECT DISTINCT c FROM oltp_test WHERE id BETWEEN %d AND %d ORDER BY c", id, id+rng.Intn(RangeSize)-1)
126+
_, err := conn.ExecuteFetch(query.String(), 1000, false)
127+
if err != nil {
128+
b.Error(err)
129+
}
130+
}
131+
})
132+
}

go/vt/vtgate/endtoend/schema.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,11 @@ create table t1_sharded(
7272
id2 bigint,
7373
primary key(id1)
7474
) Engine=InnoDB;
75+
76+
create table oltp_test(
77+
id bigint not null auto_increment,
78+
k bigint default 0 not null,
79+
c char(120) default '' not null,
80+
pad char(60) default '' not null,
81+
primary key (id)
82+
) Engine=InnoDB;

go/vt/vtgate/engine/ordered_aggregate.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,35 @@ func (oa *OrderedAggregate) TryExecute(ctx context.Context, vcursor VCursor, bin
115115
return qr.Truncate(oa.TruncateColumnCount), nil
116116
}
117117

118+
func (oa *OrderedAggregate) executeGroupBy(result *sqltypes.Result) (*sqltypes.Result, error) {
119+
if len(result.Rows) < 1 {
120+
return result, nil
121+
}
122+
123+
out := &sqltypes.Result{
124+
Fields: result.Fields,
125+
Rows: result.Rows[:0],
126+
}
127+
128+
var currentKey []sqltypes.Value
129+
var lastRow sqltypes.Row
130+
var err error
131+
for _, row := range result.Rows {
132+
var nextGroup bool
133+
134+
currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
135+
if err != nil {
136+
return nil, err
137+
}
138+
if nextGroup {
139+
out.Rows = append(out.Rows, lastRow)
140+
}
141+
lastRow = row
142+
}
143+
out.Rows = append(out.Rows, lastRow)
144+
return out, nil
145+
}
146+
118147
func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
119148
result, err := vcursor.ExecutePrimitive(
120149
ctx,
@@ -125,6 +154,10 @@ func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVa
125154
if err != nil {
126155
return nil, err
127156
}
157+
if len(oa.Aggregates) == 0 {
158+
return oa.executeGroupBy(result)
159+
}
160+
128161
agg, fields, err := newAggregation(result.Fields, oa.Aggregates)
129162
if err != nil {
130163
return nil, err
@@ -161,8 +194,63 @@ func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVa
161194
return out, nil
162195
}
163196

197+
func (oa *OrderedAggregate) executeStreamGroupBy(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
198+
cb := func(qr *sqltypes.Result) error {
199+
return callback(qr.Truncate(oa.TruncateColumnCount))
200+
}
201+
202+
var fields []*querypb.Field
203+
var currentKey []sqltypes.Value
204+
var lastRow sqltypes.Row
205+
206+
visitor := func(qr *sqltypes.Result) error {
207+
var err error
208+
if fields == nil && len(qr.Fields) > 0 {
209+
fields = qr.Fields
210+
if err = cb(&sqltypes.Result{Fields: fields}); err != nil {
211+
return err
212+
}
213+
}
214+
for _, row := range qr.Rows {
215+
var nextGroup bool
216+
217+
currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
218+
if err != nil {
219+
return err
220+
}
221+
222+
if nextGroup {
223+
// this is a new grouping. let's yield the old one, and start a new
224+
if err := cb(&sqltypes.Result{Rows: []sqltypes.Row{lastRow}}); err != nil {
225+
return err
226+
}
227+
}
228+
229+
lastRow = row
230+
}
231+
return nil
232+
}
233+
234+
/* we need the input fields types to correctly calculate the output types */
235+
err := vcursor.StreamExecutePrimitive(ctx, oa.Input, bindVars, true, visitor)
236+
if err != nil {
237+
return err
238+
}
239+
240+
if lastRow != nil {
241+
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{lastRow}}); err != nil {
242+
return err
243+
}
244+
}
245+
return nil
246+
}
247+
164248
// TryStreamExecute is a Primitive function.
165249
func (oa *OrderedAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error {
250+
if len(oa.Aggregates) == 0 {
251+
return oa.executeStreamGroupBy(ctx, vcursor, bindVars, callback)
252+
}
253+
166254
cb := func(qr *sqltypes.Result) error {
167255
return callback(qr.Truncate(oa.TruncateColumnCount))
168256
}

go/vt/vtgate/engine/route.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package engine
1919
import (
2020
"context"
2121
"fmt"
22+
"slices"
2223
"sort"
2324
"strconv"
2425
"strings"
@@ -418,27 +419,26 @@ func (route *Route) sort(in *sqltypes.Result) (*sqltypes.Result, error) {
418419

419420
comparers := extractSlices(route.OrderBy)
420421

421-
sort.Slice(out.Rows, func(i, j int) bool {
422+
slices.SortFunc(out.Rows, func(a, b sqltypes.Row) int {
422423
var cmp int
423424
if err != nil {
424-
return true
425+
return -1
425426
}
426427
// If there are any errors below, the function sets
427428
// the external err and returns -1. Once err is set,
428429
// all subsequent calls return -1 without comparing,
429430
// so that SortFunc does no further comparison work
430431
// and returns more quickly.
431432
for _, c := range comparers {
432-
cmp, err = c.compare(out.Rows[i], out.Rows[j])
433+
cmp, err = c.compare(a, b)
433434
if err != nil {
434-
return true
435+
return -1
435436
}
436-
if cmp == 0 {
437-
continue
437+
if cmp != 0 {
438+
return cmp
438439
}
439-
return cmp < 0
440440
}
441-
return true
441+
return 0
442442
})
443443

444444
return out.Truncate(route.TruncateColumnCount), err

go/vt/vtgate/evalengine/api_compare.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,37 @@ func (err UnsupportedCollationError) Error() string {
5252
var UnsupportedCollationHashError = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "text type with an unknown/unsupported collation cannot be hashed")
5353

5454
func compare(v1, v2 sqltypes.Value, collationID collations.ID) (int, error) {
55+
v1t := v1.Type()
56+
5557
// We have a fast path here for the case where both values are
5658
// the same type, and it's one of the basic types we can compare
5759
// directly. This is a common case for equality checks.
58-
if v1.Type() == v2.Type() {
60+
if v1t == v2.Type() {
5961
switch {
60-
case sqltypes.IsSigned(v1.Type()):
62+
case sqltypes.IsText(v1t):
63+
if collationID == collations.CollationBinaryID {
64+
return bytes.Compare(v1.Raw(), v2.Raw()), nil
65+
}
66+
coll := colldata.Lookup(collationID)
67+
if coll == nil {
68+
return 0, UnsupportedCollationError{ID: collationID}
69+
}
70+
result := coll.Collate(v1.Raw(), v2.Raw(), false)
71+
switch {
72+
case result < 0:
73+
return -1, nil
74+
case result > 0:
75+
return 1, nil
76+
default:
77+
return 0, nil
78+
}
79+
case sqltypes.IsBinary(v1t), v1t == sqltypes.Date, v1t == sqltypes.Datetime, v1t == sqltypes.Timestamp:
80+
// We can't optimize for Time here, since Time is not sortable
81+
// based on the raw bytes. This is because of cases like
82+
// '24:00:00' and '101:00:00' which are both valid times and
83+
// order wrong based on the raw bytes.
84+
return bytes.Compare(v1.Raw(), v2.Raw()), nil
85+
case sqltypes.IsSigned(v1t):
6186
i1, err := v1.ToInt64()
6287
if err != nil {
6388
return 0, err
@@ -74,7 +99,7 @@ func compare(v1, v2 sqltypes.Value, collationID collations.ID) (int, error) {
7499
default:
75100
return 0, nil
76101
}
77-
case sqltypes.IsUnsigned(v1.Type()):
102+
case sqltypes.IsUnsigned(v1t):
78103
u1, err := v1.ToUint64()
79104
if err != nil {
80105
return 0, err
@@ -91,30 +116,6 @@ func compare(v1, v2 sqltypes.Value, collationID collations.ID) (int, error) {
91116
default:
92117
return 0, nil
93118
}
94-
case sqltypes.IsBinary(v1.Type()), v1.Type() == sqltypes.Date,
95-
v1.Type() == sqltypes.Datetime, v1.Type() == sqltypes.Timestamp:
96-
// We can't optimize for Time here, since Time is not sortable
97-
// based on the raw bytes. This is because of cases like
98-
// '24:00:00' and '101:00:00' which are both valid times and
99-
// order wrong based on the raw bytes.
100-
return bytes.Compare(v1.Raw(), v2.Raw()), nil
101-
case sqltypes.IsText(v1.Type()):
102-
if collationID == collations.CollationBinaryID {
103-
return bytes.Compare(v1.Raw(), v2.Raw()), nil
104-
}
105-
coll := colldata.Lookup(collationID)
106-
if coll == nil {
107-
return 0, UnsupportedCollationError{ID: collationID}
108-
}
109-
result := coll.Collate(v1.Raw(), v2.Raw(), false)
110-
switch {
111-
case result < 0:
112-
return -1, nil
113-
case result > 0:
114-
return 1, nil
115-
default:
116-
return 0, nil
117-
}
118119
}
119120
}
120121

0 commit comments

Comments
 (0)