Skip to content

Commit 5c849c5

Browse files
committed
GH-8792: Fix File StreamingMS for one file filter
Fixes #8792 1. Use a `SftpStreamingMessageSource` with a `maxFetchSize = 5` and a `ChainFileListFilter` filter composed with `SftpSystemMarkerFilePresentFileListFilter` which `supportsSingleFileFiltering == false` 2. Put 2 files in the folder and invoke `SftpStreamingMessageSource.receive()` method twice. 3. Put 5 files in the folder and invoke `SftpStreamingMessageSource.receive()`` method five times. 4. The last two files won't be received. When you set max fetch size to a number bigger than one (for example 5) and at a certain point it is necessary to `this.toBeReceived.clear()` inside `AbstractRemoteFileStreamingMessageSource.doReceive()`, those removed elements from toBeReceived are not rolled back. * Fix `AbstractRemoteFileStreamingMessageSource.listFiles()` to calculate `maxFetchSize` as `getMaxFetchSize() - this.fetched.get()` **Cherry-pick to `6.1.x`, `6.0.x` & `5.5.x`** (cherry picked from commit 22c4db2) # Conflicts: # spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java
1 parent 430ef9b commit 5c849c5

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ protected Object doReceive(int maxFetchSize) {
215215
if (this.filter != null && this.filter.supportsSingleFileFiltering()
216216
&& !this.filter.accept(file.getFileInfo())) {
217217

218-
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
218+
if (!this.toBeReceived.isEmpty()) { // don't re-fetch already filtered files
219219
file = poll();
220220
continue;
221221
}
@@ -267,7 +267,7 @@ private Object remoteFileToMessage(AbstractFileInfo<F> file) {
267267
}
268268

269269
protected AbstractFileInfo<F> poll() {
270-
if (this.toBeReceived.size() == 0) {
270+
if (this.toBeReceived.isEmpty()) {
271271
listFiles();
272272
}
273273
return this.toBeReceived.poll();
@@ -297,7 +297,7 @@ private void listFiles() {
297297
if (!ObjectUtils.isEmpty(files)) {
298298
List<AbstractFileInfo<F>> fileInfoList;
299299
if (this.filter != null && !this.filter.supportsSingleFileFiltering()) {
300-
int maxFetchSize = getMaxFetchSize();
300+
int maxFetchSize = getMaxFetchSize() - this.fetched.get();
301301
List<F> filteredFiles = this.filter.filterFiles(files);
302302
if (maxFetchSize > 0 && filteredFiles.size() > maxFetchSize) {
303303
rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616

1717
package org.springframework.integration.sftp.inbound;
1818

19+
import java.io.File;
20+
import java.io.IOException;
1921
import java.io.InputStream;
22+
import java.nio.charset.StandardCharsets;
2023
import java.util.Arrays;
2124
import java.util.Comparator;
2225
import java.util.concurrent.ConcurrentHashMap;
2326
import java.util.concurrent.ConcurrentMap;
2427

2528
import com.jcraft.jsch.ChannelSftp.LsEntry;
29+
import org.apache.commons.io.FileUtils;
2630
import org.junit.jupiter.api.Test;
2731

2832
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,11 +42,14 @@
3842
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
3943
import org.springframework.integration.file.FileHeaders;
4044
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
45+
import org.springframework.integration.file.filters.ChainFileListFilter;
4146
import org.springframework.integration.file.remote.session.SessionFactory;
4247
import org.springframework.integration.metadata.SimpleMetadataStore;
4348
import org.springframework.integration.scheduling.PollerMetadata;
4449
import org.springframework.integration.sftp.SftpTestSupport;
4550
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
51+
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
52+
import org.springframework.integration.sftp.filters.SftpSystemMarkerFilePresentFileListFilter;
4653
import org.springframework.integration.sftp.session.SftpFileInfo;
4754
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
4855
import org.springframework.integration.transformer.StreamTransformer;
@@ -167,6 +174,69 @@ public void testMaxFetchLambdaFilter() throws Exception {
167174
StaticMessageHeaderAccessor.getCloseableResource(received).close();
168175
}
169176

177+
178+
@Test
179+
public void maxFetchIsAdjustedWhenNoSupportsSingleFileFiltering() throws Exception {
180+
SftpStreamingMessageSource messageSource = buildSource();
181+
ChainFileListFilter<LsEntry> chainFileListFilter = new ChainFileListFilter<>();
182+
SftpSystemMarkerFilePresentFileListFilter sftpSystemMarkerFilePresentFileListFilter =
183+
new SftpSystemMarkerFilePresentFileListFilter(
184+
new SftpSimplePatternFileListFilter("*"), ".trg");
185+
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter =
186+
new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "prefix");
187+
chainFileListFilter.addFilter(sftpSystemMarkerFilePresentFileListFilter);
188+
chainFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
189+
messageSource.setFilter(chainFileListFilter);
190+
messageSource.setMaxFetchSize(5);
191+
messageSource.afterPropertiesSet();
192+
messageSource.start();
193+
194+
addFileAndTrigger("file001");
195+
addFileAndTrigger("file002");
196+
197+
Message<InputStream> received = messageSource.receive();
198+
assertThat(received).isNotNull();
199+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file001");
200+
201+
received = messageSource.receive();
202+
assertThat(received).isNotNull();
203+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file002");
204+
205+
addFileAndTrigger("file003");
206+
addFileAndTrigger("file004");
207+
addFileAndTrigger("file005");
208+
addFileAndTrigger("file006");
209+
addFileAndTrigger("file007");
210+
211+
received = messageSource.receive();
212+
assertThat(received).isNotNull();
213+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file003");
214+
215+
received = messageSource.receive();
216+
assertThat(received).isNotNull();
217+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file004");
218+
219+
received = messageSource.receive();
220+
assertThat(received).isNotNull();
221+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file005");
222+
223+
received = messageSource.receive();
224+
assertThat(received).isNotNull();
225+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file006");
226+
227+
received = messageSource.receive();
228+
assertThat(received).isNotNull();
229+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("file007");
230+
}
231+
232+
private void addFileAndTrigger(String filename) throws IOException {
233+
File file = new File(this.sourceRemoteDirectory, filename);
234+
FileUtils.writeStringToFile(file, "source1", StandardCharsets.UTF_8);
235+
236+
file = new File(this.sourceRemoteDirectory, filename + ".trg");
237+
file.createNewFile();
238+
}
239+
170240
private SftpStreamingMessageSource buildSource() {
171241
SftpStreamingMessageSource messageSource =
172242
new SftpStreamingMessageSource(this.config.template(),

0 commit comments

Comments
 (0)