Skip to content

Commit 45789a9

Browse files
garyrussellartembilan
authored andcommitted
GH-2776: Fix Streaming Remote File MessageSource
Fixes #2776 Also see #2777 - reset the filter for the current file if the fetch fails - implement `Lifecycle` and clear the `toBeReceived` queue and corresponding filter entries * Polishing - PR Comments **cherry-pick to all supported** * Polishing # Conflicts: # spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java # spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/StoredProcJavaConfigTests.java # Conflicts: # spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java # spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java # Conflicts: # spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java # spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java # spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java # spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java
1 parent 442b3d9 commit 45789a9

File tree

3 files changed

+115
-22
lines changed

3 files changed

+115
-22
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.Collections;
@@ -26,15 +27,18 @@
2627
import java.util.List;
2728
import java.util.concurrent.BlockingQueue;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.atomic.AtomicBoolean;
2931

30-
import org.springframework.beans.factory.BeanFactoryAware;
3132
import org.springframework.beans.factory.InitializingBean;
33+
import org.springframework.context.Lifecycle;
3234
import org.springframework.expression.Expression;
3335
import org.springframework.expression.common.LiteralExpression;
3436
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3537
import org.springframework.integration.endpoint.AbstractMessageSource;
3638
import org.springframework.integration.file.FileHeaders;
3739
import org.springframework.integration.file.filters.FileListFilter;
40+
import org.springframework.integration.file.filters.ResettableFileListFilter;
41+
import org.springframework.integration.file.filters.ReversibleFileListFilter;
3842
import org.springframework.integration.file.remote.session.Session;
3943
import org.springframework.messaging.MessagingException;
4044
import org.springframework.util.Assert;
@@ -47,15 +51,17 @@
4751
* @since 4.3
4852
*
4953
*/
50-
public abstract class AbstractRemoteFileStreamingMessageSource<F> extends AbstractMessageSource<InputStream>
51-
implements BeanFactoryAware, InitializingBean {
54+
public abstract class AbstractRemoteFileStreamingMessageSource<F>
55+
extends AbstractMessageSource<InputStream> implements Lifecycle {
5256

5357
private final RemoteFileTemplate<F> remoteFileTemplate;
5458

5559
private final BlockingQueue<AbstractFileInfo<F>> toBeReceived = new LinkedBlockingQueue<AbstractFileInfo<F>>();
5660

5761
private final Comparator<AbstractFileInfo<F>> comparator;
5862

63+
private final AtomicBoolean running = new AtomicBoolean();
64+
5965
/**
6066
* the path on the remote server.
6167
*/
@@ -107,7 +113,11 @@ public void setRemoteFileSeparator(String remoteFileSeparator) {
107113
* @param filter the file list filter.
108114
*/
109115
public void setFilter(FileListFilter<F> filter) {
110-
this.filter = filter;
116+
doSetFilter(filter);
117+
}
118+
119+
protected final void doSetFilter(FileListFilter<F> filterToSet) {
120+
this.filter = filterToSet;
111121
}
112122

113123
protected RemoteFileTemplate<F> getRemoteFileTemplate() {
@@ -127,26 +137,67 @@ public final void afterPropertiesSet() {
127137
protected void doInit() {
128138
}
129139

140+
141+
@Override
142+
public void start() {
143+
this.running.set(true);
144+
}
145+
146+
@Override
147+
public void stop() {
148+
if (this.running.compareAndSet(true, false)) {
149+
// remove unprocessed files from the queue (and filter)
150+
AbstractFileInfo<F> file = this.toBeReceived.poll();
151+
while (file != null) {
152+
resetFilterIfNecessary(file);
153+
file = this.toBeReceived.poll();
154+
}
155+
}
156+
}
157+
158+
@Override
159+
public boolean isRunning() {
160+
return this.running.get();
161+
}
162+
130163
@Override
131164
protected Object doReceive() {
165+
Assert.state(this.running.get(), getComponentName() + " is not running");
132166
AbstractFileInfo<F> file = poll();
133167
if (file != null) {
134-
String remotePath = remotePath(file);
135-
Session<?> session = this.remoteFileTemplate.getSession();
136168
try {
137-
return getMessageBuilderFactory().withPayload(session.readRaw(remotePath))
138-
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
139-
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
140-
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
141-
.build();
169+
String remotePath = remotePath(file);
170+
Session<?> session = this.remoteFileTemplate.getSession();
171+
try {
172+
return getMessageBuilderFactory()
173+
.withPayload(session.readRaw(remotePath))
174+
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
175+
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
176+
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
177+
.build();
178+
}
179+
catch (IOException e) {
180+
throw new MessagingException("IOException when retrieving " + remotePath, e);
181+
}
142182
}
143-
catch (IOException e) {
144-
throw new MessagingException("IOException when retrieving " + remotePath, e);
183+
catch (RuntimeException e) {
184+
resetFilterIfNecessary(file);
185+
throw e;
145186
}
146187
}
147188
return null;
148189
}
149190

191+
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
192+
if (this.filter instanceof ResettableFileListFilter) {
193+
if (this.logger.isInfoEnabled()) {
194+
this.logger.info("Removing the remote file '" + file +
195+
"' from the filter for a subsequent transfer attempt");
196+
}
197+
((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
198+
}
199+
}
200+
150201
protected AbstractFileInfo<F> poll() {
151202
if (this.toBeReceived.size() == 0) {
152203
listFiles();

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,10 +28,13 @@
2828
import java.io.ByteArrayInputStream;
2929
import java.io.IOException;
3030
import java.io.InputStream;
31+
import java.io.UncheckedIOException;
3132
import java.util.ArrayList;
3233
import java.util.Collection;
3334
import java.util.Comparator;
3435
import java.util.List;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.ConcurrentHashMap;
3538

3639
import org.junit.Rule;
3740
import org.junit.Test;
@@ -41,15 +44,22 @@
4144
import org.springframework.integration.IntegrationMessageHeaderAccessor;
4245
import org.springframework.integration.channel.QueueChannel;
4346
import org.springframework.integration.file.FileHeaders;
47+
import org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter;
48+
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
4449
import org.springframework.integration.file.remote.session.Session;
4550
import org.springframework.integration.file.remote.session.SessionFactory;
4651
import org.springframework.integration.file.splitter.FileSplitter;
52+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
53+
import org.springframework.integration.metadata.SimpleMetadataStore;
54+
import org.springframework.integration.test.util.TestUtils;
4755
import org.springframework.integration.transformer.StreamTransformer;
4856
import org.springframework.messaging.Message;
4957
import org.springframework.messaging.MessagingException;
5058

5159
/**
5260
* @author Gary Russell
61+
* @author Artem Bilan
62+
*
5363
* @since 4.3
5464
*
5565
*/
@@ -67,6 +77,7 @@ public void testAllData() throws Exception {
6777
streamer.setBeanFactory(mock(BeanFactory.class));
6878
streamer.setRemoteDirectory("/foo");
6979
streamer.afterPropertiesSet();
80+
streamer.start();
7081
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(streamer.receive());
7182
assertEquals("foo\nbar", new String(received.getPayload()));
7283
assertEquals("/foo", received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY));
@@ -90,16 +101,17 @@ public void testExceptionOnFetch() {
90101
streamer.setBeanFactory(mock(BeanFactory.class));
91102
streamer.setRemoteDirectory("/bad");
92103
streamer.afterPropertiesSet();
93-
streamer.receive();
104+
streamer.start();
94105
}
95106

96-
@SuppressWarnings("unchecked")
97107
@Test
108+
@SuppressWarnings("unchecked")
98109
public void testLineByLine() throws Exception {
99110
Streamer streamer = new Streamer(new StringRemoteFileTemplate(new StringSessionFactory()), null);
100111
streamer.setBeanFactory(mock(BeanFactory.class));
101112
streamer.setRemoteDirectory("/foo");
102113
streamer.afterPropertiesSet();
114+
streamer.start();
103115
QueueChannel out = new QueueChannel();
104116
FileSplitter splitter = new FileSplitter();
105117
splitter.setBeanFactory(mock(BeanFactory.class));
@@ -140,8 +152,11 @@ public void testLineByLine() throws Exception {
140152

141153
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {
142154

155+
ConcurrentHashMap<String, String> metadataMap = new ConcurrentHashMap<>();
156+
143157
protected Streamer(RemoteFileTemplate<String> template, Comparator<AbstractFileInfo<String>> comparator) {
144158
super(template, comparator);
159+
doSetFilter(new StringPersistentFileListFilter(new SimpleMetadataStore(this.metadataMap), "streamer"));
145160
}
146161

147162
@Override
@@ -151,7 +166,7 @@ public String getComponentType() {
151166

152167
@Override
153168
protected List<AbstractFileInfo<String>> asFileInfoList(Collection<String> files) {
154-
List<AbstractFileInfo<String>> infos = new ArrayList<AbstractFileInfo<String>>();
169+
List<AbstractFileInfo<String>> infos = new ArrayList<>();
155170
for (String file : files) {
156171
infos.add(new StringFileInfo(file));
157172
}
@@ -200,7 +215,7 @@ public String getPermissions() {
200215

201216
@Override
202217
public String getFileInfo() {
203-
return null;
218+
return name;
204219
}
205220

206221
}
@@ -215,9 +230,14 @@ public StringRemoteFileTemplate(SessionFactory<String> sessionFactory) {
215230

216231
public static class StringSessionFactory implements SessionFactory<String> {
217232

233+
private Session<String> singletonSession;
234+
218235
@SuppressWarnings("unchecked")
219236
@Override
220237
public Session<String> getSession() {
238+
if (this.singletonSession != null) {
239+
return this.singletonSession;
240+
}
221241
try {
222242
Session<String> session = mock(Session.class);
223243
willReturn(new String[] { "/foo/foo", "/foo/bar" }).given(session).list("/foo");
@@ -232,10 +252,14 @@ public Session<String> getSession() {
232252
willReturn(foo2).given(session).readRaw("/bar/foo");
233253
willReturn(bar2).given(session).readRaw("/bar/bar");
234254

235-
willReturn(new String[] { "/bad/file" }).given(session).list("/bad");
236-
willThrow(new IOException("No file")).given(session).readRaw("/bad/file");
255+
willReturn(new String[] { "/bad/file1", "/bad/file2" }).given(session).list("/bad");
256+
willThrow(new IOException("No file")).given(session).readRaw("/bad/file1");
257+
willThrow(new IOException("No file")).given(session).readRaw("/bad/file2");
237258

238259
given(session.finalizeRaw()).willReturn(true);
260+
261+
this.singletonSession = session;
262+
239263
return session;
240264
}
241265
catch (Exception e) {
@@ -245,4 +269,22 @@ public Session<String> getSession() {
245269

246270
}
247271

272+
public static class StringPersistentFileListFilter extends AbstractPersistentAcceptOnceFileListFilter<String> {
273+
274+
public StringPersistentFileListFilter(ConcurrentMetadataStore store, String prefix) {
275+
super(store, prefix);
276+
}
277+
278+
@Override
279+
protected long modified(String file) {
280+
return 0;
281+
}
282+
283+
@Override
284+
protected String fileName(String file) {
285+
return file;
286+
}
287+
288+
}
289+
248290
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)