|
33 | 33 | import java.util.concurrent.TimeUnit;
|
34 | 34 | import java.util.concurrent.atomic.AtomicBoolean;
|
35 | 35 | import java.util.concurrent.atomic.AtomicReference;
|
| 36 | +import java.util.function.Consumer; |
36 | 37 | import java.util.function.Function;
|
37 | 38 |
|
38 | 39 | import org.aopalliance.aop.Advice;
|
|
52 | 53 | import org.springframework.context.annotation.ComponentScan;
|
53 | 54 | import org.springframework.context.annotation.Configuration;
|
54 | 55 | import org.springframework.context.annotation.Scope;
|
| 56 | +import org.springframework.core.task.TaskExecutor; |
55 | 57 | import org.springframework.integration.MessageDispatchingException;
|
56 | 58 | import org.springframework.integration.MessageRejectedException;
|
57 | 59 | import org.springframework.integration.annotation.MessageEndpoint;
|
@@ -491,6 +493,26 @@ public void testPrototypeIsNotOverridden() {
|
491 | 493 | this.flow2WithPrototypeHandlerConsumer.getHandler());
|
492 | 494 | }
|
493 | 495 |
|
| 496 | + @Autowired |
| 497 | + @Qualifier("globalErrorChannelResolutionFunction") |
| 498 | + private Consumer<String> globalErrorChannelResolutionGateway; |
| 499 | + |
| 500 | + @Autowired |
| 501 | + SubscribableChannel errorChannel; |
| 502 | + |
| 503 | + @Test |
| 504 | + public void testGlobalErrorChannelResolutionFlow() throws InterruptedException { |
| 505 | + CountDownLatch errorMessageLatch = new CountDownLatch(1); |
| 506 | + MessageHandler errorMessageHandler = m -> errorMessageLatch.countDown(); |
| 507 | + this.errorChannel.subscribe(errorMessageHandler); |
| 508 | + |
| 509 | + this.globalErrorChannelResolutionGateway.accept("foo"); |
| 510 | + |
| 511 | + assertThat(errorMessageLatch.await(10, TimeUnit.SECONDS)).isTrue(); |
| 512 | + |
| 513 | + this.errorChannel.unsubscribe(errorMessageHandler); |
| 514 | + } |
| 515 | + |
494 | 516 | @MessagingGateway
|
495 | 517 | public interface ControlBusGateway {
|
496 | 518 |
|
@@ -882,6 +904,16 @@ public IntegrationFlow flow2WithPrototypeHandler(
|
882 | 904 | return f -> f.handle(handler, e -> e.id("flow2WithPrototypeHandlerConsumer"));
|
883 | 905 | }
|
884 | 906 |
|
| 907 | + @Bean |
| 908 | + public IntegrationFlow globalErrorChannelResolutionFlow(@Qualifier("taskScheduler") TaskExecutor taskExecutor) { |
| 909 | + return IntegrationFlows.from(Consumer.class, "globalErrorChannelResolutionFunction") |
| 910 | + .channel(c -> c.executor(taskExecutor)) |
| 911 | + .handle((GenericHandler<?>) (p, h) -> { |
| 912 | + throw new RuntimeException("intentional"); |
| 913 | + }) |
| 914 | + .get(); |
| 915 | + } |
| 916 | + |
885 | 917 | }
|
886 | 918 |
|
887 | 919 | @Service
|
|
0 commit comments