Skip to content

Commit 3113b69

Browse files
Artem Bilangaryrussell
authored andcommitted
INT-651: Add Iterator Support for Splitter
JIRA: https://jira.spring.io/browse/INT-651 INT-651: Polishing according PR comments INT-651: Polishing #2 Doc Polishing
1 parent dae6343 commit 3113b69

File tree

9 files changed

+459
-39
lines changed

9 files changed

+459
-39
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ else if (result != null) {
200200
}
201201
}
202202

203-
private void produceReply(Object reply, MessageHeaders requestHeaders) {
203+
protected void produceReply(Object reply, MessageHeaders requestHeaders) {
204204
Message<?> replyMessage = this.createReplyMessage(reply, requestHeaders);
205205
this.sendReplyMessage(replyMessage, requestHeaders.getReplyChannel());
206206
}

spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,73 +16,94 @@
1616

1717
package org.springframework.integration.splitter;
1818

19-
import java.util.ArrayList;
19+
import java.util.Arrays;
2020
import java.util.Collection;
21-
import java.util.List;
21+
import java.util.Collections;
22+
import java.util.Iterator;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import reactor.function.Function;
2226

2327
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2428
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
29+
import org.springframework.integration.util.FunctionIterator;
2530
import org.springframework.messaging.Message;
2631
import org.springframework.messaging.MessageHeaders;
27-
import org.springframework.util.CollectionUtils;
28-
import org.springframework.util.ObjectUtils;
2932

3033
/**
3134
* Base class for Message-splitting handlers.
3235
*
3336
* @author Mark Fisher
3437
* @author Dave Syer
38+
* @author Artem Bilan
3539
*/
3640
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
3741

3842
private boolean applySequence = true;
3943

4044
/**
4145
* Set the applySequence flag to the specified value. Defaults to true.
42-
*
4346
* @param applySequence true to apply sequence information.
4447
*/
4548
public void setApplySequence(boolean applySequence) {
4649
this.applySequence = applySequence;
4750
}
4851

49-
@SuppressWarnings("rawtypes")
5052
@Override
53+
@SuppressWarnings("unchecked")
5154
protected final Object handleRequestMessage(Message<?> message) {
5255
Object result = this.splitMessage(message);
53-
// return null if 'null', empty Collection or empty Array
54-
if (result == null || (result instanceof Collection && CollectionUtils.isEmpty((Collection) result))
55-
|| (result.getClass().isArray() && ObjectUtils.isEmpty((Object[]) result))) {
56+
// return null if 'null'
57+
if (result == null) {
5658
return null;
5759
}
58-
MessageHeaders headers = message.getHeaders();
59-
Object correlationId = headers.getId();
60-
List<AbstractIntegrationMessageBuilder<?>> messageBuilders = new ArrayList<AbstractIntegrationMessageBuilder<?>>();
60+
61+
Iterator<Object> iterator;
62+
final int sequenceSize;
6163
if (result instanceof Collection) {
62-
Collection<?> items = (Collection<?>) result;
63-
int sequenceNumber = 0;
64-
int sequenceSize = items.size();
65-
for (Object item : items) {
66-
messageBuilders.add(this.createBuilder(item, headers, correlationId, ++sequenceNumber, sequenceSize));
67-
}
64+
Collection<Object> items = (Collection<Object>) result;
65+
sequenceSize = items.size();
66+
iterator = items.iterator();
6867
}
6968
else if (result.getClass().isArray()) {
7069
Object[] items = (Object[]) result;
71-
int sequenceNumber = 0;
72-
int sequenceSize = items.length;
73-
for (Object item : items) {
74-
messageBuilders.add(this.createBuilder(item, headers, correlationId, ++sequenceNumber, sequenceSize));
75-
}
70+
sequenceSize = items.length;
71+
iterator = Arrays.asList(items).iterator();
72+
}
73+
else if (result instanceof Iterable<?>) {
74+
sequenceSize = 0;
75+
iterator = ((Iterable<Object>) result).iterator();
76+
}
77+
else if (result instanceof Iterator<?>) {
78+
sequenceSize = 0;
79+
iterator = (Iterator<Object>) result;
7680
}
7781
else {
78-
messageBuilders.add(this.createBuilder(result, headers, correlationId, 1, 1));
82+
sequenceSize = 1;
83+
iterator = Collections.singleton(result).iterator();
84+
}
85+
86+
if (!iterator.hasNext()) {
87+
return null;
7988
}
80-
return messageBuilders;
89+
90+
final MessageHeaders headers = message.getHeaders();
91+
final Object correlationId = headers.getId();
92+
final AtomicInteger sequenceNumber = new AtomicInteger(1);
93+
94+
return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
95+
new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
96+
@Override
97+
public AbstractIntegrationMessageBuilder<?> apply(Object object) {
98+
return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
99+
sequenceSize);
100+
}
101+
});
81102
}
82103

83104
@SuppressWarnings( { "unchecked", "rawtypes" })
84-
private AbstractIntegrationMessageBuilder createBuilder(Object item, MessageHeaders headers, Object correlationId, int sequenceNumber,
85-
int sequenceSize) {
105+
private AbstractIntegrationMessageBuilder createBuilder(Object item, MessageHeaders headers, Object correlationId,
106+
int sequenceNumber, int sequenceSize) {
86107
AbstractIntegrationMessageBuilder builder;
87108
if (item instanceof Message) {
88109
builder = this.getMessageBuilderFactory().fromMessage((Message) item);
@@ -97,6 +118,15 @@ private AbstractIntegrationMessageBuilder createBuilder(Object item, MessageHead
97118
return builder;
98119
}
99120

121+
@Override
122+
protected void produceReply(Object result, MessageHeaders requestHeaders) {
123+
Iterator<?> iterator = (Iterator<?>) result;
124+
while (iterator.hasNext()) {
125+
super.produceReply(iterator.next(), requestHeaders);
126+
127+
}
128+
}
129+
100130
@Override
101131
public String getComponentType() {
102132
return "splitter";
@@ -107,7 +137,6 @@ public String getComponentType() {
107137
* Array. The individual elements may be Messages, but it is not necessary. If the elements are not Messages, each
108138
* will be provided as the payload of a Message. It is also acceptable to return a single Object or Message. In that
109139
* case, a single reply Message will be produced.
110-
*
111140
* @param message The message.
112141
* @return The result of splitting the message.
113142
*/
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
import java.util.Iterator;
20+
import java.util.NoSuchElementException;
21+
22+
import reactor.function.Function;
23+
24+
/**
25+
* An {@link Iterator} implementation to convert each item from the target
26+
* {@link #iterator} to a new object applying the {@link #function} on {@link #next()}.
27+
*
28+
* @author Artem Bilan
29+
* @since 4.1
30+
*/
31+
public final class FunctionIterator<T, V> implements Iterator<V> {
32+
33+
private final Iterator<T> iterator;
34+
35+
private final Function<? super T, ? extends V> function;
36+
37+
public FunctionIterator(Iterable<T> iterable, Function<? super T, ? extends V> function) {
38+
this(iterable.iterator(), function);
39+
}
40+
41+
public FunctionIterator(Iterator<T> newIterator, Function<? super T, ? extends V> function) {
42+
this.iterator = newIterator;
43+
this.function = function;
44+
}
45+
46+
@Override
47+
public void remove() {
48+
throw new UnsupportedOperationException("Cannot remove from a collect iterator");
49+
}
50+
51+
@Override
52+
public boolean hasNext() {
53+
return this.iterator.hasNext();
54+
}
55+
56+
@Override
57+
public V next() {
58+
if (this.hasNext()) {
59+
return this.function.apply(this.iterator.next());
60+
}
61+
throw new NoSuchElementException();
62+
}
63+
64+
}
65+

spring-integration-core/src/test/java/org/springframework/integration/splitter/SpelSplitterIntegrationTests-context.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
<splitter input-channel="beanResolvingInput" expression="@testBean.split(payload)" output-channel="output"/>
1919

20+
<splitter input-channel="iteratorInput" ref="testBean" method="splitIterator" output-channel="output"/>
21+
2022
<beans:bean id="testBean" class="org.springframework.integration.splitter.SpelSplitterIntegrationTests$TestBean"/>
2123

2224
</beans:beans>

spring-integration-core/src/test/java/org/springframework/integration/splitter/SpelSplitterIntegrationTests.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2010 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,26 +16,29 @@
1616

1717
package org.springframework.integration.splitter;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertNotNull;
21-
import static org.junit.Assert.assertNull;
19+
import static org.junit.Assert.*;
2220

2321
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.Iterator;
2424
import java.util.List;
2525

2626
import org.junit.Test;
2727
import org.junit.runner.RunWith;
2828

2929
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3031
import org.springframework.integration.support.MessageBuilder;
3132
import org.springframework.messaging.Message;
3233
import org.springframework.messaging.MessageChannel;
3334
import org.springframework.messaging.PollableChannel;
35+
import org.springframework.messaging.support.GenericMessage;
3436
import org.springframework.test.context.ContextConfiguration;
3537
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
3638

3739
/**
3840
* @author Mark Fisher
41+
* @author Artem Bilan
3942
*/
4043
@ContextConfiguration
4144
@RunWith(SpringJUnit4ClassRunner.class)
@@ -50,6 +53,9 @@ public class SpelSplitterIntegrationTests {
5053
@Autowired
5154
private MessageChannel beanResolvingInput;
5255

56+
@Autowired
57+
private MessageChannel iteratorInput;
58+
5359
@Autowired
5460
private PollableChannel output;
5561

@@ -99,6 +105,28 @@ public void beanResolving() {
99105
assertNull(output.receive(0));
100106
}
101107

108+
@Test
109+
public void iteratorSplitter() {
110+
this.iteratorInput.send(new GenericMessage<String>("a,b,c,d"));
111+
Message<?> a = output.receive(0);
112+
Message<?> b = output.receive(0);
113+
Message<?> c = output.receive(0);
114+
Message<?> d = output.receive(0);
115+
assertEquals("a", a.getPayload());
116+
assertEquals(new Integer(1), new IntegrationMessageHeaderAccessor(a).getSequenceNumber());
117+
assertEquals(new Integer(0), new IntegrationMessageHeaderAccessor(a).getSequenceSize());
118+
assertEquals("b", b.getPayload());
119+
assertEquals(new Integer(2), new IntegrationMessageHeaderAccessor(b).getSequenceNumber());
120+
assertEquals(new Integer(0), new IntegrationMessageHeaderAccessor(b).getSequenceSize());
121+
assertEquals("c", c.getPayload());
122+
assertEquals(new Integer(3), new IntegrationMessageHeaderAccessor(c).getSequenceNumber());
123+
assertEquals(new Integer(0), new IntegrationMessageHeaderAccessor(c).getSequenceSize());
124+
assertEquals("d", d.getPayload());
125+
assertEquals(new Integer(4), new IntegrationMessageHeaderAccessor(d).getSequenceNumber());
126+
assertEquals(new Integer(0), new IntegrationMessageHeaderAccessor(d).getSequenceSize());
127+
assertNull(output.receive(0));
128+
}
129+
102130

103131
static class TestBean {
104132

@@ -117,6 +145,10 @@ public List<Integer> getNumbers() {
117145
public String[] split(String s) {
118146
return s.split(",");
119147
}
148+
149+
public Iterator<String> splitIterator(String s) {
150+
return Arrays.asList(s.split(",")).iterator();
151+
}
120152
}
121153

122154
}

spring-integration-core/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.Iterator;
2425
import java.util.List;
2526

2627
import org.junit.Before;
@@ -95,8 +96,8 @@ public void deliveredWords(String string) {
9596
public static class TestSplitter {
9697

9798
@Splitter(inputChannel = "inAnnotated", outputChannel = "out")
98-
public List<String> split(String sentence) {
99-
return Arrays.asList(sentence.split("\\s"));
99+
public Iterator<String> split(String sentence) {
100+
return Arrays.asList(sentence.split("\\s")).iterator();
100101
}
101102
}
102103

0 commit comments

Comments
 (0)