
Refresh potential lost connections at query start for _search #130463

Merged
5 changes: 5 additions & 0 deletions docs/changelog/130463.yaml
@@ -0,0 +1,5 @@
pr: 130463
summary: Refresh potential lost connections at query start for `_search`
area: Search
type: enhancement
issues: []
126 changes: 126 additions & 0 deletions RemoteSearchForceConnectTimeoutIT.java
@@ -0,0 +1,126 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.indices.cluster;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class RemoteSearchForceConnectTimeoutIT extends AbstractMultiClustersTestCase {
    private static final String REMOTE_CLUSTER_1 = "cluster-a";

    public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin {
        @Override
        public List<Setting<?>> getSettings() {
            return List.of(ForceConnectTimeoutSetting);
        }
    }

    private static final Setting<String> ForceConnectTimeoutSetting = Setting.simpleString(
        "search.ccs.force_connect_timeout",
        Setting.Property.NodeScope
    );

    @Override
    protected List<String> remoteClusterAlias() {
        return List.of(REMOTE_CLUSTER_1);
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
        return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class);
    }

    @Override
    protected Settings nodeSettings() {
        /*
         * This is the setting that controls how long TransportSearchAction will wait to establish a connection
         * with a remote. We set it to a low 1s to avoid stalling the test for too long -- this is consistent
         * with what we've done in other tests.
         */
        return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build();
    }

    @Override
    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
        return Map.of(REMOTE_CLUSTER_1, true);
    }

    public void testTimeoutSetting() {
        var latch = new CountDownLatch(1);
        for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) {
            MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName);

            mts.addConnectBehavior(
                cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())),
                ((transport, discoveryNode, profile, listener) -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }

                    transport.openConnection(discoveryNode, profile, listener);
                })
            );
        }

        // Add some dummy data to prove we are communicating fine with the remote.
        assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index"));
        client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
        client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

        /*
         * Do a full restart so that our custom connect behaviour takes effect, since it does not apply to
         * pre-existing connections -- they're already established by the time this test runs.
         */
        try {
            cluster(REMOTE_CLUSTER_1).fullRestart();
        } catch (Exception e) {
            throw new AssertionError(e);
        } finally {
            var searchRequest = new SearchRequest("*", "*:*");
            searchRequest.allowPartialSearchResults(false);
            var result = safeGet(client().execute(TransportSearchAction.TYPE, searchRequest));

            // The remote cluster should've failed.
            var failures = result.getClusters().getCluster(REMOTE_CLUSTER_1).getFailures();
            assertThat(failures.size(), Matchers.equalTo(1));

            /*
             * The reason should be a timed-out exception. The timeout should equal what we've set, and there should
             * be a reference to the subscribable listener -- which is what we use to listen for a valid connection.
             */
            var failureReason = failures.getFirst().reason();
            assertThat(
                failureReason,
                Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]")
            );
            assertThat(failureReason, Matchers.containsString("SubscribableListener"));
            latch.countDown();
Contributor:

A more convincing and full-featured test would show two things:

  1. It times out when the reconnection cannot be completed.
  2. It successfully reconnects (when not blocked).

After doing this countDown on the latch, would you be able to add a test for this second scenario?

pawankartik-elastic (Contributor Author), Jul 16, 2025:

> It successfully reconnects

This would imply that I somehow disconnect the remote cluster. Unfortunately, that's not possible: connections are already established by the time the test runs. The only way to break the connection is to shut down the cluster, but that would mean something entirely different -- the cluster being offline. If I don't disconnect the remote but still proceed with the test, there's no point, since the connection was established and remains established for the entirety of the test. By the time countDown() is hit, the connection goes through and is established. I can stall a connection, but I cannot break it in a way that lets the underlying networking code establish it later.

quux00 (Contributor), Jul 16, 2025:

Then I don't understand what cluster(REMOTE_CLUSTER_1).fullRestart(); is doing and thus I don't understand this test at all. Doesn't the restart of remote_cluster_1 break the connection and make it stale?

pawankartik-elastic (Contributor Author), Jul 16, 2025:

Because I cannot manually close any cluster connections, I add a connect behaviour that gets invoked when a "new" connection is established. It does not apply to pre-existing connections. So I restart the remote cluster, which then establishes new connection(s). When that happens, the connect behaviour I added earlier is invoked and the connection is stalled until I call countDown(). Since countDown() is only called at the end of the test, the subscribable listener has trouble talking to the remote; the timeout is 1s in the test, hence the timeout exception.

Once countDown() is called, the connection goes through. So any search request after this point will not exercise maybeEnsureConnectedAndGetConnection(), since it would find the connection active. All we'd be able to prove is that we can talk to the remote, not that a connection was established by maybeEnsureConnectedAndGetConnection() (the connection went through the moment the latch was released -- the same connection that was supposed to be established when the cluster was brought up, simply delayed by the latch, mimicking a stalled cluster).
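
For readers following this exchange, here is a minimal standalone sketch of the stall-then-release pattern described above. It is plain Java with hypothetical names, not the actual Elasticsearch machinery (the real test stalls connections via MockTransportService.addConnectBehavior, and the timeout surfaces through a SubscribableListener): a "connection attempt" blocks on a latch, a waiter with a 1s budget gives up first (analogous to search.ccs.force_connect_timeout firing), and releasing the latch afterwards lets the delayed attempt complete.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical illustration only -- names and output are made up.
    public class LatchStallSketch {
        public static void main(String[] args) throws Exception {
            CountDownLatch latch = new CountDownLatch(1);

            // The "connection attempt": blocked until the latch is released,
            // mimicking the stalled connect behaviour registered in the test.
            Thread connectAttempt = new Thread(() -> {
                try {
                    latch.await();
                    System.out.println("connection established");
                } catch (InterruptedException e) {
                    throw new AssertionError(e);
                }
            });
            connectAttempt.start();

            // The "search": waits up to 1s for the connection, analogous to
            // search.ccs.force_connect_timeout=1s, and reports a failure first.
            connectAttempt.join(TimeUnit.SECONDS.toMillis(1));
            if (connectAttempt.isAlive()) {
                System.out.println("timed out after [1s/1000ms]");
            }

            // Releasing the latch lets the delayed connection go through, so
            // any later "search" would find an already-established connection.
            latch.countDown();
            connectAttempt.join();
        }
    }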

            result.decRef();
        }
    }
}