Skip to content

Commit f777bba

Browse files
authored
Remove the use of deprecated type DirectProcessor in tests and samples (#41880)
1 parent d0f3d76 commit f777bba

File tree

8 files changed

+88
-106
lines changed

8 files changed

+88
-106
lines changed

sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
1515
import reactor.core.publisher.ConnectableFlux;
16-
import reactor.core.publisher.DirectProcessor;
1716
import reactor.core.publisher.Flux;
18-
import reactor.core.publisher.FluxSink;
1917
import reactor.core.publisher.Mono;
18+
import reactor.core.publisher.Sinks;
2019
import reactor.core.scheduler.Scheduler;
2120
import reactor.core.scheduler.Schedulers;
2221

@@ -298,7 +297,7 @@ class MachineInformation implements AutoCloseable {
298297
private final Logger logger = LoggerFactory.getLogger(MachineInformation.class);
299298
private final AtomicReference<List<Integer>> temperatures = new AtomicReference<>(new ArrayList<>());
300299
private final ConnectableFlux<AverageTemperature> averageTemperatures;
301-
private final DirectProcessor<Boolean> onDispose = DirectProcessor.create();
300+
private final Sinks.One<Boolean> onDispose = Sinks.one();
302301
private final AtomicBoolean isDisposed = new AtomicBoolean();
303302

304303
private volatile Instant lastReported = Instant.EPOCH;
@@ -312,7 +311,7 @@ class MachineInformation implements AutoCloseable {
312311
MachineInformation(String identifier, Duration reportingInterval) {
313312
this.identifier = identifier;
314313
this.averageTemperatures = Flux.interval(reportingInterval)
315-
.takeUntilOther(onDispose)
314+
.takeUntilOther(onDispose.asMono())
316315
.map(unused -> {
317316
final Instant timeCalculated = Instant.now();
318317
final List<Integer> temperaturesInInterval = temperatures.getAndSet(new ArrayList<>());
@@ -381,9 +380,7 @@ public void close() {
381380
return;
382381
}
383382

384-
final FluxSink<Boolean> sink = onDispose.sink();
385-
sink.next(true);
386-
sink.complete();
383+
onDispose.emitValue(true, Sinks.EmitFailureHandler.FAIL_FAST);
387384
}
388385
}
389386

sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PublishStreamOfEvents.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
10-
import reactor.core.publisher.DirectProcessor;
1110
import reactor.core.publisher.Flux;
12-
import reactor.core.publisher.FluxSink;
1311
import reactor.core.publisher.Mono;
12+
import reactor.core.publisher.Sinks;
1413

1514
import java.time.Duration;
1615
import java.util.Objects;
@@ -51,14 +50,14 @@ public static void main(String[] args) {
5150
Duration.ofSeconds(1), batchOptions);
5251

5352
// This represents a stream of events that we want to publish.
54-
final DirectProcessor<EventData> events = DirectProcessor.create();
53+
final Sinks.Many<EventData> events = Sinks.many().multicast().onBackpressureBuffer();
5554

5655
System.out.println("Publishing events...");
57-
publisher.publish(events).subscribe(unused -> System.out.println("Completed."),
56+
publisher.publish(events.asFlux()).subscribe(unused -> System.out.println("Completed."),
5857
error -> System.err.println("Error sending events: " + error),
5958
() -> System.out.println("Completed sending events."));
6059

61-
emitEvents(events.sink());
60+
emitEvents(events);
6261

6362
// The .subscribe() creation and assignment is not a blocking call. For the purpose of this example, we sleep
6463
// the thread so the program does not end before the send operation is complete.
@@ -75,9 +74,9 @@ public static void main(String[] args) {
7574
* Helper function that emits 50 events. The interval between each event is randomly selected between 0 - 250ms and
7675
* is a random substring of the lorem ipsum text.
7776
*
78-
* @param sink Sink for generated events.
77+
* @param events Sink for generated events.
7978
*/
80-
private static void emitEvents(FluxSink<EventData> sink) {
79+
private static void emitEvents(Sinks.Many<EventData> events) {
8180
final String contents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do "
8281
+ "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
8382
+ "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure "
@@ -99,10 +98,10 @@ private static void emitEvents(FluxSink<EventData> sink) {
9998
final EventData event = new EventData(contents.substring(0, endIndex));
10099
event.getProperties().put(EVENT_NUMBER, String.valueOf(i));
101100

102-
sink.next(event);
101+
events.emitNext(event, Sinks.EmitFailureHandler.FAIL_FAST);
103102
}
104103

105-
sink.complete();
104+
events.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
106105
}
107106

108107
/**

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import org.mockito.Mockito;
3232
import org.mockito.MockitoAnnotations;
3333
import reactor.core.Disposable;
34-
import reactor.core.publisher.DirectProcessor;
3534
import reactor.core.publisher.Flux;
3635
import reactor.core.publisher.FluxSink;
3736
import reactor.core.publisher.Mono;
37+
import reactor.core.publisher.Sinks;
3838
import reactor.test.StepVerifier;
3939

4040
import java.nio.charset.StandardCharsets;
@@ -82,11 +82,8 @@ class EventHubPartitionAsyncConsumerTest {
8282

8383
private final EventPosition originalPosition = EventPosition.latest();
8484
private final AtomicReference<Supplier<EventPosition>> currentPosition = new AtomicReference<>(() -> originalPosition);
85-
private final DirectProcessor<AmqpEndpointState> endpointProcessor = DirectProcessor.create();
86-
private final FluxSink<AmqpEndpointState> endpointProcessorSink = endpointProcessor.sink();
87-
88-
private final DirectProcessor<Message> messageProcessor = DirectProcessor.create();
89-
private final FluxSink<Message> messageProcessorSink = messageProcessor.sink();
85+
final Sinks.Many<AmqpEndpointState> endpointStatesSink = Sinks.many().multicast().onBackpressureBuffer();
86+
final Sinks.Many<Message> messagesSink = Sinks.many().multicast().onBackpressureBuffer();
9087

9188
private MessageFluxWrapper linkProcessor;
9289
private EventHubPartitionAsyncConsumer consumer;
@@ -97,8 +94,8 @@ void setup() {
9794

9895
when(retryPolicy.getRetryOptions()).thenReturn(new AmqpRetryOptions());
9996

100-
when(link1.getEndpointStates()).thenReturn(endpointProcessor);
101-
when(link1.receive()).thenReturn(messageProcessor);
97+
when(link1.getEndpointStates()).thenReturn(endpointStatesSink.asFlux());
98+
when(link1.receive()).thenReturn(messagesSink.asFlux());
10299
when(link1.addCredits(anyInt())).thenReturn(Mono.empty());
103100

104101
when(link2.addCredits(anyInt())).thenReturn(Mono.empty());
@@ -139,9 +136,9 @@ void receivesMessages(boolean trackLastEnqueuedProperties) {
139136
// Act & Assert
140137
StepVerifier.create(consumer.receive())
141138
.then(() -> {
142-
endpointProcessorSink.next(AmqpEndpointState.ACTIVE);
143-
messageProcessorSink.next(message1);
144-
messageProcessorSink.next(message2);
139+
endpointStatesSink.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
140+
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
141+
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
145142
})
146143
.assertNext(partitionEvent -> {
147144
verifyPartitionContext(partitionEvent.getPartitionContext());
@@ -195,8 +192,8 @@ void receiveMultipleTimes() {
195192
// Act & Assert
196193
StepVerifier.create(consumer.receive())
197194
.then(() -> {
198-
messageProcessorSink.next(message1);
199-
messageProcessorSink.next(message2);
195+
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
196+
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
200197
})
201198
.assertNext(partitionEvent -> {
202199
verifyPartitionContext(partitionEvent.getPartitionContext());
@@ -269,9 +266,9 @@ void listensToShutdownSignals() throws InterruptedException {
269266
});
270267

271268
// Act
272-
messageProcessorSink.next(message1);
273-
messageProcessorSink.next(message2);
274-
messageProcessorSink.next(message3);
269+
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
270+
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
271+
messagesSink.emitNext(message3, Sinks.EmitFailureHandler.FAIL_FAST);
275272

276273
linkProcessor.cancel();
277274

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,9 @@
5555
import org.mockito.Mock;
5656
import org.mockito.Mockito;
5757
import org.mockito.MockitoAnnotations;
58-
import reactor.core.publisher.DirectProcessor;
5958
import reactor.core.publisher.Flux;
60-
import reactor.core.publisher.FluxSink;
6159
import reactor.core.publisher.Mono;
60+
import reactor.core.publisher.Sinks;
6261
import reactor.core.scheduler.Scheduler;
6362
import reactor.core.scheduler.Schedulers;
6463
import reactor.test.StepVerifier;
@@ -146,8 +145,7 @@ class EventHubProducerAsyncClientTest {
146145
.setDelay(Duration.ofMillis(500))
147146
.setMode(AmqpRetryMode.FIXED)
148147
.setTryTimeout(Duration.ofSeconds(10));
149-
private final DirectProcessor<AmqpEndpointState> endpointProcessor = DirectProcessor.create();
150-
private final FluxSink<AmqpEndpointState> endpointSink = endpointProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
148+
private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
151149
private EventHubProducerAsyncClient producer;
152150
private ConnectionCacheWrapper connectionProcessor;
153151
private ConnectionOptions connectionOptions;
@@ -163,8 +161,8 @@ void setup(TestInfo testInfo) {
163161
CLIENT_OPTIONS, SslDomain.VerifyMode.VERIFY_PEER_NAME,
164162
"client-product", "client-version");
165163

166-
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
167-
endpointSink.next(AmqpEndpointState.ACTIVE);
164+
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
165+
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
168166

169167
when(connection.closeAsync()).thenReturn(Mono.empty());
170168

@@ -1319,8 +1317,8 @@ void closesDedicatedConnectionOnlyOnce() {
13191317
@Test
13201318
void reopensOnFailure() {
13211319
// Arrange
1322-
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
1323-
endpointSink.next(AmqpEndpointState.ACTIVE);
1320+
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
1321+
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
13241322

13251323
EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{
13261324
connection, connection2, connection3
@@ -1342,14 +1340,14 @@ void reopensOnFailure() {
13421340
.thenReturn(Mono.just(sendLink));
13431341
when(sendLink.send(anyList())).thenReturn(Mono.empty());
13441342

1345-
final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
1346-
when(connection2.getEndpointStates()).thenReturn(connectionState2);
1343+
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
1344+
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
13471345
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
13481346
.thenReturn(Mono.just(sendLink2));
13491347
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());
13501348

1351-
final DirectProcessor<AmqpEndpointState> connectionState3 = DirectProcessor.create();
1352-
when(connection3.getEndpointStates()).thenReturn(connectionState3);
1349+
final Sinks.Many<AmqpEndpointState> connectionState3 = Sinks.many().multicast().onBackpressureBuffer();
1350+
when(connection3.getEndpointStates()).thenReturn(connectionState3.asFlux());
13531351
when(connection3.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
13541352
.thenReturn(Mono.just(sendLink3));
13551353
when(sendLink3.send(anyList())).thenReturn(Mono.empty());
@@ -1360,8 +1358,8 @@ void reopensOnFailure() {
13601358
.verify(DEFAULT_TIMEOUT);
13611359

13621360
// Send in an error signal like a server busy condition.
1363-
endpointSink.error(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
1364-
new AmqpErrorContext("test-namespace")));
1361+
endpointStates.emitError(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
1362+
new AmqpErrorContext("test-namespace")), Sinks.EmitFailureHandler.FAIL_FAST);
13651363

13661364
StepVerifier.create(producer.send(testData2))
13671365
.expectComplete()
@@ -1385,8 +1383,8 @@ void reopensOnFailure() {
13851383
@Test
13861384
void closesOnNonTransientFailure() {
13871385
// Arrange
1388-
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
1389-
endpointSink.next(AmqpEndpointState.ACTIVE);
1386+
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
1387+
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
13901388

13911389
EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{
13921390
connection, connection2, connection3
@@ -1408,8 +1406,8 @@ void closesOnNonTransientFailure() {
14081406
.thenReturn(Mono.just(sendLink));
14091407
when(sendLink.send(anyList())).thenReturn(Mono.empty());
14101408

1411-
final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
1412-
when(connection2.getEndpointStates()).thenReturn(connectionState2);
1409+
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
1410+
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
14131411
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
14141412
.thenReturn(Mono.just(sendLink2));
14151413
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());
@@ -1423,7 +1421,7 @@ void closesOnNonTransientFailure() {
14231421
.verify(DEFAULT_TIMEOUT);
14241422

14251423
// Send in an error signal like authorization failure.
1426-
endpointSink.error(nonTransientError);
1424+
endpointStates.emitError(nonTransientError, Sinks.EmitFailureHandler.FAIL_FAST);
14271425

14281426
StepVerifier.create(producer.send(testData2))
14291427
.expectErrorSatisfies(error -> {
@@ -1453,8 +1451,8 @@ void closesOnNonTransientFailure() {
14531451
@Test
14541452
void resendMessageOnTransientLinkFailure() {
14551453
// Arrange
1456-
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
1457-
endpointSink.next(AmqpEndpointState.ACTIVE);
1454+
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
1455+
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
14581456

14591457
EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{connection, connection2};
14601458
connectionProcessor = createConnectionProcessor(connections, connectionOptions.getRetry(), false);
@@ -1484,12 +1482,12 @@ void resendMessageOnTransientLinkFailure() {
14841482
final Throwable error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
14851483
new AmqpErrorContext("test-namespace"));
14861484

1487-
endpointSink.error(error);
1485+
endpointStates.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST);
14881486
return Mono.error(error);
14891487
});
14901488

1491-
final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
1492-
when(connection2.getEndpointStates()).thenReturn(connectionState2);
1489+
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
1490+
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
14931491
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
14941492
.thenReturn(Mono.just(sendLink2));
14951493
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());

0 commit comments

Comments
 (0)