@@ -16,6 +16,7 @@ package sqle
16
16
17
17
import (
18
18
"fmt"
19
+ "os"
19
20
20
21
"github.com/sirupsen/logrus"
21
22
@@ -152,6 +153,11 @@ func (e *Engine) QueryNodeWithBindings(
152
153
return nil , nil , err
153
154
}
154
155
156
+ transactionDatabase , err := e .beginTransaction (ctx , parsed )
157
+ if err != nil {
158
+ return nil , nil , err
159
+ }
160
+
155
161
if len (bindings ) > 0 {
156
162
parsed , err = plan .ApplyBindings (ctx , parsed , bindings )
157
163
if err != nil {
@@ -164,11 +170,6 @@ func (e *Engine) QueryNodeWithBindings(
164
170
return nil , nil , err
165
171
}
166
172
167
- transactionDatabase , err := e .beginTransaction (ctx , parsed )
168
- if err != nil {
169
- return nil , nil , err
170
- }
171
-
172
173
iter , err = analyzed .RowIter (ctx , nil )
173
174
if err != nil {
174
175
return nil , nil , err
@@ -186,17 +187,34 @@ func (e *Engine) QueryNodeWithBindings(
186
187
return analyzed .Schema (), iter , nil
187
188
}
188
189
190
+ const (
191
+ fakeReadCommittedEnvVar = "READ_COMMITTED_HACK"
192
+ )
193
+
194
+ var fakeReadCommitted bool
195
+
196
+ func init () {
197
+ _ , ok := os .LookupEnv (fakeReadCommittedEnvVar )
198
+ if ok {
199
+ fakeReadCommitted = true
200
+ }
201
+ }
202
+
189
203
func (e * Engine ) beginTransaction (ctx * sql.Context , parsed sql.Node ) (string , error ) {
190
204
// Before we begin a transaction, we need to know if the database being operated on is not the one
191
205
// currently selected
192
- transactionDatabase := determineTransactionDatabase (ctx , parsed )
206
+ transactionDatabase := getTransactionDatabase (ctx , parsed )
193
207
194
208
// TODO: this won't work with transactions that cross database boundaries, we need to detect that and error out
195
- beginNewTransaction := ctx .GetTransaction () == nil
209
+ beginNewTransaction := ctx .GetTransaction () == nil || readCommitted ( ctx )
196
210
if beginNewTransaction {
211
+ logrus .Tracef ("Connection %d: beginning new transaction" , ctx .Session .ID ())
197
212
if len (transactionDatabase ) > 0 {
198
213
database , err := e .Catalog .Database (transactionDatabase )
199
- if err != nil {
214
+ // if the database doesn't exist, just don't start a transaction on it, let other layers complain
215
+ if sql .ErrDatabaseNotFound .Is (err ) {
216
+ return "" , nil
217
+ } else if err != nil {
200
218
return "" , err
201
219
}
202
220
@@ -214,6 +232,27 @@ func (e *Engine) beginTransaction(ctx *sql.Context, parsed sql.Node) (string, er
214
232
return transactionDatabase , nil
215
233
}
216
234
235
+ // Returns whether this session has a transaction isolation level of READ COMMITTED.
236
+ // If so, we always begin a new transaction for every statement, and commit after every statement as well.
237
+ // This is not what the READ COMMITTED isolation level is supposed to do.
238
+ func readCommitted (ctx * sql.Context ) bool {
239
+ if ! fakeReadCommitted {
240
+ return false
241
+ }
242
+
243
+ val , err := ctx .GetSessionVariable (ctx , "transaction_isolation" )
244
+ if err != nil {
245
+ return false
246
+ }
247
+
248
+ valStr , ok := val .(string )
249
+ if ! ok {
250
+ return false
251
+ }
252
+
253
+ return valStr == "READ-COMMITTED"
254
+ }
255
+
217
256
// transactionCommittingIter is a simple RowIter wrapper to allow the engine to conditionally commit a transaction
218
257
// during the Close() operation
219
258
type transactionCommittingIter struct {
@@ -247,21 +286,28 @@ func (t transactionCommittingIter) Close(ctx *sql.Context) error {
247
286
}
248
287
249
288
func isSessionAutocommit (ctx * sql.Context ) (bool , error ) {
289
+ if readCommitted (ctx ) {
290
+ return true , nil
291
+ }
292
+
250
293
autoCommitSessionVar , err := ctx .GetSessionVariable (ctx , sql .AutoCommitSessionVar )
251
294
if err != nil {
252
295
return false , err
253
296
}
254
297
return sql .ConvertToBool (autoCommitSessionVar )
255
298
}
256
299
257
- func determineTransactionDatabase (ctx * sql.Context , parsed sql.Node ) string {
258
- // For USE DATABASE statements, we need to process them here, before executing the query, so that we can set the
259
- // database for transactions appropriately
300
+ // getTransactionDatabase returns the name of the database that should be considered current for the transaction about
301
+ // to begin. The database is not guaranteed to exist.
302
+ func getTransactionDatabase (ctx * sql.Context , parsed sql.Node ) string {
303
+ // For USE DATABASE statements, we consider the transaction database to be the one being USEd
304
+ var transactionDatabase string
260
305
switch n := parsed .(type ) {
261
306
case * plan.Use :
262
- ctx .SetCurrentDatabase (n .Database ().Name ())
307
+ transactionDatabase = n .Database ().Name ()
308
+ default :
309
+ transactionDatabase = ctx .GetCurrentDatabase ()
263
310
}
264
- transactionDatabase := ctx .GetCurrentDatabase ()
265
311
266
312
switch n := parsed .(type ) {
267
313
case * plan.CreateTable :
0 commit comments