Skip to content

perf: get database dialect using multiplexed session #3684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
*/
abstract class AbstractMultiplexedSessionDatabaseClient implements DatabaseClient {

@Override
public Dialect getDialect() {
throw new UnsupportedOperationException();
}

@Override
public String getDatabaseRole() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ private boolean canUseMultiplexedSessionsForPartitionedOps() {

@Override
public Dialect getDialect() {
MultiplexedSessionDatabaseClient client = getMultiplexedSessionDatabaseClient();
if (client != null) {
return client.getDialect();
}
return pool.getDialect();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,13 @@ public void onSessionReady(SessionImpl session) {
.getSkipVerifyBeginTransactionForMuxRW()) {
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
}
if (sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.isAutoDetectDialect()) {
MAINTAINER_SERVICE.submit(() -> getDialect());
}
}

@Override
Expand Down Expand Up @@ -513,6 +520,30 @@ private int getSingleUseChannelHint() {
}
}

private final AbstractLazyInitializer<Dialect> dialectSupplier =
new AbstractLazyInitializer<Dialect>() {
@Override
protected Dialect initialize() {
try (ResultSet dialectResultSet =
singleUse().executeQuery(SessionPool.DETERMINE_DIALECT_STATEMENT)) {
if (dialectResultSet.next()) {
return Dialect.fromName(dialectResultSet.getString(0));
}
}
// This should not really happen, but it is the safest fallback value.
return Dialect.GOOGLE_STANDARD_SQL;
}
};

@Override
public Dialect getDialect() {
try {
return dialectSupplier.get();
} catch (Exception exception) {
throw SpannerExceptionFactory.asSpannerException(exception);
}
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
return createMultiplexedSessionTransaction(/* singleUse = */ false).write(mutations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2256,13 +2256,9 @@ enum Position {
@VisibleForTesting
static final Statement DETERMINE_DIALECT_STATEMENT =
Statement.newBuilder(
"SELECT 'POSTGRESQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='information_schema'\n"
+ "UNION ALL\n"
+ "SELECT 'GOOGLE_STANDARD_SQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='INFORMATION_SCHEMA' AND CATALOG_NAME=''")
"select option_value "
+ "from information_schema.database_options "
+ "where option_name='database_dialect'")
.build();

private final SessionPoolOptions options;
Expand Down Expand Up @@ -3211,7 +3207,9 @@ public void onSessionReady(SessionImpl session) {
if (allSessions.size() >= minSessions) {
waitOnMinSessionsLatch.countDown();
}
if (options.isAutoDetectDialect() && !detectDialectStarted) {
if (options.isAutoDetectDialect()
&& !detectDialectStarted
&& !options.getUseMultiplexedSession()) {
// Get the dialect of the underlying database if that has not yet been done. Note that
// this method will release the session into the pool once it is done.
detectDialectStarted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3966,7 +3966,7 @@ public void testCreateSessionsFailure_shouldNotPropagateToCloseMethod() {
try {
// Simulate session creation failures on the backend.
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
SimulatedExecutionTime.ofStickyException(Status.PERMISSION_DENIED.asRuntimeException()));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not directly relevant for the change in this PR, but I noticed that this test took ~30 seconds to execute. The reason for that is that RESOURCE_EXHAUSTED is now a retriable error, which means that it will automatically be retried by Gax for up to 30 seconds before failing.

// This will not cause any failure as getting a session from the pool is guaranteed to be
// non-blocking, and any exceptions will be delayed until actual query execution.
mockSpanner.freeze();
Expand All @@ -3975,8 +3975,8 @@ public void testCreateSessionsFailure_shouldNotPropagateToCloseMethod() {
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
mockSpanner.unfreeze();
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.RESOURCE_EXHAUSTED);
SpannerException exception = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.PERMISSION_DENIED, exception.getErrorCode());
}
} finally {
mockSpanner.setCreateSessionExecutionTime(SimulatedExecutionTime.none());
Expand Down Expand Up @@ -4515,6 +4515,8 @@ public void testGetDialectPostgreSQLPreloaded() {
public void testGetDialect_FailsDirectlyIfDatabaseNotFound() {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));

Expand All @@ -4531,6 +4533,8 @@ public void testGetDialect_FailsDirectlyIfDatabaseNotFound() {
public void testGetDialectDefaultPreloaded_FailsDirectlyIfDatabaseNotFound() {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
try (Spanner spanner =
this.spanner
.getOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ForceCloseSpannerFunction;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
Expand Down Expand Up @@ -198,6 +199,8 @@ public void getOperation(
mockSpanner.putStatementResult(
StatementResult.query(SELECT_RANDOM_STATEMENT, RANDOM_RESULT_SET));
mockSpanner.putStatementResult(StatementResult.query(SELECT1_STATEMENT, SELECT1_RESULTSET));
mockSpanner.putStatementResult(
StatementResult.detectDialectResult(Dialect.GOOGLE_STANDARD_SQL));

futureParentHandlers = Logger.getLogger(AbstractFuture.class.getName()).getUseParentHandlers();
exceptionRunnableParentHandlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ public void testPostgreSQLGetDialect() {
public void testGetDialect_DatabaseNotFound() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.stickyDatabaseNotFoundException("invalid-database"));
try (Connection connection = createConnection()) {
SpannerException exception = assertThrows(SpannerException.class, connection::getDialect);
assertEquals(ErrorCode.NOT_FOUND, exception.getErrorCode());
Expand Down
Loading