|
25 | 25 |
|
26 | 26 | import java.util.Collection;
|
27 | 27 | import java.util.Map;
|
| 28 | +import java.util.Set; |
28 | 29 | import java.util.concurrent.CyclicBarrier;
|
29 | 30 | import java.util.concurrent.TimeUnit;
|
30 | 31 |
|
@@ -110,19 +111,24 @@ public void handleMessage(Message<?> message) throws MessagingException {
|
110 | 111 | assertEquals(0, TestUtils.getPropertyValue(factory, "connectionListener.delegates", Collection.class).size());
|
111 | 112 | }
|
112 | 113 |
|
| 114 | + @SuppressWarnings("unchecked") |
113 | 115 | private void waitForNewConsumer(PublishSubscribeAmqpChannel channel, BlockingQueueConsumer consumer)
|
114 | 116 | throws Exception {
|
115 |
| - BlockingQueueConsumer newConsumer = (BlockingQueueConsumer) TestUtils |
116 |
| - .getPropertyValue(channel, "container.consumers", Map.class).keySet().iterator().next(); |
| 117 | + |
| 118 | + final Object consumersMonitor = TestUtils.getPropertyValue(channel, "container.consumersMonitor"); |
117 | 119 | int n = 0;
|
118 |
| - boolean newConsumerIsConsuming = newConsumer != consumer && TestUtils.getPropertyValue(newConsumer, |
119 |
| - "consumerTags", Map.class).size() > 0; |
120 |
| - while (n++ < 100 && !newConsumerIsConsuming) { |
| 120 | + while (n++ < 100) { |
| 121 | + Set<BlockingQueueConsumer> consumers = TestUtils.getPropertyValue(channel, "container.consumers", Map.class).keySet(); |
| 122 | + synchronized (consumersMonitor) { |
| 123 | + if (!consumers.isEmpty()) { |
| 124 | + BlockingQueueConsumer newConsumer = consumers.iterator().next(); |
| 125 | + if (newConsumer != consumer && TestUtils.getPropertyValue(newConsumer, |
| 126 | + "consumerTags", Map.class).size() > 0) { |
| 127 | + break; |
| 128 | + } |
| 129 | + } |
| 130 | + } |
121 | 131 | Thread.sleep(100);
|
122 |
| - newConsumer = (BlockingQueueConsumer) TestUtils |
123 |
| - .getPropertyValue(channel, "container.consumers", Map.class).keySet().iterator().next(); |
124 |
| - newConsumerIsConsuming = newConsumer != consumer && TestUtils.getPropertyValue(newConsumer, |
125 |
| - "consumerTags", Map.class).size() > 0; |
126 | 132 | }
|
127 | 133 | assertTrue("Failed to restart consumer", n < 100);
|
128 | 134 | }
|
|
0 commit comments