Skip to content

Commit 79c0684

Browse files
authored
fix: return non-empty metadata for DataBoost queries (#3936)
Queries that use the PartitionedQuery API in combination with DataBoost can return completely empty result sets (no results and no metadata) for some partitions. The MergedResultSet should ignore these result sets and not use them as the source of metadata for the merged result.
1 parent f056436 commit 79c0684

File tree

2 files changed

+69
-14
lines changed

2 files changed

+69
-14
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/MergedResultSet.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.spanner.SpannerExceptionFactory;
2626
import com.google.cloud.spanner.Struct;
2727
import com.google.cloud.spanner.Type;
28+
import com.google.cloud.spanner.Type.Code;
2829
import com.google.common.base.Preconditions;
2930
import com.google.common.base.Supplier;
3031
import com.google.spanner.v1.ResultSetMetadata;
@@ -82,9 +83,11 @@ public void run() {
8283
break;
8384
}
8485
}
85-
if (first) {
86-
// Special case: The result set did not return any rows. Push the metadata to the merged
87-
// result set.
86+
if (first
87+
&& resultSet.getType().getCode() == Code.STRUCT
88+
&& !resultSet.getType().getStructFields().isEmpty()) {
89+
// Special case: The result set did not return any rows, but did return metadata.
90+
// Push the metadata to the merged result set.
8891
queue.put(
8992
PartitionExecutorResult.typeAndMetadata(
9093
resultSet.getType(), resultSet.getMetadata()));
@@ -319,13 +322,17 @@ public Struct get() {
319322
return currentRow;
320323
}
321324

322-
private PartitionExecutorResult getFirstResult() {
325+
private PartitionExecutorResult getFirstResultWithMetadata() {
323326
try {
324327
metadataAvailableLatch.await();
325328
} catch (InterruptedException interruptedException) {
326329
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
327330
}
328-
PartitionExecutorResult result = queue.peek();
331+
PartitionExecutorResult result =
332+
queue.stream()
333+
.filter(rs -> rs.metadata != null || rs.exception != null)
334+
.findFirst()
335+
.orElse(null);
329336
if (result == null) {
330337
throw SpannerExceptionFactory.newSpannerException(
331338
ErrorCode.FAILED_PRECONDITION, "Thread-unsafe access to ResultSet");
@@ -338,7 +345,7 @@ private PartitionExecutorResult getFirstResult() {
338345

339346
public ResultSetMetadata getMetadata() {
340347
if (metadata == null) {
341-
return getFirstResult().metadata;
348+
return getFirstResultWithMetadata().metadata;
342349
}
343350
return metadata;
344351
}
@@ -355,7 +362,7 @@ public int getParallelism() {
355362

356363
public Type getType() {
357364
if (type == null) {
358-
return getFirstResult().type;
365+
return getFirstResultWithMetadata().type;
359366
}
360367
return type;
361368
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/MergedResultSetTest.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.google.cloud.spanner.SpannerExceptionFactory;
3333
import com.google.cloud.spanner.Struct;
3434
import com.google.cloud.spanner.Type;
35+
import com.google.spanner.v1.ResultSetMetadata;
36+
import com.google.spanner.v1.StructType;
3537
import java.util.ArrayList;
3638
import java.util.BitSet;
3739
import java.util.Collection;
@@ -103,7 +105,7 @@ public static Collection<Object[]> parameters() {
103105
return params;
104106
}
105107

106-
private MockedResults setupResults(boolean withErrors) {
108+
private MockedResults setupResults(boolean withErrors, boolean withEmptyResults) {
107109
Random random = new Random();
108110
Connection connection = mock(Connection.class);
109111
List<String> partitions = new ArrayList<>();
@@ -122,10 +124,22 @@ private MockedResults setupResults(boolean withErrors) {
122124
when(connection.runPartition(partition))
123125
.thenReturn(new ResultSetWithError(ResultSetsHelper.fromProto(proto), errorIndex));
124126
} else {
125-
when(connection.runPartition(partition)).thenReturn(ResultSetsHelper.fromProto(proto));
126-
try (ResultSet resultSet = ResultSetsHelper.fromProto(proto)) {
127-
while (resultSet.next()) {
128-
allRows.add(resultSet.getCurrentRowAsStruct());
127+
if (withEmptyResults && numPartitions > 1 && index == 0) {
128+
when(connection.runPartition(partition))
129+
.thenReturn(
130+
ResultSetsHelper.fromProto(
131+
com.google.spanner.v1.ResultSet.newBuilder()
132+
.setMetadata(
133+
ResultSetMetadata.newBuilder()
134+
.setRowType(StructType.newBuilder().build())
135+
.build())
136+
.build()));
137+
} else {
138+
when(connection.runPartition(partition)).thenReturn(ResultSetsHelper.fromProto(proto));
139+
try (ResultSet resultSet = ResultSetsHelper.fromProto(proto)) {
140+
while (resultSet.next()) {
141+
allRows.add(resultSet.getCurrentRowAsStruct());
142+
}
129143
}
130144
}
131145
}
@@ -135,7 +149,7 @@ private MockedResults setupResults(boolean withErrors) {
135149

136150
@Test
137151
public void testAllResultsAreReturned() {
138-
MockedResults results = setupResults(false);
152+
MockedResults results = setupResults(/* withErrors= */ false, /* withEmptyResults= */ false);
139153
BitSet rowsFound = new BitSet(results.allRows.size());
140154
try (MergedResultSet resultSet =
141155
new MergedResultSet(results.connection, results.partitions, maxParallelism)) {
@@ -170,7 +184,7 @@ public void testAllResultsAreReturned() {
170184

171185
@Test
172186
public void testResultSetStopsAfterFirstError() {
173-
MockedResults results = setupResults(true);
187+
MockedResults results = setupResults(/* withErrors= */ true, /* withEmptyResults= */ false);
174188
try (MergedResultSet resultSet =
175189
new MergedResultSet(results.connection, results.partitions, maxParallelism)) {
176190
if (numPartitions > 0) {
@@ -194,6 +208,40 @@ public void testResultSetStopsAfterFirstError() {
194208
}
195209
}
196210

211+
@Test
212+
public void testResultSetReturnsNonEmptyMetadata() {
213+
MockedResults results = setupResults(/* withErrors= */ false, /* withEmptyResults= */ true);
214+
BitSet rowsFound = new BitSet(results.allRows.size());
215+
try (MergedResultSet resultSet =
216+
new MergedResultSet(results.connection, results.partitions, maxParallelism)) {
217+
if (numPartitions > 0) {
218+
assertNotNull(resultSet.getMetadata());
219+
assertEquals(26, resultSet.getMetadata().getRowType().getFieldsCount());
220+
}
221+
while (resultSet.next()) {
222+
assertRowExists(results.allRows, resultSet.getCurrentRowAsStruct(), rowsFound);
223+
}
224+
if (numPartitions == 0) {
225+
assertEquals(0, resultSet.getColumnCount());
226+
} else {
227+
assertEquals(26, resultSet.getColumnCount());
228+
assertEquals(Type.bool(), resultSet.getColumnType(0));
229+
assertEquals(Type.bool(), resultSet.getColumnType("COL0"));
230+
assertEquals(10, resultSet.getColumnIndex("COL10"));
231+
}
232+
// Check that all rows were found.
233+
assertEquals(results.allRows.size(), rowsFound.nextClearBit(0));
234+
// Check extended metadata.
235+
assertEquals(numPartitions, resultSet.getNumPartitions());
236+
if (maxParallelism > 0) {
237+
assertEquals(Math.min(numPartitions, maxParallelism), resultSet.getParallelism());
238+
} else {
239+
int processors = Runtime.getRuntime().availableProcessors();
240+
assertEquals(Math.min(numPartitions, processors), resultSet.getParallelism());
241+
}
242+
}
243+
}
244+
197245
private void assertRowExists(List<Struct> expectedRows, Struct row, BitSet rowsFound) {
198246
for (int i = 0; i < expectedRows.size(); i++) {
199247
if (row.equals(expectedRows.get(i))) {

0 commit comments

Comments
 (0)