|
15 | 15 | */
|
16 | 16 | package io.lettuce.core;
|
17 | 17 |
|
18 |
| -import static io.lettuce.core.ClientOptions.DisconnectedBehavior.REJECT_COMMANDS; |
| 18 | +import static io.lettuce.core.ClientOptions.DisconnectedBehavior.*; |
19 | 19 | import static io.lettuce.core.ScriptOutputType.INTEGER;
|
20 |
| -import static org.assertj.core.api.Assertions.assertThat; |
| 20 | +import static org.assertj.core.api.Assertions.*; |
21 | 21 |
|
22 | 22 | import java.time.Duration;
|
23 |
| -import java.util.ArrayList; |
24 | 23 | import java.util.List;
|
25 | 24 | import java.util.concurrent.CountDownLatch;
|
26 | 25 | import java.util.concurrent.TimeUnit;
|
27 | 26 |
|
28 |
| -import javax.enterprise.inject.New; |
29 | 27 | import javax.inject.Inject;
|
30 | 28 |
|
31 | 29 | import org.junit.jupiter.api.BeforeEach;
|
|
35 | 33 | import org.reactivestreams.Subscriber;
|
36 | 34 | import org.reactivestreams.Subscription;
|
37 | 35 |
|
38 |
| -import reactor.core.publisher.Flux; |
39 |
| -import reactor.core.publisher.Mono; |
40 |
| -import reactor.test.StepVerifier; |
41 | 36 | import io.lettuce.core.api.StatefulRedisConnection;
|
42 | 37 | import io.lettuce.core.api.reactive.RedisReactiveCommands;
|
43 | 38 | import io.lettuce.core.api.sync.RedisCommands;
|
|
46 | 41 | import io.lettuce.test.Wait;
|
47 | 42 | import io.lettuce.test.WithPassword;
|
48 | 43 | import io.lettuce.test.condition.EnabledOnCommand;
|
| 44 | +import reactor.core.publisher.Flux; |
| 45 | +import reactor.core.publisher.Mono; |
| 46 | +import reactor.test.StepVerifier; |
49 | 47 |
|
50 | 48 | /**
|
51 | 49 | * @author Mark Paluch
|
@@ -101,67 +99,11 @@ void getStatefulConnection() {
|
101 | 99 | assertThat(reactive.getStatefulConnection()).isSameAs(connection);
|
102 | 100 | }
|
103 | 101 |
|
104 |
| - @Test |
105 |
| - @Inject |
106 |
| - void testCancelCommand(@New StatefulRedisConnection<String, String> connection) { |
107 |
| - |
108 |
| - RedisReactiveCommands<String, String> reactive = connection.reactive(); |
109 |
| - List<Object> result = new ArrayList<>(); |
110 |
| - reactive.clientPause(2000).subscribe(); |
111 |
| - Delay.delay(Duration.ofMillis(50)); |
112 |
| - |
113 |
| - reactive.set(key, value).subscribe(new CompletionSubscriber(result)); |
114 |
| - Delay.delay(Duration.ofMillis(50)); |
115 |
| - |
116 |
| - reactive.reset(); |
117 |
| - assertThat(result).isEmpty(); |
118 |
| - } |
119 |
| - |
120 | 102 | @Test
|
121 | 103 | void testEcho() {
|
122 | 104 | StepVerifier.create(reactive.echo("echo")).expectNext("echo").verifyComplete();
|
123 | 105 | }
|
124 | 106 |
|
125 |
| - @Test |
126 |
| - @Inject |
127 |
| - void testMonoMultiCancel(@New StatefulRedisConnection<String, String> connection) { |
128 |
| - |
129 |
| - RedisReactiveCommands<String, String> reactive = connection.reactive(); |
130 |
| - |
131 |
| - List<Object> result = new ArrayList<>(); |
132 |
| - reactive.clientPause(1000).subscribe(); |
133 |
| - Delay.delay(Duration.ofMillis(50)); |
134 |
| - |
135 |
| - Mono<String> set = reactive.set(key, value); |
136 |
| - set.subscribe(new CompletionSubscriber(result)); |
137 |
| - set.subscribe(new CompletionSubscriber(result)); |
138 |
| - set.subscribe(new CompletionSubscriber(result)); |
139 |
| - Delay.delay(Duration.ofMillis(50)); |
140 |
| - |
141 |
| - reactive.reset(); |
142 |
| - assertThat(result).isEmpty(); |
143 |
| - } |
144 |
| - |
145 |
| - @Test |
146 |
| - @Inject |
147 |
| - void testFluxCancel(@New StatefulRedisConnection<String, String> connection) { |
148 |
| - |
149 |
| - RedisReactiveCommands<String, String> reactive = connection.reactive(); |
150 |
| - |
151 |
| - List<Object> result = new ArrayList<>(); |
152 |
| - reactive.clientPause(1000).subscribe(); |
153 |
| - Delay.delay(Duration.ofMillis(100)); |
154 |
| - |
155 |
| - Flux<KeyValue<String, String>> set = reactive.mget(key, value); |
156 |
| - set.subscribe(new CompletionSubscriber(result)); |
157 |
| - set.subscribe(new CompletionSubscriber(result)); |
158 |
| - set.subscribe(new CompletionSubscriber(result)); |
159 |
| - Delay.delay(Duration.ofMillis(100)); |
160 |
| - |
161 |
| - reactive.reset(); |
162 |
| - assertThat(result).isEmpty(); |
163 |
| - } |
164 |
| - |
165 | 107 | @Test
|
166 | 108 | void multiSubscribe() throws Exception {
|
167 | 109 |
|
|
0 commit comments