1
1
/*
2
- * Copyright 2002-2018 the original author or authors.
2
+ * Copyright 2002-2019 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
47
47
import org .springframework .integration .file .filters .FileListFilter ;
48
48
import org .springframework .integration .file .filters .ResettableFileListFilter ;
49
49
import org .springframework .integration .support .AbstractIntegrationMessageBuilder ;
50
+ import org .springframework .lang .Nullable ;
50
51
import org .springframework .messaging .Message ;
51
52
import org .springframework .util .Assert ;
52
53
@@ -103,23 +104,23 @@ public class FileReadingMessageSource extends AbstractMessageSource<File>
103
104
*/
104
105
private final Queue <File > toBeReceived ;
105
106
106
- private volatile File directory ;
107
+ private File directory ;
107
108
108
- private volatile DirectoryScanner scanner = new DefaultDirectoryScanner ();
109
+ private DirectoryScanner scanner = new DefaultDirectoryScanner ();
109
110
110
- private volatile boolean scannerExplicitlySet ;
111
+ private boolean scannerExplicitlySet ;
111
112
112
- private volatile boolean autoCreateDirectory = true ;
113
+ private boolean autoCreateDirectory = true ;
113
114
114
- private volatile boolean scanEachPoll = false ;
115
+ private boolean scanEachPoll = false ;
115
116
116
117
private FileListFilter <File > filter ;
117
118
118
119
private FileLocker locker ;
119
120
120
121
private boolean useWatchService ;
121
122
122
- private WatchEventType [] watchEvents = new WatchEventType [] { WatchEventType .CREATE };
123
+ private WatchEventType [] watchEvents = { WatchEventType .CREATE };
123
124
124
125
/**
125
126
* Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
@@ -132,7 +133,6 @@ public FileReadingMessageSource() {
132
133
* Creates a FileReadingMessageSource with a bounded queue of the given
133
134
* capacity. This can be used to reduce the memory footprint of this
134
135
* component when reading from a large directory.
135
- *
136
136
* @param internalQueueCapacity
137
137
* the size of the queue used to cache files to be received
138
138
* internally. This queue can be made larger to optimize the
@@ -151,26 +151,22 @@ public FileReadingMessageSource(int internalQueueCapacity) {
151
151
152
152
/**
153
153
* Creates a FileReadingMessageSource with a {@link PriorityBlockingQueue}
154
- * ordered with the passed in {@link Comparator}
155
- * <p>
156
- * The size of the queue used should be large enough to hold all the files
154
+ * ordered with the passed in {@link Comparator}.
155
+ * <p> The size of the queue used should be large enough to hold all the files
157
156
* in the input directory in order to sort all of them, so restricting the
158
157
* size of the queue is mutually exclusive with ordering. No guarantees
159
158
* about file delivery order can be made under concurrent access.
160
- * <p>
161
- *
162
159
* @param receptionOrderComparator
163
160
* the comparator to be used to order the files in the internal
164
161
* queue
165
162
*/
166
- public FileReadingMessageSource (Comparator <File > receptionOrderComparator ) {
163
+ public FileReadingMessageSource (@ Nullable Comparator <File > receptionOrderComparator ) {
167
164
this .toBeReceived = new PriorityBlockingQueue <>(DEFAULT_INTERNAL_QUEUE_CAPACITY , receptionOrderComparator );
168
165
}
169
166
170
167
171
168
/**
172
169
* Specify the input directory.
173
- *
174
170
* @param directory to monitor
175
171
*/
176
172
public void setDirectory (File directory ) {
@@ -181,7 +177,6 @@ public void setDirectory(File directory) {
181
177
/**
182
178
* Optionally specify a custom scanner, for example the
183
179
* {@link WatchServiceDirectoryScanner}
184
- *
185
180
* @param scanner scanner implementation
186
181
*/
187
182
public void setScanner (DirectoryScanner scanner ) {
@@ -207,7 +202,6 @@ public DirectoryScanner getScanner() {
207
202
* <em>true</em>. If set to <em>false</em> and the
208
203
* source directory does not exist, an Exception will be thrown upon
209
204
* initialization.
210
- *
211
205
* @param autoCreateDirectory
212
206
* should the directory to be monitored be created when this
213
207
* component starts up?
@@ -224,8 +218,7 @@ public void setAutoCreateDirectory(boolean autoCreateDirectory) {
224
218
* If multiple filters are required a
225
219
* {@link org.springframework.integration.file.filters.CompositeFileListFilter}
226
220
* can be used to group them together.
227
- * <p>
228
- * <b>The supplied filter must be thread safe.</b>.
221
+ * <p> <b>The supplied filter must be thread safe.</b>.
229
222
* @param filter a filter
230
223
*/
231
224
public void setFilter (FileListFilter <File > filter ) {
@@ -256,7 +249,6 @@ public void setLocker(FileLocker locker) {
256
249
* will more likely be out of sync with the file system if this flag is set
257
250
* to <code>false</code>, but it will change more often (causing expensive
258
251
* reordering) if it is set to <code>true</code>.
259
- *
260
252
* @param scanEachPoll
261
253
* whether or not the component should re-scan (as opposed to not
262
254
* rescanning until the entire backlog has been delivered)
@@ -304,15 +296,15 @@ public String getComponentType() {
304
296
@ Override
305
297
public void start () {
306
298
if (!this .running .getAndSet (true )) {
307
- if (!this .directory .exists () && this .autoCreateDirectory ) {
308
- this .directory . mkdirs ( );
299
+ if (!this .directory .exists () && this .autoCreateDirectory && ! this . directory . mkdirs () ) {
300
+ throw new IllegalStateException ( "Cannot create directory or ita parents: " + this .directory );
309
301
}
310
302
Assert .isTrue (this .directory .exists (),
311
- "Source directory [" + this .directory + "] does not exist." );
303
+ () -> "Source directory [" + this .directory + "] does not exist." );
312
304
Assert .isTrue (this .directory .isDirectory (),
313
- "Source path [" + this .directory + "] does not point to a directory." );
305
+ () -> "Source path [" + this .directory + "] does not point to a directory." );
314
306
Assert .isTrue (this .directory .canRead (),
315
- "Source directory [" + this .directory + "] is not readable." );
307
+ () -> "Source directory [" + this .directory + "] is not readable." );
316
308
if (this .scanner instanceof Lifecycle ) {
317
309
((Lifecycle ) this .scanner ).start ();
318
310
}
@@ -336,7 +328,7 @@ protected void onInit() {
336
328
Assert .notNull (this .directory , "'directory' must not be null" );
337
329
338
330
Assert .state (!(this .scannerExplicitlySet && this .useWatchService ),
339
- "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this .scanner );
331
+ () -> "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this .scanner );
340
332
341
333
if (this .useWatchService ) {
342
334
this .scanner = new WatchServiceDirectoryScanner ();
@@ -345,8 +337,8 @@ protected void onInit() {
345
337
// Check that the filter and locker options are _NOT_ set if an external scanner has been set.
346
338
// The external scanner is responsible for the filter and locker options in that case.
347
339
Assert .state (!(this .scannerExplicitlySet && (this .filter != null || this .locker != null )),
348
- "When using an external scanner the 'filter' and 'locker' options should not be used. Instead, set these options on the external DirectoryScanner: "
349
- + this .scanner );
340
+ () -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
341
+ "Instead, set these options on the external DirectoryScanner: " + this .scanner );
350
342
if (this .filter != null ) {
351
343
this .scanner .setFilter (this .filter );
352
344
}
@@ -492,7 +484,7 @@ protected File[] listEligibleFiles(File directory) {
492
484
493
485
files .addAll (filesFromEvents ());
494
486
495
- return files .toArray (new File [files . size () ]);
487
+ return files .toArray (new File [0 ]);
496
488
}
497
489
498
490
private Set <File > filesFromEvents () {
@@ -504,57 +496,11 @@ private Set<File> filesFromEvents() {
504
496
if (event .kind () == StandardWatchEventKinds .ENTRY_CREATE ||
505
497
event .kind () == StandardWatchEventKinds .ENTRY_MODIFY ||
506
498
event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
507
- Path item = (Path ) event .context ();
508
- File file = new File (parentDir , item .toFile ().getName ());
509
- if (logger .isDebugEnabled ()) {
510
- logger .debug ("Watch event [" + event .kind () + "] for file [" + file + "]" );
511
- }
512
499
513
- if (event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
514
- if (getFilter () instanceof ResettableFileListFilter ) {
515
- ((ResettableFileListFilter <File >) getFilter ()).remove (file );
516
- }
517
- boolean fileRemoved = files .remove (file );
518
- if (fileRemoved && logger .isDebugEnabled ()) {
519
- logger .debug ("The file [" + file +
520
- "] has been removed from the queue because of DELETE event." );
521
- }
522
- }
523
- else {
524
- if (file .exists ()) {
525
- if (file .isDirectory ()) {
526
- files .addAll (walkDirectory (file .toPath (), event .kind ()));
527
- }
528
- else {
529
- files .remove (file );
530
- files .add (file );
531
- }
532
- }
533
- else {
534
- if (logger .isDebugEnabled ()) {
535
- logger .debug ("A file [" + file + "] for the event [" + event .kind () +
536
- "] doesn't exist. Ignored." );
537
- }
538
- }
539
- }
500
+ processFilesFromNormalEvent (files , parentDir , event );
540
501
}
541
502
else if (event .kind () == StandardWatchEventKinds .OVERFLOW ) {
542
- if (logger .isDebugEnabled ()) {
543
- logger .debug ("Watch event [" + StandardWatchEventKinds .OVERFLOW +
544
- "] with context [" + event .context () + "]" );
545
- }
546
-
547
- for (WatchKey watchKey : this .pathKeys .values ()) {
548
- watchKey .cancel ();
549
- }
550
- this .pathKeys .clear ();
551
-
552
- if (event .context () != null && event .context () instanceof Path ) {
553
- files .addAll (walkDirectory ((Path ) event .context (), event .kind ()));
554
- }
555
- else {
556
- files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath (), event .kind ()));
557
- }
503
+ processFilesFromOverflowEvent (files , event );
558
504
}
559
505
}
560
506
key .reset ();
@@ -563,6 +509,61 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
563
509
return files ;
564
510
}
565
511
512
+ private void processFilesFromNormalEvent (Set <File > files , File parentDir , WatchEvent <?> event ) {
513
+ Path item = (Path ) event .context ();
514
+ File file = new File (parentDir , item .toFile ().getName ());
515
+ if (logger .isDebugEnabled ()) {
516
+ logger .debug ("Watch event [" + event .kind () + "] for file [" + file + "]" );
517
+ }
518
+
519
+ if (event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
520
+ if (getFilter () instanceof ResettableFileListFilter ) {
521
+ ((ResettableFileListFilter <File >) getFilter ()).remove (file );
522
+ }
523
+ boolean fileRemoved = files .remove (file );
524
+ if (fileRemoved && logger .isDebugEnabled ()) {
525
+ logger .debug ("The file [" + file +
526
+ "] has been removed from the queue because of DELETE event." );
527
+ }
528
+ }
529
+ else {
530
+ if (file .exists ()) {
531
+ if (file .isDirectory ()) {
532
+ files .addAll (walkDirectory (file .toPath (), event .kind ()));
533
+ }
534
+ else {
535
+ files .remove (file );
536
+ files .add (file );
537
+ }
538
+ }
539
+ else {
540
+ if (logger .isDebugEnabled ()) {
541
+ logger .debug ("A file [" + file + "] for the event [" + event .kind () +
542
+ "] doesn't exist. Ignored." );
543
+ }
544
+ }
545
+ }
546
+ }
547
+
548
+ private void processFilesFromOverflowEvent (Set <File > files , WatchEvent <?> event ) {
549
+ if (logger .isDebugEnabled ()) {
550
+ logger .debug ("Watch event [" + StandardWatchEventKinds .OVERFLOW +
551
+ "] with context [" + event .context () + "]" );
552
+ }
553
+
554
+ for (WatchKey watchKey : this .pathKeys .values ()) {
555
+ watchKey .cancel ();
556
+ }
557
+ this .pathKeys .clear ();
558
+
559
+ if (event .context () != null && event .context () instanceof Path ) {
560
+ files .addAll (walkDirectory ((Path ) event .context (), event .kind ()));
561
+ }
562
+ else {
563
+ files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath (), event .kind ()));
564
+ }
565
+ }
566
+
566
567
private Set <File > walkDirectory (Path directory , final WatchEvent .Kind <?> kind ) {
567
568
final Set <File > walkedFiles = new LinkedHashSet <>();
568
569
try {
0 commit comments