diff --git a/docs/changelog/130463.yaml b/docs/changelog/130463.yaml new file mode 100644 index 0000000000000..e1a38ae6c96e6 --- /dev/null +++ b/docs/changelog/130463.yaml @@ -0,0 +1,5 @@ +pr: 130463 +summary: Refresh potential lost connections at query start for `_search` +area: Search +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java new file mode 100644 index 0000000000000..4856f20d07353 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class RemoteSearchForceConnectTimeoutIT extends AbstractMultiClustersTestCase { + private static final String REMOTE_CLUSTER_1 = "cluster-a"; + + public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { + @Override + public List> getSettings() { + return List.of(ForceConnectTimeoutSetting); + } + } + + private static final Setting ForceConnectTimeoutSetting = Setting.simpleString( + "search.ccs.force_connect_timeout", + Setting.Property.NodeScope + ); + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class); + } + + @Override + protected Settings nodeSettings() { + /* + * This is the setting that controls how long TransportSearchAction will wait for establishing a connection + * with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent + * with what we've done in other tests. + */ + return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, true); + } + + public void testTimeoutSetting() { + var latch = new CountDownLatch(1); + for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) { + MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); + + mts.addConnectBehavior( + cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())), + ((transport, discoveryNode, profile, listener) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + transport.openConnection(discoveryNode, profile, listener); + }) + ); + } + + // Add some dummy data to prove we are communicating fine with the remote. + assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + /* + * Do a full restart so that our custom connect behaviour takes effect since it does not apply to + * pre-existing connections -- they're already established by the time this test runs. + */ + try { + cluster(REMOTE_CLUSTER_1).fullRestart(); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + var searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(false); + var result = safeGet(client().execute(TransportSearchAction.TYPE, searchRequest)); + + // The remote cluster should've failed. + var failures = result.getClusters().getCluster(REMOTE_CLUSTER_1).getFailures(); + assertThat(failures.size(), Matchers.equalTo(1)); + + /* + * Reason should be a timed out exception. The timeout should be equal to what we've set and there should + * be a reference to the subscribable listener -- which is what we use to listen for a valid connection. + */ + var failureReason = failures.getFirst().reason(); + assertThat( + failureReason, + Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]") + ); + assertThat(failureReason, Matchers.containsString("SubscribableListener")); + latch.countDown(); + result.decRef(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 69260bcac105c..00b065fad4f04 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -167,6 +168,7 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexOriginalIndices( @@ -445,7 +448,9 @@ void executeRequest( projectState, clusters, searchPhaseProvider.apply(l) - ) + ), + transportService, + forceConnectTimeoutSecs ); } else { final SearchContextId searchContext = resolvedIndices.getSearchContextId(); @@ -505,7 +510,8 @@ void executeRequest( clusters, searchPhaseProvider.apply(finalDelegate) ); - }) + }), + forceConnectTimeoutSecs ); } } @@ -633,6 +639,40 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { || source.collapse().getInnerHits().isEmpty(); } + /** + * Return a subscribable listener with optional timeout depending on force reconnect setting is registered or + * not. + * @param forceConnectTimeoutSecs Timeout in seconds that determines how long we'll wait to establish a connection + * to a remote. + * @param threadPool The thread pool that'll be used for the timeout. + * @param timeoutExecutor The executor that should be used for the timeout. + * @return SubscribableListener A listener with optionally added timeout. + */ + private static SubscribableListener getListenerWithOptionalTimeout( + TimeValue forceConnectTimeoutSecs, + ThreadPool threadPool, + Executor timeoutExecutor + ) { + var subscribableListener = new SubscribableListener(); + if (forceConnectTimeoutSecs != null) { + subscribableListener.addTimeout(forceConnectTimeoutSecs, threadPool, timeoutExecutor); + } + + return subscribableListener; + } + + /** + * The default disconnected strategy for Elasticsearch is RECONNECT_UNLESS_SKIP_UNAVAILABLE. So we either force + * connect if required (like in CPS) or when skip unavailable is false for a cluster. + * @param forceConnectTimeoutSecs The timeout value from the force connect setting. + * If it is set, use it as it takes precedence. + * @param skipUnavailable The usual skip unavailable setting. + * @return boolean If we should always force reconnect. + */ + private static boolean shouldEstablishConnection(TimeValue forceConnectTimeoutSecs, boolean skipUnavailable) { + return forceConnectTimeoutSecs != null || skipUnavailable == false; + } + /** * Handles ccs_minimize_roundtrips=true */ @@ -647,7 +687,9 @@ static void ccsRemoteReduce( RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, - BiConsumer> localSearchConsumer + BiConsumer> localSearchConsumer, + TransportService transportService, + TimeValue forceConnectTimeoutSecs ) { final var remoteClientResponseExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION); if (resolvedIndices.getLocalIndices() == null && resolvedIndices.getRemoteClusterIndices().size() == 1) { @@ -665,12 +707,9 @@ static void ccsRemoteReduce( timeProvider.absoluteStartMillis(), true ); - var remoteClusterClient = remoteClusterService.getRemoteClusterClient( - clusterAlias, - remoteClientResponseExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE - ); - remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, new ActionListener<>() { + + var connectionListener = getListenerWithOptionalTimeout(forceConnectTimeoutSecs, threadPool, remoteClientResponseExecutor); + var searchListener = new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { // overwrite the existing cluster entry with the updated one @@ -713,7 +752,25 @@ public void onFailure(Exception e) { listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e)); } } - }); + }; + + connectionListener.addListener( + searchListener.delegateFailure( + (responseListener, connection) -> transportService.sendRequest( + connection, + TransportSearchAction.TYPE.name(), + ccsSearchRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor) + ) + ) + ); + + remoteClusterService.maybeEnsureConnectedAndGetConnection( + clusterAlias, + shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + connectionListener + ); } else { SearchResponseMerger searchResponseMerger = createSearchResponseMerger( searchRequest.source(), @@ -748,12 +805,30 @@ public void onFailure(Exception e) { task.getProgressListener(), listener ); - final var remoteClusterClient = remoteClusterService.getRemoteClusterClient( + + SubscribableListener connectionListener = getListenerWithOptionalTimeout( + forceConnectTimeoutSecs, + threadPool, + remoteClientResponseExecutor + ); + + connectionListener.addListener( + ccsListener.delegateFailure( + (responseListener, connection) -> transportService.sendRequest( + connection, + TransportSearchAction.REMOTE_TYPE.name(), + ccsSearchRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor) + ) + ) + ); + + remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - remoteClientResponseExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE + shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + connectionListener ); - remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, ccsListener); } if (resolvedIndices.getLocalIndices() != null) { ActionListener ccsListener = createCCSListener( @@ -819,7 +894,8 @@ static void collectSearchShards( SearchResponse.Clusters clusters, SearchTimeProvider timeProvider, TransportService transportService, - ActionListener> listener + ActionListener> listener, + TimeValue forceConnectTimeoutSecs ) { RemoteClusterService remoteClusterService = transportService.getRemoteClusterService(); final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); @@ -848,49 +924,59 @@ Map createFinalResponse() { return searchShardsResponses; } }; + + var threadPool = transportService.getThreadPool(); + var connectionListener = getListenerWithOptionalTimeout( + forceConnectTimeoutSecs, + threadPool, + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) + ); + + connectionListener.addListener(singleListener.delegateFailure((responseListener, connection) -> { + final String[] indices = entry.getValue().indices(); + final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION); + // TODO: support point-in-time + if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { + SearchShardsRequest searchShardsRequest = new SearchShardsRequest( + indices, + indicesOptions, + query, + routing, + preference, + allowPartialResults, + clusterAlias + ); + transportService.sendRequest( + connection, + TransportSearchShardsAction.TYPE.name(), + searchShardsRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchShardsResponse::new, responseExecutor) + ); + } else { + // does not do a can-match + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest( + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, + indices + ).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); + transportService.sendRequest( + connection, + TransportClusterSearchShardsAction.TYPE.name(), + searchShardsRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + singleListener.map(SearchShardsResponse::fromLegacyResponse), + ClusterSearchShardsResponse::new, + responseExecutor + ) + ); + } + })); + remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - skipUnavailable == false, - singleListener.delegateFailureAndWrap((delegate, connection) -> { - final String[] indices = entry.getValue().indices(); - final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION); - // TODO: support point-in-time - if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { - SearchShardsRequest searchShardsRequest = new SearchShardsRequest( - indices, - indicesOptions, - query, - routing, - preference, - allowPartialResults, - clusterAlias - ); - transportService.sendRequest( - connection, - TransportSearchShardsAction.TYPE.name(), - searchShardsRequest, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(delegate, SearchShardsResponse::new, responseExecutor) - ); - } else { - // does not do a can-match - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest( - MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, - indices - ).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); - transportService.sendRequest( - connection, - TransportClusterSearchShardsAction.TYPE.name(), - searchShardsRequest, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - delegate.map(SearchShardsResponse::fromLegacyResponse), - ClusterSearchShardsResponse::new, - responseExecutor - ) - ); - } - }) + shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + connectionListener ); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 4346351c1576c..8ca76fdf07799 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -546,7 +546,9 @@ public void testCCSRemoteReduceMergeFails() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -621,7 +623,9 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -677,7 +681,9 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -765,7 +771,9 @@ public void testCCSRemoteReduceWhereRemoteClustersFail() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -864,7 +872,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -915,7 +925,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -988,7 +1000,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -1083,7 +1097,8 @@ public void testCollectSearchShards() throws Exception { clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1112,7 +1127,8 @@ public void testCollectSearchShards() throws Exception { clusters, timeProvider, service, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1160,7 +1176,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numDisconnectedClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1190,7 +1207,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1236,7 +1254,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(0, clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));