Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit da68eee

Browse files
committed
plan: compute all inner joins in memory if they fit
Fixes #577

Because we do not have a way to estimate the cost of each side of a join, it is really difficult to know when we can compute one in memory. But not doing so causes inner joins to be painfully slow, as one of the branches is iterated multiple times.

This PR addresses this by ensuring that if the right branch of the inner join fits in memory, it will be computed in memory even if the in-memory mode has not been activated by the user.

A user can set the maximum amount of memory the gitbase server may use before joins are no longer performed in memory, using either the `MAX_MEMORY_INNER_JOIN` environment variable or the `max_memory_joins` session variable; both specify a number of bytes. The default value is half of the total physical memory on the operating system.

Previously we had two iterators, `innerJoinIter` and `innerJoinMemoryIter`. Because `innerJoinIter` must now be able to do the join in memory, `innerJoinMemoryIter` has been removed and `innerJoinIter` has been replaced with a version that can work in three modes:

- `unknownMode`: we don't know yet how to perform the join, so keep iterating until we can find out. By the end of the first full pass over the right branch, `unknownMode` will have switched to either `multipassMode` or `memoryMode`.
- `memoryMode`: computes the rest of the join in memory. The iterator can be in this mode before it starts iterating if the user activated in-memory joins via session or environment variables, in which case it will load the whole right side into memory before doing any further iteration. If instead the iterator started in `unknownMode` and switched to this mode, it is guaranteed to have already loaded the whole right side. From that point on, both cases work in exactly the same way.
- `multipassMode`: the previous default mode. Iterates the right side of the join once for each row in the left side. More expensive, but less memory consuming. The iterator cannot start in this mode; it can only be switched to it from `unknownMode` when the memory used by the gitbase server exceeds the maximum amount of memory, either set by the user or the default.

Signed-off-by: Miguel Molina <[email protected]>
1 parent b32d2fd commit da68eee

File tree

2 files changed

+197
-133
lines changed

2 files changed

+197
-133
lines changed

sql/plan/innerjoin.go

Lines changed: 164 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,46 @@ import (
44
"io"
55
"os"
66
"reflect"
7+
"runtime"
8+
"strconv"
79

810
opentracing "github.com/opentracing/opentracing-go"
11+
"github.com/pbnjay/memory"
12+
"github.com/sirupsen/logrus"
913
"gopkg.in/src-d/go-mysql-server.v0/sql"
1014
)
1115

12-
const experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
13-
const inMemoryJoinSessionVar = "inmemory_joins"
16+
const (
17+
experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
18+
maxMemoryJoinKey = "MAX_MEMORY_INNER_JOIN"
19+
inMemoryJoinSessionVar = "inmemory_joins"
20+
memoryThresholdSessionVar = "max_memory_joins"
21+
)
22+
23+
var (
24+
useInMemoryJoins = os.Getenv(experimentalInMemoryJoinKey) != ""
25+
// Half of the total physical memory available on the OS (ignoring the
26+
// memory used by other processes).
27+
defaultMemoryThreshold = memory.TotalMemory() / 2
28+
// Maximum amount of memory the gitbase server can have in use before
29+
// considering all inner joins should be done using multipass mode.
30+
maxMemoryJoin = loadMemoryThreshold()
31+
)
32+
33+
func loadMemoryThreshold() uint64 {
34+
v, ok := os.LookupEnv(maxMemoryJoinKey)
35+
if !ok {
36+
return defaultMemoryThreshold
37+
}
38+
39+
n, err := strconv.ParseUint(v, 10, 64)
40+
if err != nil {
41+
logrus.Warnf("invalid value %q given to %s environment variable", v, maxMemoryJoinKey)
42+
return defaultMemoryThreshold
43+
}
1444

15-
var useInMemoryJoins = os.Getenv(experimentalInMemoryJoinKey) != ""
45+
return n
46+
}
1647

1748
// InnerJoin is an inner join between two tables.
1849
type InnerJoin struct {
@@ -73,27 +104,17 @@ func (j *InnerJoin) RowIter(ctx *sql.Context) (sql.RowIter, error) {
73104
inMemorySession = true
74105
}
75106

76-
var iter sql.RowIter
107+
var mode = unknownMode
77108
if useInMemoryJoins || inMemorySession {
78-
r, err := j.Right.RowIter(ctx)
79-
if err != nil {
80-
span.Finish()
81-
return nil, err
82-
}
109+
mode = memoryMode
110+
}
83111

84-
iter = &innerJoinMemoryIter{
85-
l: l,
86-
r: r,
87-
ctx: ctx,
88-
cond: j.Cond,
89-
}
90-
} else {
91-
iter = &innerJoinIter{
92-
l: l,
93-
rp: j.Right,
94-
ctx: ctx,
95-
cond: j.Cond,
96-
}
112+
iter := &innerJoinIter{
113+
l: l,
114+
rp: j.Right,
115+
ctx: ctx,
116+
cond: j.Cond,
117+
mode: mode,
97118
}
98119

99120
return sql.NewSpanIter(span, iter), nil
@@ -156,6 +177,25 @@ func (j *InnerJoin) TransformExpressions(f sql.TransformExprFunc) (sql.Node, err
156177
return NewInnerJoin(j.Left, j.Right, cond), nil
157178
}
158179

180+
// innerJoinMode defines the mode in which an inner join will be performed.
181+
type innerJoinMode byte
182+
183+
const (
184+
// unknownMode is the default mode. It will start iterating without really
185+
// knowing in which mode it will end up computing the inner join. If it
186+
// iterates the right side fully one time and so far it fits in memory,
187+
// then it will switch to memory mode. Otherwise, if at some point during
188+
// this first iteration it finds that it does not fit in memory, will
189+
// switch to multipass mode.
190+
unknownMode innerJoinMode = iota
191+
// memoryMode computes all the inner join directly in memory iterating each
192+
// side of the join exactly once.
193+
memoryMode
194+
// multipassMode computes the inner join by iterating the left side once,
195+
// and the right side one time for each row in the left side.
196+
multipassMode
197+
)
198+
159199
type innerJoinIter struct {
160200
l sql.RowIter
161201
rp rowIterProvider
@@ -164,118 +204,140 @@ type innerJoinIter struct {
164204
cond sql.Expression
165205

166206
leftRow sql.Row
167-
}
168207

169-
func (i *innerJoinIter) Next() (sql.Row, error) {
170-
for {
171-
if i.leftRow == nil {
172-
r, err := i.l.Next()
173-
if err != nil {
174-
return nil, err
175-
}
208+
// used to compute in-memory
209+
mode innerJoinMode
210+
right []sql.Row
211+
pos int
212+
}
176213

177-
i.leftRow = r
214+
func (i *innerJoinIter) loadLeft() error {
215+
if i.leftRow == nil {
216+
r, err := i.l.Next()
217+
if err != nil {
218+
return err
178219
}
179220

180-
if i.r == nil {
181-
iter, err := i.rp.RowIter(i.ctx)
182-
if err != nil {
183-
return nil, err
184-
}
221+
i.leftRow = r
222+
}
185223

186-
i.r = iter
187-
}
224+
return nil
225+
}
188226

189-
rightRow, err := i.r.Next()
190-
if err == io.EOF {
191-
i.r = nil
192-
i.leftRow = nil
193-
continue
227+
func (i *innerJoinIter) loadRightInMemory() error {
228+
iter, err := i.rp.RowIter(i.ctx)
229+
if err != nil {
230+
return err
231+
}
232+
233+
i.right, err = sql.RowIterToRows(iter)
234+
if err != nil {
235+
return err
236+
}
237+
238+
if len(i.right) == 0 {
239+
return io.EOF
240+
}
241+
242+
return nil
243+
}
244+
245+
func (i *innerJoinIter) fitsInMemory() bool {
246+
var maxMemory uint64
247+
_, v := i.ctx.Session.Get(memoryThresholdSessionVar)
248+
if n, ok := v.(int64); ok {
249+
maxMemory = uint64(n)
250+
} else {
251+
maxMemory = maxMemoryJoin
252+
}
253+
254+
if maxMemory <= 0 {
255+
return true
256+
}
257+
258+
var ms runtime.MemStats
259+
runtime.ReadMemStats(&ms)
260+
261+
return (ms.Alloc - ms.HeapIdle) < maxMemory
262+
}
263+
264+
func (i *innerJoinIter) loadRight() (row sql.Row, skip bool, err error) {
265+
if i.mode == memoryMode {
266+
if len(i.right) == 0 {
267+
if err := i.loadRightInMemory(); err != nil {
268+
return nil, false, err
269+
}
194270
}
195271

196-
if err != nil {
197-
return nil, err
272+
if i.pos >= len(i.right) {
273+
i.leftRow = nil
274+
i.pos = 0
275+
return nil, true, nil
198276
}
199277

200-
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
201-
copy(row, i.leftRow)
202-
copy(row[len(i.leftRow):], rightRow)
278+
row := i.right[i.pos]
279+
i.pos++
280+
return row, false, nil
281+
}
203282

204-
v, err := i.cond.Eval(i.ctx, row)
283+
if i.r == nil {
284+
iter, err := i.rp.RowIter(i.ctx)
205285
if err != nil {
206-
return nil, err
286+
return nil, false, err
207287
}
208288

209-
if v == true {
210-
return row, nil
211-
}
289+
i.r = iter
212290
}
213-
}
214291

215-
func (i *innerJoinIter) Close() error {
216-
if err := i.l.Close(); err != nil {
217-
if i.r != nil {
218-
_ = i.r.Close()
292+
rightRow, err := i.r.Next()
293+
if err != nil {
294+
if err == io.EOF {
295+
i.r = nil
296+
i.leftRow = nil
297+
298+
// If we got to this point and the mode is still unknown it means
299+
// the right side fits in memory, so the mode changes to memory
300+
// inner join.
301+
if i.mode == unknownMode {
302+
i.mode = memoryMode
303+
}
304+
305+
return nil, true, nil
219306
}
220-
return err
307+
return nil, false, err
221308
}
222309

223-
if i.r != nil {
224-
return i.r.Close()
310+
if i.mode == unknownMode {
311+
if !i.fitsInMemory() {
312+
i.right = nil
313+
i.mode = multipassMode
314+
} else {
315+
i.right = append(i.right, rightRow)
316+
}
225317
}
226318

227-
return nil
319+
return rightRow, false, err
228320
}
229321

230-
type innerJoinMemoryIter struct {
231-
l sql.RowIter
232-
r sql.RowIter
233-
ctx *sql.Context
234-
cond sql.Expression
235-
pos int
236-
leftRow sql.Row
237-
right []sql.Row
238-
}
239-
240-
func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
322+
func (i *innerJoinIter) Next() (sql.Row, error) {
241323
for {
242-
if i.leftRow == nil {
243-
r, err := i.l.Next()
244-
if err != nil {
245-
return nil, err
246-
}
247-
248-
i.leftRow = r
324+
if err := i.loadLeft(); err != nil {
325+
return nil, err
249326
}
250327

251-
if i.r != nil {
252-
for {
253-
row, err := i.r.Next()
254-
if err != nil {
255-
if err == io.EOF {
256-
break
257-
}
258-
return nil, err
259-
}
260-
261-
i.right = append(i.right, row)
262-
}
263-
i.r = nil
328+
rightRow, skip, err := i.loadRight()
329+
if err != nil {
330+
return nil, err
264331
}
265332

266-
if i.pos >= len(i.right) {
267-
i.pos = 0
268-
i.leftRow = nil
333+
if skip {
269334
continue
270335
}
271336

272-
rightRow := i.right[i.pos]
273337
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
274338
copy(row, i.leftRow)
275339
copy(row[len(i.leftRow):], rightRow)
276340

277-
i.pos++
278-
279341
v, err := i.cond.Eval(i.ctx, row)
280342
if err != nil {
281343
return nil, err
@@ -287,7 +349,7 @@ func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
287349
}
288350
}
289351

290-
func (i *innerJoinMemoryIter) Close() error {
352+
func (i *innerJoinIter) Close() error {
291353
if err := i.l.Close(); err != nil {
292354
if i.r != nil {
293355
_ = i.r.Close()
@@ -299,5 +361,7 @@ func (i *innerJoinMemoryIter) Close() error {
299361
return i.r.Close()
300362
}
301363

364+
i.right = nil
365+
302366
return nil
303367
}

0 commit comments

Comments
 (0)