Skip to content

Commit 86c7699

Browse files
artembilangaryrussell
authored andcommitted
Optimize AbstractMessageSources
To avoid `Message` re-creation during `AbstractMessageSource.receive()` logic, refactor `AbstractMessageSource` implementations to return `AbstractIntegrationMessageBuilder` * Add `AbstractIntegrationMessageBuilder<File> doReceive()` to the `FileReadingMessageSource` to be called from the `AbstractInboundFileSynchronizingMessageSource` to avoid message recreation in its `doReceive()` * Some code style refactoring in the `AbstractMessageSource`
1 parent e6ec86c commit 86c7699

File tree

4 files changed

+93
-57
lines changed

4 files changed

+93
-57
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@
3737
* @author Mark Fisher
3838
* @author Oleg Zhurakousky
3939
* @author Gary Russell
40+
* @author Artem Bilan
41+
*
4042
* @since 2.0
4143
*/
4244
@IntegrationManagedResource
43-
public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluator implements MessageSource<T>,
44-
MessageSourceMetrics, NamedComponent, BeanNameAware {
45+
public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluator
46+
implements MessageSource<T>, MessageSourceMetrics, NamedComponent, BeanNameAware {
4547

4648
private final AtomicLong messageCount = new AtomicLong();
4749

@@ -61,7 +63,7 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat
6163

6264
public void setHeaderExpressions(Map<String, Expression> headerExpressions) {
6365
this.headerExpressions = (headerExpressions != null)
64-
? headerExpressions : Collections.<String, Expression>emptyMap();
66+
? headerExpressions : Collections.emptyMap();
6567
}
6668

6769
@Override
@@ -160,24 +162,24 @@ else if (result instanceof Message<?>) {
160162
}
161163
if (!CollectionUtils.isEmpty(headers)) {
162164
// create a new Message from this one in order to apply headers
163-
AbstractIntegrationMessageBuilder<T> builder = getMessageBuilderFactory().fromMessage(message);
164-
builder.copyHeaders(headers);
165-
message = builder.build();
165+
message = getMessageBuilderFactory()
166+
.fromMessage(message)
167+
.copyHeaders(headers)
168+
.build();
166169
}
167170
}
168171
else if (result != null) {
169-
T payload = null;
172+
T payload;
170173
try {
171174
payload = (T) result;
172175
}
173176
catch (Exception e) {
174177
throw new MessagingException("MessageSource returned unexpected type.", e);
175178
}
176-
AbstractIntegrationMessageBuilder<T> builder = getMessageBuilderFactory().withPayload(payload);
177-
if (!CollectionUtils.isEmpty(headers)) {
178-
builder.copyHeaders(headers);
179-
}
180-
message = builder.build();
179+
message = getMessageBuilderFactory()
180+
.withPayload(payload)
181+
.copyHeaders(headers)
182+
.build();
181183
}
182184
if (this.countsEnabled && message != null) {
183185
this.messageCount.incrementAndGet();
@@ -186,7 +188,7 @@ else if (result != null) {
186188
}
187189

188190
private Map<String, Object> evaluateHeaders() {
189-
Map<String, Object> results = new HashMap<String, Object>();
191+
Map<String, Object> results = new HashMap<>();
190192
for (Map.Entry<String, Expression> entry : this.headerExpressions.entrySet()) {
191193
Object headerValue = this.evaluateExpression(entry.getValue());
192194
if (headerValue != null) {
@@ -197,9 +199,9 @@ private Map<String, Object> evaluateHeaders() {
197199
}
198200

199201
/**
200-
* Subclasses must implement this method. Typically the returned value will be the payload of
201-
* type T, but the returned value may also be a Message instance whose payload is of type T.
202-
*
202+
* Subclasses must implement this method. Typically the returned value will be the {@code payload} of
203+
* type T, but the returned value may also be a {@link Message} instance whose payload is of type T;
204+
* also can be {@link AbstractIntegrationMessageBuilder} which is used for additional headers population.
203205
* @return The value returned.
204206
*/
205207
protected abstract Object doReceive();

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
5252
import org.springframework.integration.file.filters.FileListFilter;
5353
import org.springframework.integration.file.filters.ResettableFileListFilter;
54+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
5455
import org.springframework.messaging.Message;
5556
import org.springframework.messaging.MessagingException;
5657
import org.springframework.util.Assert;
@@ -159,8 +160,7 @@ public FileReadingMessageSource(int internalQueueCapacity) {
159160
* queue
160161
*/
161162
public FileReadingMessageSource(Comparator<File> receptionOrderComparator) {
162-
this.toBeReceived = new PriorityBlockingQueue<File>(
163-
DEFAULT_INTERNAL_QUEUE_CAPACITY, receptionOrderComparator);
163+
this.toBeReceived = new PriorityBlockingQueue<>(DEFAULT_INTERNAL_QUEUE_CAPACITY, receptionOrderComparator);
164164
}
165165

166166

@@ -350,8 +350,21 @@ protected void onInit() {
350350

351351
@Override
352352
public Message<File> receive() throws MessagingException {
353+
AbstractIntegrationMessageBuilder<File> messageBuilder = doReceive();
354+
353355
Message<File> message = null;
354356

357+
if (messageBuilder != null) {
358+
message = messageBuilder.build();
359+
if (logger.isInfoEnabled()) {
360+
logger.info("Created message: [" + message + "]");
361+
}
362+
}
363+
364+
return message;
365+
}
366+
367+
protected AbstractIntegrationMessageBuilder<File> doReceive() {
355368
// rescan only if needed or explicitly configured
356369
if (this.scanEachPoll || this.toBeReceived.isEmpty()) {
357370
scanInputDirectory();
@@ -366,23 +379,22 @@ public Message<File> receive() throws MessagingException {
366379
}
367380

368381
if (file != null) {
369-
message = getMessageBuilderFactory().withPayload(file)
370-
.setHeader(FileHeaders.RELATIVE_PATH, file.getAbsolutePath()
371-
.replaceFirst(Matcher.quoteReplacement(this.directory.getAbsolutePath() + File.separator),
372-
""))
382+
return getMessageBuilderFactory()
383+
.withPayload(file)
384+
.setHeader(FileHeaders.RELATIVE_PATH,
385+
file.getAbsolutePath()
386+
.replaceFirst(Matcher.quoteReplacement(
387+
this.directory.getAbsolutePath() + File.separator), ""))
373388
.setHeader(FileHeaders.FILENAME, file.getName())
374-
.setHeader(FileHeaders.ORIGINAL_FILE, file)
375-
.build();
376-
if (logger.isInfoEnabled()) {
377-
logger.info("Created message: [" + message + "]");
378-
}
389+
.setHeader(FileHeaders.ORIGINAL_FILE, file);
379390
}
380-
return message;
391+
392+
return null;
381393
}
382394

383395
private void scanInputDirectory() {
384396
List<File> filteredFiles = this.scanner.listFiles(this.directory);
385-
Set<File> freshFiles = new LinkedHashSet<File>(filteredFiles);
397+
Set<File> freshFiles = new LinkedHashSet<>(filteredFiles);
386398
if (!freshFiles.isEmpty()) {
387399
this.toBeReceived.addAll(freshFiles);
388400
if (logger.isDebugEnabled()) {
@@ -421,7 +433,7 @@ public enum WatchEventType {
421433

422434
private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements Lifecycle {
423435

424-
private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<Path, WatchKey>();
436+
private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<>();
425437

426438
private WatchService watcher;
427439

@@ -547,7 +559,7 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
547559
}
548560

549561
private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
550-
final Set<File> walkedFiles = new LinkedHashSet<File>();
562+
final Set<File> walkedFiles = new LinkedHashSet<>();
551563
try {
552564
registerWatch(directory);
553565
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ protected Object doReceive() {
161161
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
162162
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
163163
.setHeader(FileHeaders.REMOTE_FILE_INFO,
164-
this.fileInfoJson ? file.toJson() : file)
165-
.build();
164+
this.fileInfoJson ? file.toJson() : file);
166165
}
167166
catch (IOException e) {
168167
throw new MessagingException("IOException when retrieving " + remotePath, e);

spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizingMessageSource.java

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
3636
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
3737
import org.springframework.integration.metadata.SimpleMetadataStore;
38-
import org.springframework.messaging.Message;
38+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3939
import org.springframework.util.Assert;
4040

4141
/**
@@ -63,14 +63,8 @@
6363
* @author Venil Noronha
6464
*/
6565
public abstract class AbstractInboundFileSynchronizingMessageSource<F>
66-
extends AbstractFetchLimitingMessageSource<File> implements Lifecycle {
67-
68-
private volatile boolean running;
69-
70-
/**
71-
* Should the endpoint attempt to create the local directory? True by default.
72-
*/
73-
private volatile boolean autoCreateLocalDirectory = true;
66+
extends AbstractFetchLimitingMessageSource<File>
67+
implements Lifecycle {
7468

7569
/**
7670
* An implementation that will handle the chores of actually connecting to and synchronizing
@@ -79,14 +73,21 @@ public abstract class AbstractInboundFileSynchronizingMessageSource<F>
7973
private final AbstractInboundFileSynchronizer<F> synchronizer;
8074

8175
/**
82-
* Directory to which things should be synchronized locally.
76+
* The actual {@link LocalFileReadingMessageSource} that monitors the local file system once files are synchronized.
8377
*/
84-
private volatile File localDirectory;
78+
private final LocalFileReadingMessageSource fileSource;
79+
80+
private volatile boolean running;
81+
82+
/**
83+
* Should the endpoint attempt to create the local directory? True by default.
84+
*/
85+
private volatile boolean autoCreateLocalDirectory = true;
8586

8687
/**
87-
* The actual {@link FileReadingMessageSource} that monitors the local file system once files are synchronized.
88+
* Directory to which things should be synchronized locally.
8889
*/
89-
private final FileReadingMessageSource fileSource;
90+
private volatile File localDirectory;
9091

9192
private volatile FileListFilter<File> localFileListFilter;
9293

@@ -101,13 +102,14 @@ public AbstractInboundFileSynchronizingMessageSource(AbstractInboundFileSynchron
101102

102103
public AbstractInboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer<F> synchronizer,
103104
Comparator<File> comparator) {
105+
104106
Assert.notNull(synchronizer, "synchronizer must not be null");
105107
this.synchronizer = synchronizer;
106108
if (comparator == null) {
107-
this.fileSource = new FileReadingMessageSource();
109+
this.fileSource = new LocalFileReadingMessageSource();
108110
}
109111
else {
110-
this.fileSource = new FileReadingMessageSource(comparator);
112+
this.fileSource = new LocalFileReadingMessageSource(comparator);
111113
}
112114
}
113115

@@ -243,20 +245,41 @@ public boolean isRunning() {
243245
* @param maxFetchSize the maximum files to fetch.
244246
*/
245247
@Override
246-
public final Message<File> doReceive(int maxFetchSize) {
247-
Message<File> message = this.fileSource.receive();
248-
if (message == null) {
248+
public final AbstractIntegrationMessageBuilder<File> doReceive(int maxFetchSize) {
249+
AbstractIntegrationMessageBuilder<File> messageBuilder = this.fileSource.doReceive();
250+
if (messageBuilder == null) {
249251
this.synchronizer.synchronizeToLocalDirectory(this.localDirectory, maxFetchSize);
250-
message = this.fileSource.receive();
252+
messageBuilder = this.fileSource.doReceive();
251253
}
252-
return message;
254+
255+
return messageBuilder;
253256
}
254257

255258
private FileListFilter<File> buildFilter() {
256259
Pattern completePattern = Pattern.compile("^.*(?<!" + this.synchronizer.getTemporaryFileSuffix() + ")$");
257-
return new CompositeFileListFilter<File>(Arrays.asList(
258-
this.localFileListFilter,
259-
new RegexPatternFileListFilter(completePattern)));
260+
return new CompositeFileListFilter<>(
261+
Arrays.asList(this.localFileListFilter, new RegexPatternFileListFilter(completePattern)));
262+
}
263+
264+
265+
/**
266+
* The {@link FileReadingMessageSource} extension to increase visibility
267+
* for the {@link FileReadingMessageSource#doReceive()}
268+
*/
269+
private static final class LocalFileReadingMessageSource extends FileReadingMessageSource {
270+
271+
LocalFileReadingMessageSource() {
272+
}
273+
274+
LocalFileReadingMessageSource(Comparator<File> receptionOrderComparator) {
275+
super(receptionOrderComparator);
276+
}
277+
278+
@Override
279+
protected AbstractIntegrationMessageBuilder<File> doReceive() {
280+
return super.doReceive();
281+
}
282+
260283
}
261284

262285
}

0 commit comments

Comments
 (0)