Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/mysql/binlog/rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
if err != nil {
panic(err)
}
d := jsonVal.MarshalTo(nil)
d := jsonVal.MarshalSQLTo(nil)
return sqltypes.MakeTrusted(sqltypes.Expression,
d), l + int(metadata), nil
}
Expand Down
10 changes: 8 additions & 2 deletions go/mysql/binlog/rbr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestCellLengthAndData(t *testing.T) {
0, 1, 0, 14, 0, 11, 0, 1, 0, 12, 12, 0, 97, 1, 98,
},
out: sqltypes.MakeTrusted(sqltypes.Expression,
[]byte(`{"a": "b"}`)),
[]byte(`JSON_OBJECT(_utf8mb4'a', _utf8mb4'b')`)),
}, {
typ: TypeJSON,
metadata: 4,
Expand All @@ -419,7 +419,13 @@ func TestCellLengthAndData(t *testing.T) {
0, 1, 0, 14, 0, 11, 0, 1, 0, 12, 12, 0, 97, 1, 98,
},
out: sqltypes.MakeTrusted(sqltypes.Expression,
[]byte(`{"a": "b"}`)),
[]byte(`JSON_OBJECT(_utf8mb4'a', _utf8mb4'b')`)),
}, {
typ: TypeJSON,
metadata: 2,
data: []byte{0x0b, 0x00, 15, 10, 8, 0, 0, 0, 0, 0, 30, 149, 25},
out: sqltypes.MakeTrusted(sqltypes.Expression,
[]byte(`CAST(date '2015-01-15' as JSON)`)),
}, {
typ: TypeEnum,
metadata: 1,
Expand Down
56 changes: 56 additions & 0 deletions go/mysql/json/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package json

import (
"bytes"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -166,6 +167,61 @@ func (v *Value) marshalSQLInternal(top bool, dst []byte) []byte {
}
}

var (
prefixJSONObject = []byte("JSON_OBJECT(")
prefixJSONArray = []byte("JSON_ARRAY(")
prefixCAST = []byte("CAST(")
Comment on lines +171 to +173
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels very hacky to me. 🤔

Copy link
Copy Markdown
Author

@herbenderbler herbenderbler May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels very hacky to me. 🤔

The prefix check is a workaround for type erasure on the VReplication wire path. The binlog CellValue already returns sqltypes.Expression for typed JSON, but RowToProto3 only ships bytes and the target reconstructs the cell as Type_JSON. As a result, I can't tell SQL expressions from text JSON without either a heuristic or extra metadata (similar to json_partial_values for partial updates). The prefixes match exactly what MarshalSQLTo emits, and text JSON is rejected by its leading {, [, or ".

I'm happy to switch to an explicit RowChange bitmap in a follow-up (or something else) if you’d prefer that over sniffing.

)

// preserializedJSONSQL trims surrounding whitespace from raw and reports
// whether the remainder looks like a SQL JSON expression rather than JSON text.
//
// The trimmed bytes are returned in every case. ok is true only when the
// trimmed value starts with one of the JSON_OBJECT(, JSON_ARRAY( or CAST(
// prefixes. Empty input, and input opening with '{', '[' or '"', is never
// reported as SQL.
func preserializedJSONSQL(raw []byte) (trimmed []byte, ok bool) {
	trimmed = bytes.TrimSpace(raw)

	// Nothing left after trimming, or the value opens like a JSON object,
	// array or string: treat it as JSON text without consulting the prefixes.
	if len(trimmed) == 0 || trimmed[0] == '{' || trimmed[0] == '[' || trimmed[0] == '"' {
		return trimmed, false
	}

	ok = bytes.HasPrefix(trimmed, prefixJSONObject) ||
		bytes.HasPrefix(trimmed, prefixJSONArray) ||
		bytes.HasPrefix(trimmed, prefixCAST)
	return trimmed, ok
}

// IsPreserializedJSONSQL tells callers whether raw already holds a SQL JSON
// expression (the form Value.MarshalSQLTo emits on the vstreamer binlog path)
// instead of standard JSON text. It is the boolean-only view of
// preserializedJSONSQL; the trimmed bytes are discarded.
func IsPreserializedJSONSQL(raw []byte) bool {
	_, isSQL := preserializedJSONSQL(raw)
	return isSQL
}

// JSONSQLValue turns the bytes of a JSON column into a bindable SQL value for
// VReplication.
//
// Bytes that preserializedJSONSQL recognizes as an existing SQL expression (as
// produced on the vstreamer binlog path) are wrapped, whitespace-trimmed, in a
// trusted RAW value. Anything else (text JSON from table copy or older
// streams) is handed to MarshalSQLValue, whose result and error are returned
// unchanged.
func JSONSQLValue(raw []byte) (*sqltypes.Value, error) {
	expr, preserialized := preserializedJSONSQL(raw)
	if !preserialized {
		return MarshalSQLValue(raw)
	}
	sqlVal := sqltypes.MakeTrusted(querypb.Type_RAW, expr)
	return &sqlVal, nil
}

// AppendJSONSQL appends the SQL form of a JSON column value to buf.
//
// Values already encoded as SQL (the MarshalSQLTo output seen on the vstreamer
// binlog path) are written as-is after whitespace trimming and nil is
// returned. All other input is delegated to AppendMarshalSQL, whose error is
// returned to the caller.
func AppendJSONSQL(buf *bytes2.Buffer, raw []byte) error {
	expr, preserialized := preserializedJSONSQL(raw)
	if !preserialized {
		return AppendMarshalSQL(buf, raw)
	}
	buf.Write(expr)
	return nil
}

// MarshalSQLValue converts text JSON bytes into a SQL expression using
// JSON_OBJECT/JSON_ARRAY syntax. It scans the raw bytes directly without
// building an intermediate tree, avoiding the ~60x memory amplification
Expand Down
36 changes: 36 additions & 0 deletions go/mysql/json/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@ import (
"vitess.io/vitess/go/sqltypes"
)

// TestIsPreserializedJSONSQL checks the SQL-expression detection against the
// three recognized prefixes (with and without leading whitespace) and against
// JSON text, nil and whitespace-only input, which must all be rejected.
func TestIsPreserializedJSONSQL(t *testing.T) {
	testcases := []struct {
		raw  []byte
		want bool
	}{
		{raw: []byte(` JSON_OBJECT(_utf8mb4'a', 1)`), want: true},
		{raw: []byte(`JSON_ARRAY(1)`), want: true},
		{raw: []byte(`CAST(date '2015-01-15' as JSON)`), want: true},
		{raw: []byte(`{"a": 1}`), want: false},
		{raw: []byte(`[1, 2]`), want: false},
		{raw: []byte(`"scalar"`), want: false},
		{raw: []byte(` {"a": 1}`), want: false},
		{raw: nil, want: false},
		{raw: []byte(` `), want: false},
	}
	for _, tc := range testcases {
		assert.Equal(t, tc.want, IsPreserializedJSONSQL(tc.raw), "input: %q", tc.raw)
	}
}

// TestJSONSQLValue covers both branches of JSONSQLValue: an input that is
// already a SQL expression must come back byte-for-byte, while text JSON must
// be converted to a JSON_OBJECT( expression that keeps the date as plain
// content and does not gain a CAST(date wrapper.
func TestJSONSQLValue(t *testing.T) {
	const preserialized = `JSON_OBJECT(_utf8mb4'created', CAST(date '2024-01-15' as JSON))`

	passthrough, err := JSONSQLValue([]byte(preserialized))
	require.NoError(t, err)
	require.Equal(t, preserialized, string(passthrough.Raw()))

	converted, err := JSONSQLValue([]byte(`{"created": "2024-01-15"}`))
	require.NoError(t, err)
	sql := converted.RawStr()
	require.Contains(t, sql, "JSON_OBJECT(")
	require.Contains(t, sql, "2024-01-15")
	require.NotContains(t, sql, "CAST(date")
}

// TestAppendJSONSQL writes one preserialized SQL expression and one text JSON
// document through AppendJSONSQL, reusing a single buffer that is reset before
// each case, and compares the exact buffer contents.
func TestAppendJSONSQL(t *testing.T) {
	testcases := []struct {
		raw  string
		want string
	}{
		{raw: `CAST(date '2015-01-15' as JSON)`, want: `CAST(date '2015-01-15' as JSON)`},
		{raw: `{"a":1}`, want: `JSON_OBJECT(_utf8mb4'a', 1)`},
	}

	out := &bytes2.Buffer{}
	for _, tc := range testcases {
		out.Reset()
		require.NoError(t, AppendJSONSQL(out, []byte(tc.raw)))
		require.Equal(t, tc.want, out.String())
	}
}

func TestMarshalSQLTo(t *testing.T) {
testcases := []struct {
input string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func (tp *TablePlan) bindAfterJSONFieldVals(rowChange *binlogdatapb.RowChange, a
fmt.Appendf(nil, afterVals[i].RawStr(), sqlescape.EscapeID(field.Name))))
}
default: // A JSON value (which may be a JSON null literal value)
newVal, err = vjson.MarshalSQLValue(afterVals[i].Raw())
newVal, err = vjson.JSONSQLValue(afterVals[i].Raw())
if err != nil {
return err
}
Expand Down Expand Up @@ -783,7 +783,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
// If the JSON column was NOT updated then the JSON column is marked as partial
// and the diff is empty as a way to exclude it from the AFTER image. So we
// want to use the BEFORE image value.
beforeVal, err := vjson.MarshalSQLValue(bindvars["b_"+field.Name].Value)
beforeVal, err := vjson.JSONSQLValue(bindvars["b_"+field.Name].Value)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to convert JSON to SQL field value for %s.%s when building insert query",
tp.TargetName, field.Name)
Expand Down Expand Up @@ -945,7 +945,7 @@ func (tp *TablePlan) applyBulkInsertChanges(rowInserts []*binlogdatapb.RowChange
if vals[n].IsNull() { // An SQL NULL and not an actual JSON value
jsVal = &sqltypes.NULL
} else { // A JSON value (which may be a JSON null literal value)
jsVal, err = vjson.MarshalSQLValue(vals[n].Raw())
jsVal, err = vjson.JSONSQLValue(vals[n].Raw())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[offset : offset+length]
if err := vjson.AppendMarshalSQL(buf, raw); err != nil {
if err := vjson.AppendJSONSQL(buf, raw); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,52 @@ func TestAppendFromRowLargeJSON(t *testing.T) {
assert.Contains(t, result, "JSON_ARRAY(")
}

// TestAppendFromRowPreserializedJSON feeds appendFromRow a single Type_JSON
// column whose bytes are already a SQL JSON expression and checks that the
// CAST(date ...) fragment appears in the generated output and that no
// JSON_QUOTE call does.
func TestAppendFromRowPreserializedJSON(t *testing.T) {
	const preserialized = `JSON_OBJECT(_utf8mb4'created', CAST(date '2024-01-15' as JSON))`

	plan := &TablePlan{
		BulkInsertValues: sqlparser.BuildParsedQuery("(%a)", ":c1"),
		Fields: []*querypb.Field{
			{Name: "c1", Type: querypb.Type_JSON},
		},
		FieldsToSkip: map[string]bool{},
	}

	jsonCell := sqltypes.MakeTrusted(querypb.Type_JSON, []byte(preserialized))
	protoRow := sqltypes.RowToProto3([]sqltypes.Value{jsonCell})

	out := &bytes2.Buffer{}
	require.NoError(t, plan.appendFromRow(out, protoRow))

	generated := out.String()
	assert.Contains(t, generated, "CAST(date '2024-01-15' as JSON)")
	assert.NotContains(t, generated, "JSON_QUOTE")
}

func TestBindAfterJSONFieldValsPreservesMySQLJSONTypes(t *testing.T) {
sqlExpr := `JSON_OBJECT(_utf8mb4'created', CAST(date '2024-01-15' as JSON))`
tp := &TablePlan{
Insert: sqlparser.BuildParsedQuery("insert into t(id, doc) values (%a, %a)",
":a_id", ":a_doc",
),
Fields: []*querypb.Field{
{Name: "id", Type: querypb.Type_INT64},
{Name: "doc", Type: querypb.Type_JSON},
},
}
afterVals := sqltypes.MakeRowTrusted(tp.Fields, sqltypes.RowToProto3([]sqltypes.Value{
sqltypes.MakeTrusted(querypb.Type_INT64, []byte("1")),
sqltypes.MakeTrusted(querypb.Type_JSON, []byte(sqlExpr)),
}))
bindvars := make(map[string]*querypb.BindVariable)
err := tp.bindAfterJSONFieldVals(&binlogdatapb.RowChange{}, afterVals, bindvars)
require.NoError(t, err)
assert.Contains(t, string(bindvars["a_doc"].Value), "CAST(date '2024-01-15' as JSON)")
assert.NotContains(t, string(bindvars["a_doc"].Value), "JSON_QUOTE")
}

func TestAppendFromRowSmallJSON(t *testing.T) {
// Verify that small JSON values use the tree encoding (JSON_OBJECT/JSON_ARRAY).
tp := &TablePlan{
Expand Down
Loading