42
42
import org .springframework .integration .support .ErrorMessageStrategy ;
43
43
import org .springframework .integration .support .ErrorMessageUtils ;
44
44
import org .springframework .retry .RecoveryCallback ;
45
- import org .springframework .retry .RetryCallback ;
46
- import org .springframework .retry .RetryContext ;
47
- import org .springframework .retry .RetryListener ;
45
+ import org .springframework .retry .support .RetrySynchronizationManager ;
48
46
import org .springframework .retry .support .RetryTemplate ;
49
47
import org .springframework .util .Assert ;
50
48
import org .springframework .util .StringUtils ;
@@ -190,9 +188,6 @@ protected void onInit() throws Exception {
190
188
+ "send an error message when retries are exhausted" );
191
189
}
192
190
Listener messageListener = new Listener ();
193
- if (this .retryTemplate != null ) {
194
- this .retryTemplate .registerListener (messageListener );
195
- }
196
191
this .messageListenerContainer .setMessageListener (messageListener );
197
192
this .messageListenerContainer .afterPropertiesSet ();
198
193
if (!this .amqpTemplateExplicitlySet ) {
@@ -232,7 +227,9 @@ private void setAttributesIfNecessary(Message amqpMessage, org.springframework.m
232
227
attributesHolder .set (ErrorMessageUtils .getAttributeAccessor (null , null ));
233
228
}
234
229
if (needAttributes ) {
235
- AttributeAccessor attributes = attributesHolder .get ();
230
+ AttributeAccessor attributes = this .retryTemplate != null
231
+ ? RetrySynchronizationManager .getContext ()
232
+ : attributesHolder .get ();
236
233
if (attributes != null ) {
237
234
attributes .setAttribute (ErrorMessageUtils .INPUT_MESSAGE_CONTEXT_KEY , message );
238
235
attributes .setAttribute (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE , amqpMessage );
@@ -251,7 +248,7 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag
251
248
}
252
249
}
253
250
254
- protected class Listener implements ChannelAwareMessageListener , RetryListener {
251
+ protected class Listener implements ChannelAwareMessageListener {
255
252
256
253
@ SuppressWarnings ("unchecked" )
257
254
@ Override
@@ -362,26 +359,6 @@ private void process(Message message, org.springframework.messaging.Message<Obje
362
359
}
363
360
}
364
361
365
- @ Override
366
- public <T , E extends Throwable > boolean open (RetryContext context , RetryCallback <T , E > callback ) {
367
- if (AmqpInboundGateway .this .recoveryCallback != null ) {
368
- attributesHolder .set (context );
369
- }
370
- return true ;
371
- }
372
-
373
- @ Override
374
- public <T , E extends Throwable > void close (RetryContext context , RetryCallback <T , E > callback ,
375
- Throwable throwable ) {
376
- attributesHolder .remove ();
377
- }
378
-
379
- @ Override
380
- public <T , E extends Throwable > void onError (RetryContext context , RetryCallback <T , E > callback ,
381
- Throwable throwable ) {
382
- // Empty
383
- }
384
-
385
362
}
386
363
387
364
}
0 commit comments