Skip to content

Commit 408a1b6

Browse files
committed
Fix for Bug#103878 (32954449), CONNECTOR/J 8 : QUERY WITH 'SHOW XXX'
WILL GET EXCEPTION WHEN USE CURSOR.
1 parent 2ff451f commit 408a1b6

File tree

11 files changed

+285
-32
lines changed

11 files changed

+285
-32
lines changed

CHANGES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
Version 8.0.27
55

6+
- Fix for Bug#103878 (32954449), CONNECTOR/J 8 : QUERY WITH 'SHOW XXX' WILL GET EXCEPTION WHEN USE CURSOR.
7+
68
- Fix for Bug#103796 (32922715), CONNECTOR/J 8 STMT SETQUERYTIMEOUT CAN NOT WORK.
79
Thanks to Hong Wang for his contribution.
810

src/main/core-api/java/com/mysql/cj/protocol/MessageReader.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
33
*
44
* This program is free software; you can redistribute it and/or modify it under
55
* the terms of the GNU General Public License, version 2.0, as published by the
@@ -46,6 +46,18 @@ public interface MessageReader<H extends MessageHeader, M extends Message> {
4646
*/
4747
H readHeader() throws IOException;
4848

49+
/**
50+
* Read the next message header from server, possibly blocking indefinitely until the message is received,
51+
* and cache it so that the next {@link #readHeader()} return the same header.
52+
*
53+
* @return {@link MessageHeader} of the next message
54+
* @throws IOException
55+
* if an error occurs
56+
*/
57+
default H probeHeader() throws IOException {
58+
return readHeader();
59+
}
60+
4961
/**
5062
* Read message from server into to the given {@link Message} instance or into the new one if not present.
5163
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.
@@ -61,6 +73,24 @@ public interface MessageReader<H extends MessageHeader, M extends Message> {
6173
*/
6274
M readMessage(Optional<M> reuse, H header) throws IOException;
6375

76+
/**
77+
* Read message from server into to the given {@link Message} instance or into the new one if not present
78+
* and cache it so that the next {@link #readMessage(Optional, MessageHeader)} return the same message.
79+
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.
80+
* Could throw CJCommunicationsException wrapping an {@link IOException} during read or parse
81+
*
82+
* @param reuse
83+
* {@link Message} object to reuse. May be ignored by implementation.
84+
* @param header
85+
* {@link MessageHeader} instance
86+
* @return {@link Message} instance
87+
* @throws IOException
88+
* if an error occurs
89+
*/
90+
default M probeMessage(Optional<M> reuse, H header) throws IOException {
91+
return readMessage(reuse, header);
92+
}
93+
6494
/**
6595
* Read message from server into to the given {@link Message} instance or into the new one if not present.
6696
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.

src/main/protocol-impl/java/com/mysql/cj/protocol/a/BinaryResultsetReader.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,25 @@ public Resultset read(int maxRows, boolean streamResults, NativePacketPayload re
7272
boolean isCursorPossible = this.protocol.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue()
7373
&& resultSetFactory.getResultSetType() == Type.FORWARD_ONLY && resultSetFactory.getFetchSize() > 0;
7474

75-
// There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set;
76-
// if we asked to use cursor then there should be an OK or an ERR packet here
75+
// At this point 3 types of packets are expected:
76+
// 1. If CLIENT_DEPRECATE_EOF is not set then an EOF packet is always expected to be the next one.
77+
// 2. If CLIENT_DEPRECATE_EOF is set and a cursor was created then the next packet is an OK with 0xFE signature.
78+
// 3. If CLIENT_DEPRECATE_EOF is set and a cursor was not created then the next packet is a ProtocolBinary::ResultsetRow.
79+
// If CLIENT_DEPRECATE_EOF is set, there is no way to tell which one, OK or ResultsetRow, is the next packet, so it should be read with a special caching method.
7780
if (isCursorPossible || !this.protocol.getServerSession().isEOFDeprecated()) {
78-
NativePacketPayload rowPacket = this.protocol.readMessage(this.protocol.getReusablePacket());
81+
// Read the next packet but leave it in the reader cache. In case it's not the OK or EOF one it will be read again by ResultSet factories.
82+
NativePacketPayload rowPacket = this.protocol.probeMessage(this.protocol.getReusablePacket());
7983
this.protocol.checkErrorMessage(rowPacket);
80-
this.protocol.readServerStatusForResultSets(rowPacket, true);
84+
if (rowPacket.isResultSetOKPacket() || rowPacket.isEOFPacket()) {
85+
// Consume the OK/EOF packet from the reader cache and read the status flags from it;
86+
// The SERVER_STATUS_CURSOR_EXISTS flag should indicate the cursor state in this case.
87+
rowPacket = this.protocol.readMessage(this.protocol.getReusablePacket());
88+
this.protocol.readServerStatusForResultSets(rowPacket, true);
89+
} else {
90+
// If it's not an OK/EOF then the cursor is not created and this recent packet is a row.
91+
// Retain the packet in the reader cache.
92+
isCursorPossible = false;
93+
}
8194
}
8295

8396
ResultsetRows rows = null;

src/main/protocol-impl/java/com/mysql/cj/protocol/a/DebugBufferingPacketReader.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
33
*
44
* This program is free software; you can redistribute it and/or modify it under
55
* the terms of the GNU General Public License, version 2.0, as published by the
@@ -63,11 +63,17 @@ public DebugBufferingPacketReader(MessageReader<NativePacketHeader, NativePacket
6363

6464
@Override
6565
public NativePacketHeader readHeader() throws IOException {
66-
6766
byte prevPacketSeq = this.packetReader.getMessageSequence();
67+
return readHeaderLocal(prevPacketSeq, this.packetReader.readHeader());
68+
}
6869

69-
NativePacketHeader hdr = this.packetReader.readHeader();
70+
@Override
71+
public NativePacketHeader probeHeader() throws IOException {
72+
byte prevPacketSeq = this.packetReader.getMessageSequence();
73+
return readHeaderLocal(prevPacketSeq, this.packetReader.probeHeader());
74+
}
7075

76+
private NativePacketHeader readHeaderLocal(byte prevPacketSeq, NativePacketHeader hdr) throws IOException {
7177
// Normally we shouldn't get into situation of getting packets out of order from server,
7278
// so we do this check only in debug mode.
7379
byte currPacketSeq = hdr.getMessageSequence();
@@ -124,6 +130,36 @@ public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, Nati
124130
return buf;
125131
}
126132

133+
@Override
134+
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
135+
int packetLength = header.getMessageSize();
136+
NativePacketPayload buf = this.packetReader.probeMessage(reuse, header);
137+
138+
int bytesToDump = Math.min(MAX_PACKET_DUMP_LENGTH, packetLength);
139+
String PacketPayloadImpl = StringUtils.dumpAsHex(buf.getByteBuffer(), bytesToDump);
140+
141+
StringBuilder packetDump = new StringBuilder(DEBUG_MSG_LEN + NativeConstants.HEADER_LENGTH + PacketPayloadImpl.length());
142+
packetDump.append("Server ");
143+
packetDump.append(reuse.isPresent() ? "(re-used) " : "(new) ");
144+
packetDump.append(buf.toString());
145+
packetDump.append(" --------------------> Client\n");
146+
packetDump.append("\nPacket payload:\n\n");
147+
packetDump.append(this.lastHeaderPayload);
148+
packetDump.append(PacketPayloadImpl);
149+
150+
if (bytesToDump == MAX_PACKET_DUMP_LENGTH) {
151+
packetDump.append("\nNote: Packet of " + packetLength + " bytes truncated to " + MAX_PACKET_DUMP_LENGTH + " bytes.\n");
152+
}
153+
154+
if ((this.packetDebugBuffer.size() + 1) > this.packetDebugBufferSize.getValue()) {
155+
this.packetDebugBuffer.removeFirst();
156+
}
157+
158+
this.packetDebugBuffer.addLast(packetDump);
159+
160+
return buf;
161+
}
162+
127163
@Override
128164
public byte getMessageSequence() {
129165
return this.packetReader.getMessageSequence();

src/main/protocol-impl/java/com/mysql/cj/protocol/a/MultiPacketReader.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
33
*
44
* This program is free software; you can redistribute it and/or modify it under
55
* the terms of the GNU General Public License, version 2.0, as published by the
@@ -54,6 +54,11 @@ public NativePacketHeader readHeader() throws IOException {
5454
return this.packetReader.readHeader();
5555
}
5656

57+
@Override
58+
public NativePacketHeader probeHeader() throws IOException {
59+
return this.packetReader.probeHeader();
60+
}
61+
5762
@Override
5863
public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
5964

@@ -93,6 +98,45 @@ public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, Nati
9398
return buf;
9499
}
95100

101+
@Override
102+
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
103+
104+
int packetLength = header.getMessageSize();
105+
NativePacketPayload buf = this.packetReader.probeMessage(reuse, header);
106+
107+
if (packetLength == NativeConstants.MAX_PACKET_SIZE) { // it's a multi-packet
108+
109+
buf.setPosition(NativeConstants.MAX_PACKET_SIZE);
110+
111+
NativePacketPayload multiPacket = null;
112+
int multiPacketLength = -1;
113+
byte multiPacketSeq = getMessageSequence();
114+
115+
do {
116+
NativePacketHeader hdr = readHeader();
117+
multiPacketLength = hdr.getMessageSize();
118+
119+
if (multiPacket == null) {
120+
multiPacket = new NativePacketPayload(multiPacketLength);
121+
}
122+
123+
multiPacketSeq++;
124+
if (multiPacketSeq != hdr.getMessageSequence()) {
125+
throw new IOException(Messages.getString("PacketReader.10"));
126+
}
127+
128+
this.packetReader.probeMessage(Optional.of(multiPacket), hdr);
129+
130+
buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);
131+
132+
} while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);
133+
134+
buf.setPosition(0);
135+
}
136+
137+
return buf;
138+
}
139+
96140
@Override
97141
public byte getMessageSequence() {
98142
return this.packetReader.getMessageSequence();

src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativePacketPayload.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public boolean isErrorPacket() {
217217
* @return true if it is a EOF packet
218218
*/
219219
public final boolean isEOFPacket() {
220-
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (getPayloadLength() <= 5);
220+
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (this.payloadLength <= 5);
221221
}
222222

223223
/**
@@ -247,7 +247,7 @@ public final boolean isOKPacket() {
247247
* @return true if it is an OK packet for ResultSet
248248
*/
249249
public final boolean isResultSetOKPacket() {
250-
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (getPayloadLength() < 16777215);
250+
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (this.payloadLength > 5) && (this.payloadLength < 16777215);
251251
}
252252

253253
/**

src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeProtocol.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,22 @@ public final NativePacketPayload readMessage(NativePacketPayload reuse) {
525525
}
526526
}
527527

528+
public final NativePacketPayload probeMessage(NativePacketPayload reuse) {
529+
try {
530+
NativePacketHeader header = this.packetReader.probeHeader();
531+
NativePacketPayload buf = this.packetReader.probeMessage(Optional.ofNullable(reuse), header);
532+
this.packetSequence = header.getMessageSequence();
533+
return buf;
534+
535+
} catch (IOException ioEx) {
536+
throw ExceptionFactory.createCommunicationsException(this.propertySet, this.serverSession, this.getPacketSentTimeHolder(),
537+
this.getPacketReceivedTimeHolder(), ioEx, getExceptionInterceptor());
538+
} catch (OutOfMemoryError oom) {
539+
throw ExceptionFactory.createException(oom.getMessage(), MysqlErrorNumbers.SQL_STATE_MEMORY_ALLOCATION_ERROR, 0, false, oom,
540+
this.exceptionInterceptor);
541+
}
542+
}
543+
528544
/**
529545
* @param packet
530546
* {@link Message}

src/main/protocol-impl/java/com/mysql/cj/protocol/a/SimplePacketReader.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
33
*
44
* This program is free software; you can redistribute it and/or modify it under
55
* the terms of the GNU General Public License, version 2.0, as published by the
@@ -49,25 +49,40 @@ public class SimplePacketReader implements MessageReader<NativePacketHeader, Nat
4949

5050
private byte readPacketSequence = -1;
5151

52+
NativePacketHeader lastHeader = null;
53+
NativePacketPayload lastMessage = null;
54+
5255
public SimplePacketReader(SocketConnection socketConnection, RuntimeProperty<Integer> maxAllowedPacket) {
5356
this.socketConnection = socketConnection;
5457
this.maxAllowedPacket = maxAllowedPacket;
5558
}
5659

5760
@Override
5861
public NativePacketHeader readHeader() throws IOException {
62+
if (this.lastHeader == null) {
63+
return readHeaderLocal();
64+
}
65+
NativePacketHeader hdr = this.lastHeader;
66+
this.lastHeader = null;
67+
this.readPacketSequence = hdr.getMessageSequence();
68+
return hdr;
69+
}
5970

71+
@Override
72+
public NativePacketHeader probeHeader() throws IOException {
73+
this.lastHeader = readHeaderLocal();
74+
return this.lastHeader;
75+
}
76+
77+
private NativePacketHeader readHeaderLocal() throws IOException {
6078
NativePacketHeader hdr = new NativePacketHeader();
6179

6280
try {
6381
this.socketConnection.getMysqlInput().readFully(hdr.getBuffer().array(), 0, NativeConstants.HEADER_LENGTH);
64-
6582
int packetLength = hdr.getMessageSize();
66-
6783
if (packetLength > this.maxAllowedPacket.getValue()) {
6884
throw new CJPacketTooBigException(packetLength, this.maxAllowedPacket.getValue());
6985
}
70-
7186
} catch (IOException | CJPacketTooBigException e) {
7287
try {
7388
this.socketConnection.forceClose();
@@ -78,38 +93,52 @@ public NativePacketHeader readHeader() throws IOException {
7893
}
7994

8095
this.readPacketSequence = hdr.getMessageSequence();
81-
8296
return hdr;
8397
}
8498

8599
@Override
86100
public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
101+
if (this.lastMessage == null) {
102+
return readMessageLocal(reuse, header);
103+
}
104+
NativePacketPayload buf = this.lastMessage;
105+
this.lastMessage = null;
106+
return buf;
107+
}
108+
109+
@Override
110+
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
111+
this.lastMessage = readMessageLocal(reuse, header);
112+
return this.lastMessage;
113+
}
114+
115+
private NativePacketPayload readMessageLocal(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
87116
try {
88117
int packetLength = header.getMessageSize();
89-
NativePacketPayload buf;
118+
NativePacketPayload message;
90119
if (reuse.isPresent()) {
91-
buf = reuse.get();
120+
message = reuse.get();
92121
// Set the Buffer to it's original state
93-
buf.setPosition(0);
122+
message.setPosition(0);
94123
// Do we need to re-alloc the byte buffer?
95-
if (buf.getByteBuffer().length < packetLength) {
124+
if (message.getByteBuffer().length < packetLength) {
96125
// Note: We actually check the length of the buffer, rather than getBufLength(), because getBufLength()
97126
// is not necessarily the actual length of the byte array used as the buffer
98-
buf.setByteBuffer(new byte[packetLength]);
127+
message.setByteBuffer(new byte[packetLength]);
99128
}
100129

101130
// Set the new length
102-
buf.setPayloadLength(packetLength);
131+
message.setPayloadLength(packetLength);
103132
} else {
104-
buf = new NativePacketPayload(new byte[packetLength]);
133+
message = new NativePacketPayload(new byte[packetLength]);
105134
}
106135

107136
// Read the data from the server
108-
int numBytesRead = this.socketConnection.getMysqlInput().readFully(buf.getByteBuffer(), 0, packetLength);
137+
int numBytesRead = this.socketConnection.getMysqlInput().readFully(message.getByteBuffer(), 0, packetLength);
109138
if (numBytesRead != packetLength) {
110139
throw new IOException(Messages.getString("PacketReader.1", new Object[] { packetLength, numBytesRead }));
111140
}
112-
return buf;
141+
return message;
113142

114143
} catch (IOException e) {
115144
try {

0 commit comments

Comments
 (0)