Skip to content

Commit 97644e9

Browse files
committed
Improve Stream Template Test Coverage
1 parent 8c37c2d commit 97644e9

File tree

1 file changed

+117
-0
lines changed

1 file changed

+117
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.producer;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.mock;
25+
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
33+
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
34+
35+
import com.rabbitmq.stream.ConfirmationHandler;
36+
import com.rabbitmq.stream.ConfirmationStatus;
37+
import com.rabbitmq.stream.Constants;
38+
import com.rabbitmq.stream.Environment;
39+
import com.rabbitmq.stream.Message;
40+
import com.rabbitmq.stream.Producer;
41+
import com.rabbitmq.stream.ProducerBuilder;
42+
43+
/**
44+
* @author Gary Russell
45+
* @since 2.4.7
46+
*
47+
*/
48+
public class RabbitStreamTemplateTests {
49+
50+
@Test
51+
void handleConfirm() throws InterruptedException, ExecutionException {
52+
Environment env = mock(Environment.class);
53+
ProducerBuilder pb = mock(ProducerBuilder.class);
54+
given(env.producerBuilder()).willReturn(pb);
55+
Producer producer = mock(Producer.class);
56+
given(pb.build()).willReturn(producer);
57+
AtomicInteger which = new AtomicInteger();
58+
willAnswer(inv -> {
59+
ConfirmationHandler handler = inv.getArgument(1);
60+
ConfirmationStatus status = null;
61+
switch (which.getAndIncrement()) {
62+
case 0:
63+
status = new ConfirmationStatus(inv.getArgument(0), true, (short) 0);
64+
break;
65+
case 1:
66+
status = new ConfirmationStatus(inv.getArgument(0), false, Constants.CODE_MESSAGE_ENQUEUEING_FAILED);
67+
break;
68+
case 2:
69+
status = new ConfirmationStatus(inv.getArgument(0), false, Constants.CODE_PRODUCER_CLOSED);
70+
break;
71+
case 3:
72+
status = new ConfirmationStatus(inv.getArgument(0), false, Constants.CODE_PRODUCER_NOT_AVAILABLE);
73+
break;
74+
case 4:
75+
status = new ConfirmationStatus(inv.getArgument(0), false, Constants.CODE_PUBLISH_CONFIRM_TIMEOUT);
76+
break;
77+
case 5:
78+
status = new ConfirmationStatus(inv.getArgument(0), false, (short) -1);
79+
break;
80+
}
81+
handler.handle(status);
82+
return null;
83+
}).given(producer).send(any(), any());
84+
try (RabbitStreamTemplate template = new RabbitStreamTemplate(env, "foo")) {
85+
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
86+
template.setMessageConverter(messageConverter);
87+
assertThat(template.messageConverter()).isSameAs(messageConverter);
88+
StreamMessageConverter converter = mock(StreamMessageConverter.class);
89+
given(converter.fromMessage(any())).willReturn(mock(Message.class));
90+
template.setStreamConverter(converter);
91+
assertThat(template.streamMessageConverter()).isSameAs(converter);
92+
CompletableFuture<Boolean> future = template.convertAndSend("foo");
93+
assertThat(future.get()).isTrue();
94+
CompletableFuture<Boolean> future1 = template.convertAndSend("foo");
95+
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future1.get())
96+
.withCauseExactlyInstanceOf(StreamSendException.class)
97+
.withStackTraceContaining("Message Enqueueing Failed");
98+
CompletableFuture<Boolean> future2 = template.convertAndSend("foo");
99+
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future2.get())
100+
.withCauseExactlyInstanceOf(StreamSendException.class)
101+
.withStackTraceContaining("Producer Closed");
102+
CompletableFuture<Boolean> future3 = template.convertAndSend("foo");
103+
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future3.get())
104+
.withCauseExactlyInstanceOf(StreamSendException.class)
105+
.withStackTraceContaining("Producer Not Available");
106+
CompletableFuture<Boolean> future4 = template.convertAndSend("foo");
107+
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future4.get())
108+
.withCauseExactlyInstanceOf(StreamSendException.class)
109+
.withStackTraceContaining("Publish Confirm Timeout");
110+
CompletableFuture<Boolean> future5 = template.convertAndSend("foo");
111+
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future5.get())
112+
.withCauseExactlyInstanceOf(StreamSendException.class)
113+
.withStackTraceContaining("Unknown code: " + -1);
114+
}
115+
}
116+
117+
}

0 commit comments

Comments
 (0)