Skip to content

Commit 1f04149

Browse files
authored
ServiceBusSessionReactorReceiver: Replace the use of DirectProcessor with Sinks. (#43764)
* ServiceBusSessionReactorReceiver: Replace the use of DirectProcessor with Sinks. * Adding tests for ServiceBusSessionReactorReceiver
1 parent 3e41aba commit 1f04149

File tree

2 files changed

+183
-13
lines changed

2 files changed

+183
-13
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReactorReceiver.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import org.apache.qpid.proton.message.Message;
1212
import reactor.core.Disposable;
1313
import reactor.core.Disposables;
14-
import reactor.core.publisher.DirectProcessor;
1514
import reactor.core.publisher.Flux;
16-
import reactor.core.publisher.FluxSink;
1715
import reactor.core.publisher.Mono;
1816
import reactor.core.publisher.Sinks;
1917

@@ -29,10 +27,8 @@ final class ServiceBusSessionReactorReceiver implements AmqpReceiveLink {
2927
private final String sessionId;
3028
private final AmqpReceiveLink sessionLink;
3129
private final boolean hasIdleTimeout;
32-
private final Sinks.Empty<Void> idleTimeoutSink = Sinks.empty();
33-
// TODO (anu|connie|liudmila); Discuss DirectProcessor is deprecated.
34-
private final DirectProcessor<Boolean> idleTimerProcessor = DirectProcessor.create();
35-
private final FluxSink<Boolean> idleTimerSink = idleTimerProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
30+
private final Sinks.Many<Boolean> nextItemIdleTimeoutSink = Sinks.many().multicast().onBackpressureBuffer();
31+
private final Sinks.Empty<Void> terminateEndpointStatesSink = Sinks.empty();
3632
private final Disposable.Composite disposables = Disposables.composite();
3733

3834
ServiceBusSessionReactorReceiver(ClientLogger logger, ServiceBusTracer tracer,
@@ -43,11 +39,12 @@ final class ServiceBusSessionReactorReceiver implements AmqpReceiveLink {
4339
this.hasIdleTimeout = sessionIdleTimeout != null;
4440
if (hasIdleTimeout) {
4541
this.disposables
46-
.add(Flux.switchOnNext(idleTimerProcessor.map(__ -> Mono.delay(sessionIdleTimeout))).subscribe(v -> {
47-
withLinkInfo(logger.atInfo()).addKeyValue("timeout", sessionIdleTimeout)
48-
.log("Did not a receive message within timeout.");
49-
idleTimeoutSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
50-
}));
42+
.add(Flux.switchOnNext(nextItemIdleTimeoutSink.asFlux().map(__ -> Mono.delay(sessionIdleTimeout)))
43+
.subscribe(v -> {
44+
withLinkInfo(logger.atInfo()).addKeyValue("timeout", sessionIdleTimeout)
45+
.log("Did not a receive message within timeout.");
46+
terminateEndpointStatesSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
47+
}));
5148
}
5249
this.disposables.add(session.beginLockRenew(tracer, maxSessionLockRenew));
5350
}
@@ -80,7 +77,7 @@ public String getEntityPath() {
8077
public Flux<AmqpEndpointState> getEndpointStates() {
8178
final Flux<AmqpEndpointState> endpointStates;
8279
if (hasIdleTimeout) {
83-
endpointStates = sessionLink.getEndpointStates().takeUntilOther(idleTimeoutSink.asMono());
80+
endpointStates = sessionLink.getEndpointStates().takeUntilOther(terminateEndpointStatesSink.asMono());
8481
} else {
8582
endpointStates = sessionLink.getEndpointStates();
8683
}
@@ -94,7 +91,7 @@ public Flux<AmqpEndpointState> getEndpointStates() {
9491
public Flux<Message> receive() {
9592
if (hasIdleTimeout) {
9693
return sessionLink.receive().doOnNext(m -> {
97-
idleTimerSink.next(true);
94+
nextItemIdleTimeoutSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
9895
});
9996
} else {
10097
return sessionLink.receive();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.servicebus;
5+
6+
import com.azure.core.amqp.AmqpEndpointState;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
9+
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
10+
import org.apache.qpid.proton.message.Message;
11+
import org.junit.jupiter.api.AfterEach;
12+
import org.junit.jupiter.api.Assertions;
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.mockito.Mock;
16+
import org.mockito.Mockito;
17+
import org.mockito.MockitoAnnotations;
18+
import reactor.core.Disposable;
19+
import reactor.core.Disposables;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
22+
import reactor.core.scheduler.Schedulers;
23+
import reactor.test.StepVerifier;
24+
import reactor.test.publisher.TestPublisher;
25+
26+
import java.time.Duration;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.ArgumentMatchers.anyInt;
31+
import static org.mockito.Mockito.doNothing;
32+
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
35+
36+
/**
37+
* Tests {@link ServiceBusSessionReactorReceiver}.
38+
*/
39+
public class ServiceBusSessionReactorReceiverTest {
40+
private static final String NAMESPACE = "contoso.servicebus.windows.net";
41+
private static final String ENTITY_PATH = "queue0";
42+
private static final String SESSION_ID = "1";
43+
private static final String LINK_NAME = "queue0-session-1";
44+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiverTest.class);
45+
46+
private final TestPublisher<AmqpEndpointState> endpointStates = TestPublisher.createCold();
47+
private final TestPublisher<Message> messagePublisher = TestPublisher.createCold();
48+
private AutoCloseable autoCloseable;
49+
50+
@Mock
51+
private ServiceBusReceiveLink sessionLink;
52+
53+
@BeforeEach
54+
public void beforeEach() {
55+
autoCloseable = MockitoAnnotations.openMocks(this);
56+
when(sessionLink.receive()).thenReturn(messagePublisher.flux().publishOn(Schedulers.single()));
57+
when(sessionLink.getHostname()).thenReturn(NAMESPACE);
58+
when(sessionLink.getEntityPath()).thenReturn(ENTITY_PATH);
59+
when(sessionLink.getLinkName()).thenReturn(LINK_NAME);
60+
when(sessionLink.getEndpointStates()).thenReturn(endpointStates.flux());
61+
when(sessionLink.addCredits(anyInt())).thenReturn(Mono.empty());
62+
when(sessionLink.closeAsync()).thenReturn(Mono.empty());
63+
endpointStates.next(AmqpEndpointState.ACTIVE);
64+
}
65+
66+
@AfterEach
67+
public void afterEach() throws Exception {
68+
if (autoCloseable != null) {
69+
autoCloseable.close();
70+
}
71+
Mockito.framework().clearInlineMock(this);
72+
}
73+
74+
@Test
75+
public void properties() {
76+
// Arrange
77+
final Disposable lockRenewDisposable = Disposables.single();
78+
doNothing().when(sessionLink).dispose();
79+
final ServiceBusSessionAcquirer.Session session = mock(ServiceBusSessionAcquirer.Session.class);
80+
when(session.getId()).thenReturn(SESSION_ID);
81+
when(session.getLink()).thenReturn(sessionLink);
82+
when(session.beginLockRenew(any(ServiceBusTracer.class), any(Duration.class))).thenReturn(lockRenewDisposable);
83+
final ServiceBusSessionReactorReceiver receiver = new ServiceBusSessionReactorReceiver(LOGGER,
84+
mock(ServiceBusTracer.class), session, null, Duration.ofSeconds(1));
85+
86+
// Act and assert
87+
try {
88+
Assertions.assertEquals(SESSION_ID, receiver.getSessionId());
89+
Assertions.assertEquals(NAMESPACE, receiver.getHostname());
90+
Assertions.assertEquals(LINK_NAME, receiver.getLinkName());
91+
Assertions.assertEquals(ENTITY_PATH, receiver.getEntityPath());
92+
} finally {
93+
receiver.dispose();
94+
}
95+
}
96+
97+
@Test
98+
public void disposeResources() {
99+
// Arrange
100+
final Disposable lockRenewDisposable = Disposables.single();
101+
doNothing().when(sessionLink).dispose();
102+
final ServiceBusSessionAcquirer.Session session = mock(ServiceBusSessionAcquirer.Session.class);
103+
when(session.getId()).thenReturn(SESSION_ID);
104+
when(session.getLink()).thenReturn(sessionLink);
105+
when(session.beginLockRenew(any(ServiceBusTracer.class), any(Duration.class))).thenReturn(lockRenewDisposable);
106+
final ServiceBusSessionReactorReceiver receiver = new ServiceBusSessionReactorReceiver(LOGGER,
107+
mock(ServiceBusTracer.class), session, null, Duration.ofSeconds(1));
108+
109+
// Act
110+
receiver.dispose();
111+
112+
// Assert
113+
verify(sessionLink).dispose();
114+
Assertions.assertTrue(lockRenewDisposable.isDisposed());
115+
}
116+
117+
@Test
118+
public void receivesThenCompletes() {
119+
// Arrange
120+
doNothing().when(sessionLink).dispose();
121+
final ServiceBusSessionAcquirer.Session session = mock(ServiceBusSessionAcquirer.Session.class);
122+
when(session.getId()).thenReturn(SESSION_ID);
123+
when(session.getLink()).thenReturn(sessionLink);
124+
when(session.beginLockRenew(any(ServiceBusTracer.class), any(Duration.class))).thenReturn(Disposables.single());
125+
final Message message0 = mock(Message.class);
126+
final Message message1 = mock(Message.class);
127+
final ServiceBusSessionReactorReceiver receiver = new ServiceBusSessionReactorReceiver(LOGGER,
128+
mock(ServiceBusTracer.class), session, null, Duration.ofSeconds(1));
129+
130+
// Act and assert
131+
try {
132+
final Flux<Message> messages = receiver.receive();
133+
StepVerifier.create(messages)
134+
.then(() -> messagePublisher.next(message0, message1))
135+
.then(messagePublisher::complete)
136+
.assertNext(m -> assertEquals(message0, m))
137+
.assertNext(m -> assertEquals(message1, m))
138+
.expectComplete()
139+
.verify(Duration.ofSeconds(5));
140+
} finally {
141+
receiver.dispose();
142+
}
143+
}
144+
145+
@Test
146+
public void completesOnIdleTimeout() {
147+
// Arrange
148+
final Duration idleTimeout = Duration.ofSeconds(3);
149+
doNothing().when(sessionLink).dispose();
150+
final ServiceBusSessionAcquirer.Session session = mock(ServiceBusSessionAcquirer.Session.class);
151+
when(session.getId()).thenReturn(SESSION_ID);
152+
when(session.getLink()).thenReturn(sessionLink);
153+
when(session.beginLockRenew(any(ServiceBusTracer.class), any(Duration.class))).thenReturn(Disposables.single());
154+
final Message message0 = mock(Message.class);
155+
final Message message1 = mock(Message.class);
156+
final ServiceBusSessionReactorReceiver receiver = new ServiceBusSessionReactorReceiver(LOGGER,
157+
mock(ServiceBusTracer.class), session, idleTimeout, Duration.ofSeconds(1));
158+
159+
// Act and assert
160+
try {
161+
final Flux<Message> messages = receiver.receive();
162+
final Flux<AmqpEndpointState> states = receiver.getEndpointStates();
163+
StepVerifier.create(messages.takeUntilOther(states.then()))
164+
.then(() -> messagePublisher.next(message0, message1))
165+
.assertNext(m -> assertEquals(message0, m))
166+
.assertNext(m -> assertEquals(message1, m))
167+
.expectComplete()
168+
.verify(Duration.ofSeconds(15));
169+
} finally {
170+
receiver.dispose();
171+
}
172+
}
173+
}

0 commit comments

Comments
 (0)