Skip to content

Commit 038f8f6

Browse files
authored
GH-1473: Move RabbitFutures to Top Level Classes
- to aid migration from 2.4.x to 3.0.x so that the return types will not change * Increase test coverage. * Fix since. * Add author to new files.
1 parent 819630c commit 038f8f6

File tree

8 files changed

+374
-175
lines changed

8 files changed

+374
-175
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate2.java

Lines changed: 50 additions & 162 deletions
Large diffs are not rendered by default.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ScheduledFuture;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.Function;
22+
23+
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
25+
import org.springframework.core.ParameterizedTypeReference;
26+
27+
/**
28+
* A {@link RabbitFuture} with a return type of the template's
29+
* generic parameter.
30+
* @param <C> the type.
31+
*
32+
* @author Gary Russell
33+
* @since 2.4.7
34+
*/
35+
public class RabbitConverterFuture<C> extends RabbitFuture<C> {
36+
37+
private volatile ParameterizedTypeReference<C> returnType;
38+
39+
RabbitConverterFuture(String correlationId, Message requestMessage,
40+
BiConsumer<String, ChannelHolder> canceler,
41+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
42+
43+
super(correlationId, requestMessage, canceler, timeoutTaskFunction);
44+
}
45+
46+
public ParameterizedTypeReference<C> getReturnType() {
47+
return this.returnType;
48+
}
49+
50+
public void setReturnType(ParameterizedTypeReference<C> returnType) {
51+
this.returnType = returnType;
52+
}
53+
54+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ScheduledFuture;
21+
import java.util.function.BiConsumer;
22+
import java.util.function.Function;
23+
24+
import org.springframework.amqp.core.Message;
25+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
26+
27+
/**
28+
* Base class for {@link CompletableFuture}s returned by {@link AsyncRabbitTemplate2}.
29+
* @param <T> the type.
30+
*
31+
* @author Gary Russell
32+
* @since 2.4.7
33+
*/
34+
public abstract class RabbitFuture<T> extends CompletableFuture<T> {
35+
36+
private final String correlationId;
37+
38+
private final Message requestMessage;
39+
40+
private final BiConsumer<String, ChannelHolder> canceler;
41+
42+
private final Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction;
43+
44+
private ScheduledFuture<?> timeoutTask;
45+
46+
private volatile CompletableFuture<Boolean> confirm;
47+
48+
private String nackCause;
49+
50+
private ChannelHolder channelHolder;
51+
52+
protected RabbitFuture(String correlationId, Message requestMessage, BiConsumer<String, ChannelHolder> canceler,
53+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
54+
55+
this.correlationId = correlationId;
56+
this.requestMessage = requestMessage;
57+
this.canceler = canceler;
58+
this.timeoutTaskFunction = timeoutTaskFunction;
59+
}
60+
61+
void setChannelHolder(ChannelHolder channel) {
62+
this.channelHolder = channel;
63+
}
64+
65+
String getCorrelationId() {
66+
return this.correlationId;
67+
}
68+
69+
ChannelHolder getChannelHolder() {
70+
return this.channelHolder;
71+
}
72+
73+
Message getRequestMessage() {
74+
return this.requestMessage;
75+
}
76+
77+
@Override
78+
public boolean cancel(boolean mayInterruptIfRunning) {
79+
if (this.timeoutTask != null) {
80+
this.timeoutTask.cancel(true);
81+
}
82+
this.canceler.accept(this.correlationId, this.channelHolder);
83+
return super.cancel(mayInterruptIfRunning);
84+
}
85+
86+
/**
87+
* When confirms are enabled contains a {@link CompletableFuture}
88+
* for the confirmation.
89+
* @return the future.
90+
*/
91+
public CompletableFuture<Boolean> getConfirm() {
92+
return this.confirm;
93+
}
94+
95+
void setConfirm(CompletableFuture<Boolean> confirm) {
96+
this.confirm = confirm;
97+
}
98+
99+
/**
100+
* When confirms are enabled and a nack is received, contains
101+
* the cause for the nack, if any.
102+
* @return the cause.
103+
*/
104+
public String getNackCause() {
105+
return this.nackCause;
106+
}
107+
108+
void setNackCause(String nackCause) {
109+
this.nackCause = nackCause;
110+
}
111+
112+
void startTimer() {
113+
this.timeoutTask = this.timeoutTaskFunction.apply(this);
114+
}
115+
116+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ScheduledFuture;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.Function;
22+
23+
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
25+
26+
/**
27+
* A {@link RabbitFuture} with a return type of {@link Message}.
28+
*
29+
* @author Gary Russell
30+
* @since 2.4.7
31+
*/
32+
public class RabbitMessageFuture extends RabbitFuture<Message> {
33+
34+
RabbitMessageFuture(String correlationId, Message requestMessage, BiConsumer<String, ChannelHolder> canceler,
35+
Function<RabbitFuture<?>, ScheduledFuture<?>> timeoutTaskFunction) {
36+
37+
super(correlationId, requestMessage, canceler, timeoutTaskFunction);
38+
}
39+
40+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit;
18+
19+
import java.util.concurrent.ConcurrentMap;
20+
21+
import org.springframework.amqp.core.AmqpReplyTimeoutException;
22+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
23+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
24+
import org.springframework.lang.Nullable;
25+
26+
/**
27+
* A {@link Runnable} used to time out a {@link RabbitFuture}.
28+
*
29+
* @author Gary Russell
30+
* @since 2.4.7
31+
*/
32+
public class TimeoutTask implements Runnable {
33+
34+
private final RabbitFuture<?> future;
35+
36+
private final ConcurrentMap<String, RabbitFuture<?>> pending;
37+
38+
private final DirectReplyToMessageListenerContainer container;
39+
40+
TimeoutTask(RabbitFuture<?> future, ConcurrentMap<String, RabbitFuture<?>> pending,
41+
@Nullable DirectReplyToMessageListenerContainer container) {
42+
43+
this.future = future;
44+
this.pending = pending;
45+
this.container = container;
46+
}
47+
48+
@Override
49+
public void run() {
50+
this.pending.remove(this.future.getCorrelationId());
51+
ChannelHolder holder = this.future.getChannelHolder();
52+
if (holder != null && this.container != null) {
53+
this.container.releaseConsumerFor(holder, false, null); // NOSONAR
54+
}
55+
this.future.completeExceptionally(
56+
new AmqpReplyTimeoutException("Reply timed out", this.future.getRequestMessage()));
57+
}
58+
59+
}

0 commit comments

Comments
 (0)