Skip to content

Commit 7f25cba

Browse files
xak2000artembilan
authored andcommitted
INT-4430: FileSplitter close reader on exception
JIRA: https://jira.spring.io/browse/INT-4430 When Iterator-based `FileSplitter` splits the file, and an exception throws in downstream flow (in the same thread), the exception propagates to the caller leaving underlying file reader opened. This commit changes `AbstractMessageSplitter` the way, that, when any exception happens, if Iterator implements `java.io.Closeable`, its `close()` method will be called before propagating exception. Also `FileSplitter`'s underlying iterator implements `Closeable` now. * Make `CloseableIterator` to follow `Closeable` contract. Now `CloseableIterator.close()` declares `IOException` and can be used as base interface for `FunctionIterator`. * Adjust tests. Adjust tests to reflect the fact that we call `close()` on the reader one more time in the end of iterator. **Cherry-pick to 5.0.x and 4.3.x**
1 parent f77fd78 commit 7f25cba

File tree

6 files changed

+101
-14
lines changed

6 files changed

+101
-14
lines changed

spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.splitter;
1818

19+
import java.io.Closeable;
1920
import java.util.Arrays;
2021
import java.util.Collection;
2122
import java.util.Collections;
@@ -42,6 +43,7 @@
4243
* @author Mark Fisher
4344
* @author Dave Syer
4445
* @author Artem Bilan
46+
* @author Ruslan Stelmachenko
4547
*/
4648
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
4749

@@ -227,9 +229,20 @@ protected boolean shouldCopyRequestHeaders() {
227229
protected void produceOutput(Object result, Message<?> requestMessage) {
228230
if (result instanceof Iterator<?>) {
229231
Iterator<?> iterator = (Iterator<?>) result;
230-
while (iterator.hasNext()) {
231-
super.produceOutput(iterator.next(), requestMessage);
232-
232+
try {
233+
while (iterator.hasNext()) {
234+
super.produceOutput(iterator.next(), requestMessage);
235+
}
236+
}
237+
finally {
238+
if (iterator instanceof Closeable) {
239+
try {
240+
((Closeable) iterator).close();
241+
}
242+
catch (Exception e) {
243+
// ignored
244+
}
245+
}
233246
}
234247
}
235248
else {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
import java.io.Closeable;
20+
import java.util.Iterator;
21+
22+
/**
23+
* A {@link CloseableIterator} is intended to be used when it may hold resources (such as file or socket handles).
24+
* This allows implementations to clean up any resources they need to keep open to iterate over elements.
25+
*
26+
* @author Ruslan Stelmachenko
27+
*
28+
* @since 4.3.15
29+
*/
30+
public interface CloseableIterator<E> extends Iterator<E>, Closeable {
31+
}

spring-integration-core/src/main/java/org/springframework/integration/util/FunctionIterator.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.util;
1818

19+
import java.io.Closeable;
20+
import java.io.IOException;
1921
import java.util.Iterator;
2022
import java.util.function.Function;
2123

@@ -24,9 +26,10 @@
2426
* {@link #iterator} to a new object applying the {@link #function} on {@link #next()}.
2527
*
2628
* @author Artem Bilan
29+
* @author Ruslan Stelmachenko
2730
* @since 4.1
2831
*/
29-
public class FunctionIterator<T, V> implements Iterator<V> {
32+
public class FunctionIterator<T, V> implements CloseableIterator<V> {
3033

3134
private final Iterator<T> iterator;
3235

@@ -56,5 +59,11 @@ public V next() {
5659
return this.function.apply(this.iterator.next());
5760
}
5861

59-
}
62+
@Override
63+
public void close() throws IOException {
64+
if (this.iterator instanceof Closeable) {
65+
((Closeable) this.iterator).close();
66+
}
67+
}
6068

69+
}

spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
4141
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4242
import org.springframework.integration.support.json.JsonObjectMapper;
4343
import org.springframework.integration.support.json.JsonObjectMapperProvider;
44+
import org.springframework.integration.util.CloseableIterator;
4445
import org.springframework.messaging.Message;
4546
import org.springframework.messaging.MessageHandlingException;
4647
import org.springframework.util.Assert;
@@ -68,6 +69,7 @@
6869
*
6970
* @author Artem Bilan
7071
* @author Gary Russell
72+
* @author Ruslan Stelmachenko
7173
*
7274
* @since 4.1.2
7375
*/
@@ -243,7 +245,7 @@ public void close() throws IOException {
243245
firstLineAsHeader = null;
244246
}
245247

246-
Iterator<Object> iterator = new Iterator<Object>() {
248+
Iterator<Object> iterator = new CloseableIterator<Object>() {
247249

248250
boolean markers = FileSplitter.this.markers;
249251

@@ -263,7 +265,7 @@ public void close() throws IOException {
263265
public boolean hasNext() {
264266
this.hasNextCalled = true;
265267
try {
266-
if (this.line == null && !this.done) {
268+
if (!this.done && this.line == null) {
267269
this.line = bufferedReader.readLine();
268270
}
269271
boolean ready = !this.done && this.line != null;
@@ -280,8 +282,8 @@ public boolean hasNext() {
280282
}
281283
catch (IOException e) {
282284
try {
283-
bufferedReader.close();
284285
this.done = true;
286+
bufferedReader.close();
285287
}
286288
catch (IOException e1) {
287289
// ignored
@@ -344,6 +346,17 @@ private AbstractIntegrationMessageBuilder<Object> markerToReturn(FileMarker file
344346
.setHeader(FileHeaders.MARKER, fileMarker.mark.name());
345347
}
346348

349+
@Override
350+
public void close() {
351+
try {
352+
this.done = true;
353+
bufferedReader.close();
354+
}
355+
catch (IOException e) {
356+
// ignored
357+
}
358+
}
359+
347360
};
348361

349362
if (this.iterator) {

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void testLineByLine() throws Exception {
172172
assertNull(out.receive(0));
173173

174174
// close by list, splitter
175-
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(2)).close();
175+
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(3)).close();
176176

177177
receivedStream = streamer.receive();
178178
splitter.handleMessage(receivedStream);
@@ -187,7 +187,7 @@ public void testLineByLine() throws Exception {
187187
assertNull(out.receive(0));
188188

189189
// close by splitter
190-
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(3)).close();
190+
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(5)).close();
191191
}
192192

193193
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {

spring-integration-file/src/test/java/org/springframework/integration/file/splitter/FileSplitterTests.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
import org.junit.BeforeClass;
4444
import org.junit.Test;
4545
import org.junit.runner.RunWith;
46+
import org.mockito.Mockito;
4647
import org.reactivestreams.Subscriber;
4748

4849
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,6 +52,7 @@
5152
import org.springframework.context.annotation.ImportResource;
5253
import org.springframework.integration.IntegrationMessageHeaderAccessor;
5354
import org.springframework.integration.annotation.Splitter;
55+
import org.springframework.integration.channel.DirectChannel;
5456
import org.springframework.integration.channel.FluxMessageChannel;
5557
import org.springframework.integration.channel.QueueChannel;
5658
import org.springframework.integration.config.EnableIntegration;
@@ -75,6 +77,7 @@
7577
/**
7678
* @author Artem Bilan
7779
* @author Gary Russell
80+
* @author Ruslan Stelmachenko
7881
*
7982
* @since 4.1.2
8083
*/
@@ -365,6 +368,24 @@ public void testFirstLineAsHeaderOnlyHeader() throws IOException {
365368
assertEquals(0, fileMarker.getLineCount());
366369
}
367370

371+
@Test
372+
public void testFileReaderClosedOnException() throws Exception {
373+
DirectChannel outputChannel = new DirectChannel();
374+
outputChannel.subscribe(message -> {
375+
throw new RuntimeException();
376+
});
377+
FileSplitter splitter = new FileSplitter(true, true);
378+
splitter.setOutputChannel(outputChannel);
379+
FileReader fileReader = Mockito.spy(new FileReader(file));
380+
try {
381+
splitter.handleMessage(new GenericMessage<Reader>(fileReader));
382+
}
383+
catch (RuntimeException e) {
384+
// ignore
385+
}
386+
Mockito.verify(fileReader).close();
387+
}
388+
368389
@Configuration
369390
@EnableIntegration
370391
@ImportResource("classpath:org/springframework/integration/file/splitter/FileSplitterTests-context.xml")

0 commit comments

Comments
 (0)