diff --git a/src/main/java/com/rabbitmq/stream/impl/ShutdownService.java b/src/main/java/com/rabbitmq/stream/impl/ShutdownService.java
new file mode 100644
index 0000000000..ef59dc5823
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ShutdownService.java
@@ -0,0 +1,85 @@
+// Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper to register callbacks and call them in reverse order. Registered callbacks are made
+ * automatically idempotent.
+ *
+ *
This class can be used to register closing callbacks, call them individually, and/or call all
+ * of them (in LIFO order) with the {@link #close()} method.
+ *
+ *
From
+ * https://github.com/rabbitmq/rabbitmq-perf-test/blob/main/src/main/java/com/rabbitmq/perf/ShutdownService.java.
+ */
+final class ShutdownService implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownService.class);
+
+ private final List closeables = Collections.synchronizedList(new ArrayList<>());
+
+ /**
+ * Wrap and register the callback into an idempotent {@link AutoCloseable}.
+ *
+ * @param closeCallback
+ * @return the callback as an idempotent {@link AutoCloseable}
+ */
+ AutoCloseable wrap(CloseCallback closeCallback) {
+ AtomicBoolean closingOrAlreadyClosed = new AtomicBoolean(false);
+ AutoCloseable idempotentCloseCallback =
+ new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ if (closingOrAlreadyClosed.compareAndSet(false, true)) {
+ closeCallback.run();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return closeCallback.toString();
+ }
+ };
+ closeables.add(idempotentCloseCallback);
+ return idempotentCloseCallback;
+ }
+
+ /** Close all the registered callbacks, in the reverse order of registration. */
+ @Override
+ public synchronized void close() {
+ if (!closeables.isEmpty()) {
+ for (int i = closeables.size() - 1; i >= 0; i--) {
+ try {
+ closeables.get(i).close();
+ } catch (Exception e) {
+ LOGGER.warn("Could not properly execute closing step '{}'", closeables.get(i), e);
+ }
+ }
+ }
+ }
+
+ @FunctionalInterface
+ interface CloseCallback {
+
+ void run() throws Exception;
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index d3ec44d8d0..ad41eccbaf 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -204,122 +204,136 @@ class StreamEnvironment implements Environment {
.collect(toList());
this.locators = List.copyOf(lctrs);
- this.executorServiceFactory =
- new DefaultExecutorServiceFactory(
- this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
-
- if (clientParametersPrototype.eventLoopGroup == null) {
- this.eventLoopGroup = Utils.eventLoopGroup();
- this.clientParametersPrototype =
- clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
- } else {
- this.eventLoopGroup = null;
- this.clientParametersPrototype =
- clientParametersPrototype
- .duplicate()
- .eventLoopGroup(clientParametersPrototype.eventLoopGroup);
- }
- ScheduledExecutorService executorService;
- if (scheduledExecutorService == null) {
- int threads = AVAILABLE_PROCESSORS;
- LOGGER.debug("Creating scheduled executor service with {} thread(s)", threads);
- ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
- executorService = Executors.newScheduledThreadPool(threads, threadFactory);
- this.privateScheduleExecutorService = true;
- } else {
- executorService = scheduledExecutorService;
- this.privateScheduleExecutorService = false;
- }
- this.scheduledExecutorService = executorService;
-
- this.producersCoordinator =
- new ProducersCoordinator(
- this,
- maxProducersByConnection,
- maxTrackingConsumersByConnection,
- connectionNamingStrategy,
- coordinatorClientFactory(this, producerNodeRetryDelay),
- forceLeaderForProducers);
- this.consumersCoordinator =
- new ConsumersCoordinator(
- this,
- maxConsumersByConnection,
- connectionNamingStrategy,
- coordinatorClientFactory(this, consumerNodeRetryDelay),
- forceReplicaForConsumers,
- Utils.brokerPicker());
- this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
-
- ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
- this.locatorReconnectionScheduledExecutorService =
- Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
-
- ClientParameters clientParametersForInit = locatorParametersCopy();
- Runnable locatorInitSequence =
- () -> {
- RuntimeException lastException = null;
- for (int i = 0; i < locators.size(); i++) {
- Address address = addresses.get(i % addresses.size());
- Locator locator = locator(i);
- address = addressResolver.resolve(address);
- String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
- Client.ClientParameters locatorParameters =
- clientParametersForInit
- .duplicate()
- .host(address.host())
- .port(address.port())
- .clientProperty("connection_name", connectionName)
- .shutdownListener(
- shutdownListener(locator, connectionNamingStrategy, clientFactory));
- try {
- Client client = clientFactory.apply(locatorParameters);
- locator.client(client);
- LOGGER.debug("Created locator connection '{}'", connectionName);
- LOGGER.debug("Locator connected to {}", address);
- } catch (RuntimeException e) {
- LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
- lastException = e;
+ ShutdownService shutdownService = new ShutdownService();
+ try {
+ this.executorServiceFactory =
+ new DefaultExecutorServiceFactory(
+ this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
+ shutdownService.wrap(this.executorServiceFactory::close);
+
+ if (clientParametersPrototype.eventLoopGroup == null) {
+ this.eventLoopGroup = Utils.eventLoopGroup();
+ shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup));
+ this.clientParametersPrototype =
+ clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
+ } else {
+ this.eventLoopGroup = null;
+ this.clientParametersPrototype =
+ clientParametersPrototype
+ .duplicate()
+ .eventLoopGroup(clientParametersPrototype.eventLoopGroup);
+ }
+ ScheduledExecutorService executorService;
+ if (scheduledExecutorService == null) {
+ int threads = AVAILABLE_PROCESSORS;
+ LOGGER.debug("Creating scheduled executor service with {} thread(s)", threads);
+ ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
+ executorService = Executors.newScheduledThreadPool(threads, threadFactory);
+ shutdownService.wrap(executorService::shutdownNow);
+ this.privateScheduleExecutorService = true;
+ } else {
+ executorService = scheduledExecutorService;
+ this.privateScheduleExecutorService = false;
+ }
+ this.scheduledExecutorService = executorService;
+
+ this.producersCoordinator =
+ new ProducersCoordinator(
+ this,
+ maxProducersByConnection,
+ maxTrackingConsumersByConnection,
+ connectionNamingStrategy,
+ coordinatorClientFactory(this, producerNodeRetryDelay),
+ forceLeaderForProducers);
+ shutdownService.wrap(this.producersCoordinator::close);
+ this.consumersCoordinator =
+ new ConsumersCoordinator(
+ this,
+ maxConsumersByConnection,
+ connectionNamingStrategy,
+ coordinatorClientFactory(this, consumerNodeRetryDelay),
+ forceReplicaForConsumers,
+ Utils.brokerPicker());
+ shutdownService.wrap(this.consumersCoordinator::close);
+ this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
+ shutdownService.wrap(this.offsetTrackingCoordinator::close);
+
+ ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
+ this.locatorReconnectionScheduledExecutorService =
+ Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
+ shutdownService.wrap(this.locatorReconnectionScheduledExecutorService::shutdownNow);
+
+ ClientParameters clientParametersForInit = locatorParametersCopy();
+ Runnable locatorInitSequence =
+ () -> {
+ RuntimeException lastException = null;
+ for (int i = 0; i < locators.size(); i++) {
+ Address address = addresses.get(i % addresses.size());
+ Locator locator = locator(i);
+ address = addressResolver.resolve(address);
+ String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
+ Client.ClientParameters locatorParameters =
+ clientParametersForInit
+ .duplicate()
+ .host(address.host())
+ .port(address.port())
+ .clientProperty("connection_name", connectionName)
+ .shutdownListener(
+ shutdownListener(locator, connectionNamingStrategy, clientFactory));
+ try {
+ Client client = clientFactory.apply(locatorParameters);
+ locator.client(client);
+ LOGGER.debug("Created locator connection '{}'", connectionName);
+ LOGGER.debug("Locator connected to {}", address);
+ } catch (RuntimeException e) {
+ LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
+ lastException = e;
+ }
}
- }
- if (this.locators.stream().allMatch(Locator::isNotSet)) {
- throw lastException == null
- ? new StreamException("Not locator available")
- : lastException;
- } else {
- this.locators.forEach(
- l -> {
- if (l.isNotSet()) {
- ShutdownListener shutdownListener =
- shutdownListener(l, connectionNamingStrategy, clientFactory);
- Client.ClientParameters newLocatorParameters =
- this.locatorParametersCopy().shutdownListener(shutdownListener);
- scheduleLocatorConnection(
- newLocatorParameters,
- this.addressResolver,
- l,
- connectionNamingStrategy,
- clientFactory,
- this.locatorReconnectionScheduledExecutorService,
- this.recoveryBackOffDelayPolicy,
- l.label());
- }
- });
- }
- };
- if (lazyInit) {
- this.locatorInitializationSequence = locatorInitSequence;
- } else {
- locatorInitSequence.run();
- locatorsInitialized.set(true);
- this.locatorInitializationSequence = () -> {};
+ if (this.locators.stream().allMatch(Locator::isNotSet)) {
+ throw lastException == null
+ ? new StreamException("Not locator available")
+ : lastException;
+ } else {
+ this.locators.forEach(
+ l -> {
+ if (l.isNotSet()) {
+ ShutdownListener shutdownListener =
+ shutdownListener(l, connectionNamingStrategy, clientFactory);
+ Client.ClientParameters newLocatorParameters =
+ this.locatorParametersCopy().shutdownListener(shutdownListener);
+ scheduleLocatorConnection(
+ newLocatorParameters,
+ this.addressResolver,
+ l,
+ connectionNamingStrategy,
+ clientFactory,
+ this.locatorReconnectionScheduledExecutorService,
+ this.recoveryBackOffDelayPolicy,
+ l.label());
+ }
+ });
+ }
+ };
+ if (lazyInit) {
+ this.locatorInitializationSequence = locatorInitSequence;
+ } else {
+ locatorInitSequence.run();
+ locatorsInitialized.set(true);
+ this.locatorInitializationSequence = () -> {};
+ }
+ this.codec =
+ clientParametersPrototype.codec() == null
+ ? Codecs.DEFAULT
+ : clientParametersPrototype.codec();
+ this.clockRefreshFuture =
+ this.scheduledExecutorService.scheduleAtFixedRate(
+ namedRunnable(this.clock::refresh, "Background clock refresh"), 1, 1, SECONDS);
+ shutdownService.wrap(() -> this.clockRefreshFuture.cancel(false));
+ } catch (RuntimeException e) {
+ shutdownService.close();
+ throw e;
}
- this.codec =
- clientParametersPrototype.codec() == null
- ? Codecs.DEFAULT
- : clientParametersPrototype.codec();
- this.clockRefreshFuture =
- this.scheduledExecutorService.scheduleAtFixedRate(
- namedRunnable(this.clock::refresh, "Background clock refresh"), 1, 1, SECONDS);
}
private ShutdownListener shutdownListener(
@@ -717,20 +731,24 @@ public void close() {
if (this.locatorReconnectionScheduledExecutorService != null) {
this.locatorReconnectionScheduledExecutorService.shutdownNow();
}
- try {
- if (this.eventLoopGroup != null
- && (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
- LOGGER.debug("Closing Netty event loop group");
- this.eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
- }
- } catch (InterruptedException e) {
- LOGGER.info("Event loop group closing has been interrupted");
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- LOGGER.info("Event loop group closing failed", e);
- } catch (TimeoutException e) {
- LOGGER.info("Could not close event loop group in 10 seconds");
+ closeEventLoopGroup(this.eventLoopGroup);
+ }
+ }
+
+ private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) {
+ try {
+ if (eventLoopGroup != null
+ && (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) {
+ LOGGER.debug("Closing Netty event loop group");
+ eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
}
+ } catch (InterruptedException e) {
+ LOGGER.info("Event loop group closing has been interrupted");
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOGGER.info("Event loop group closing failed", e);
+ } catch (TimeoutException e) {
+ LOGGER.info("Could not close event loop group in 10 seconds");
}
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
index a5704e14b4..c4aeea7001 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
@@ -82,11 +82,10 @@
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+@StreamTestInfrastructure
public class StreamEnvironmentTest {
EnvironmentBuilder environmentBuilder;
@@ -123,10 +122,14 @@ void environmentCreationShouldFailWithIncorrectVirtualHostInUri() {
}
@Test
- void environmentCreationShouldFailWithUrlUsingWrongPort() {
+ void environmentCreationShouldFailWithUrlUsingWrongPort() throws Exception {
+ int initialThreadCound = threads().size();
assertThatThrownBy(
() ->
environmentBuilder
+ .netty()
+ .eventLoopGroup(null)
+ .environmentBuilder()
.uri("rabbitmq-stream://guest:guest@localhost:4242/%2f")
.addressResolver(address -> new Address("localhost", 4242))
.build()
@@ -134,6 +137,8 @@ void environmentCreationShouldFailWithUrlUsingWrongPort() {
.isInstanceOf(StreamException.class)
.hasCauseInstanceOf(ConnectException.class)
.hasRootCauseMessage("Connection refused");
+ // no thread leak
+ waitAtMost(() -> threads().size() == initialThreadCound);
}
@Test
diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
index 52214838b8..32ba67282f 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
@@ -1183,4 +1183,8 @@ boolean hasCompleted() {
return this.latch.get().getCount() == 0;
}
}
+
+ static Collection threads() {
+ return Thread.getAllStackTraces().keySet();
+ }
}