Skip to content

Commit 938b270

Browse files
artembilangaryrussell
authored andcommitted
GH-8770: Add PostgresSubsChannel.errorHandler (#8777)
* GH-8770: Add `PostgresSubsChannel.errorHandler` Fixes #8770 The problem with the `PostgresSubscribableChannel.notifyUpdate()` is that the try-catch block is outside the loop, so the loop will die on an exception, leaving further messages unprocessed. * Add ``PostgresSubscribableChannel.errorHandler` option to be invoked after a `RetryTemplate` and for every failed message. * The `askForMessage()` new logic is to catch an exception on a message and call `errorHandler` returning a `FALLBACK_STUB` to continue an outer loop in the `notifyUpdate()` **Cherry-pick to `6.1.x` & `6.0.x`** * * Rename private `PostgresSubscribableChannel.askForMessage()` method to more specific `pollAndDispatchMessage()`
1 parent 012a03f commit 938b270

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.Executor;
2121

2222
import org.springframework.core.log.LogAccessor;
23-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2423
import org.springframework.integration.channel.AbstractSubscribableChannel;
2524
import org.springframework.integration.dispatcher.MessageDispatcher;
2625
import org.springframework.integration.dispatcher.UnicastingDispatcher;
@@ -31,6 +30,8 @@
3130
import org.springframework.transaction.PlatformTransactionManager;
3231
import org.springframework.transaction.support.TransactionTemplate;
3332
import org.springframework.util.Assert;
33+
import org.springframework.util.ErrorHandler;
34+
import org.springframework.util.ReflectionUtils;
3435

3536
/**
3637
* An {@link AbstractSubscribableChannel} for receiving push notifications for
@@ -53,6 +54,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
5354

5455
private static final LogAccessor LOGGER = new LogAccessor(PostgresSubscribableChannel.class);
5556

57+
private static final Optional<?> FALLBACK_STUB = Optional.of(new Object());
58+
5659
private final JdbcChannelMessageStore jdbcChannelMessageStore;
5760

5861
private final Object groupId;
@@ -65,7 +68,9 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6568

6669
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
6770

68-
private Executor executor = new SimpleAsyncTaskExecutor();
71+
private ErrorHandler errorHandler = ReflectionUtils::rethrowRuntimeException;
72+
73+
private Executor executor;
6974

7075
private volatile boolean hasHandlers;
7176

@@ -77,6 +82,7 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
7782
*/
7883
public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageStore,
7984
Object groupId, PostgresChannelMessageTableSubscriber messageTableSubscriber) {
85+
8086
Assert.notNull(jdbcChannelMessageStore, "A jdbcChannelMessageStore must be provided.");
8187
Assert.notNull(groupId, "A groupId must be set.");
8288
Assert.notNull(messageTableSubscriber, "A messageTableSubscriber must be set.");
@@ -117,6 +123,17 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
117123
this.retryTemplate = retryTemplate;
118124
}
119125

126+
/**
127+
* Set a {@link ErrorHandler} for messages which cannot be dispatched by this channel.
128+
* Used as a recovery callback after {@link RetryTemplate} execution throws an exception.
129+
* @param errorHandler the {@link ErrorHandler} to use.
130+
* @since 6.0.9
131+
*/
132+
public void setErrorHandler(ErrorHandler errorHandler) {
133+
Assert.notNull(errorHandler, "'errorHandler' must not be null.");
134+
this.errorHandler = errorHandler;
135+
}
136+
120137
@Override
121138
public boolean subscribe(MessageHandler handler) {
122139
boolean subscribed = super.subscribe(handler);
@@ -152,19 +169,29 @@ protected boolean doSend(Message<?> message, long timeout) {
152169
@Override
153170
public void notifyUpdate() {
154171
this.executor.execute(() -> {
172+
Optional<?> dispatchedMessage;
173+
do {
174+
dispatchedMessage = pollAndDispatchMessage();
175+
} while (dispatchedMessage.isPresent());
176+
});
177+
}
178+
179+
private Optional<?> pollAndDispatchMessage() {
180+
try {
181+
return doPollAndDispatchMessage();
182+
}
183+
catch (Exception ex) {
155184
try {
156-
Optional<Message<?>> dispatchedMessage;
157-
do {
158-
dispatchedMessage = askForMessage();
159-
} while (dispatchedMessage.isPresent());
185+
this.errorHandler.handleError(ex);
160186
}
161-
catch (Exception ex) {
187+
catch (Exception ex1) {
162188
LOGGER.error(ex, "Exception during message dispatch");
163189
}
164-
});
190+
return FALLBACK_STUB;
191+
}
165192
}
166193

167-
private Optional<Message<?>> askForMessage() {
194+
private Optional<?> doPollAndDispatchMessage() {
168195
if (this.hasHandlers) {
169196
if (this.transactionTemplate != null) {
170197
return this.retryTemplate.execute(context ->

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import javax.sql.DataSource;
2728

@@ -46,6 +47,7 @@
4647
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
4748
import org.springframework.jdbc.datasource.init.ScriptUtils;
4849
import org.springframework.messaging.MessageHandler;
50+
import org.springframework.messaging.MessagingException;
4951
import org.springframework.messaging.support.GenericMessage;
5052
import org.springframework.retry.support.RetryTemplate;
5153
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -210,6 +212,34 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
210212
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("2");
211213
}
212214

215+
@ParameterizedTest
216+
@ValueSource(booleans = {true, false})
217+
void errorHandlerIsCalled(boolean transactionsEnabled) throws InterruptedException {
218+
if (transactionsEnabled) {
219+
postgresSubscribableChannel.setTransactionManager(transactionManager);
220+
}
221+
222+
AtomicReference<Throwable> exceptionReference = new AtomicReference<>();
223+
CountDownLatch errorHandlerLatch = new CountDownLatch(1);
224+
postgresSubscribableChannel.setErrorHandler(ex -> {
225+
exceptionReference.set(ex);
226+
errorHandlerLatch.countDown();
227+
});
228+
229+
postgresChannelMessageTableSubscriber.start();
230+
231+
postgresSubscribableChannel.subscribe(message -> {
232+
throw new RuntimeException("An error has occurred");
233+
});
234+
235+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
236+
237+
assertThat(errorHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue();
238+
assertThat(exceptionReference.get())
239+
.isInstanceOf(MessagingException.class)
240+
.hasStackTraceContaining("An error has occurred");
241+
}
242+
213243
@ParameterizedTest
214244
@ValueSource(booleans = {true, false})
215245
void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws InterruptedException {
@@ -267,8 +297,7 @@ DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
267297
ResourceDatabasePopulator databasePopulator =
268298
new ResourceDatabasePopulator(new ByteArrayResource(INTEGRATION_DB_SCRIPTS.getBytes()));
269299
databasePopulator.setSeparator(ScriptUtils.EOF_STATEMENT_SEPARATOR);
270-
dataSourceInitializer.setDatabasePopulator(
271-
databasePopulator);
300+
dataSourceInitializer.setDatabasePopulator(databasePopulator);
272301
return dataSourceInitializer;
273302
}
274303

0 commit comments

Comments
 (0)