@@ -365,13 +365,13 @@ protected void doInit() {
365
365
this .evaluationContext = ExpressionUtils .createStandardEvaluationContext (getBeanFactory ());
366
366
367
367
if (this .destinationDirectoryExpression instanceof LiteralExpression ) {
368
- final File directory = new File ( this .destinationDirectoryExpression .getValue (
369
- this . evaluationContext , null , String . class ) );
368
+ String directoryPath = this .destinationDirectoryExpression .getValue (this . evaluationContext , String . class );
369
+ final File directory = new File ( directoryPath );
370
370
validateDestinationDirectory (directory , this .autoCreateDirectory );
371
371
}
372
372
373
373
Assert .state (!(this .temporaryFileSuffixSet
374
- && (FileExistsMode .APPEND .equals (this .fileExistsMode )
374
+ && (FileExistsMode .APPEND .equals (this .fileExistsMode )
375
375
|| FileExistsMode .APPEND_NO_FLUSH .equals (this .fileExistsMode ))),
376
376
"'temporaryFileSuffix' can not be set when appending to an existing file" );
377
377
@@ -391,12 +391,32 @@ public void start() {
391
391
}
392
392
393
393
@ Override
394
- public synchronized void stop () {
395
- if (this .flushTask != null ) {
396
- this .flushTask .cancel (true );
397
- this .flushTask = null ;
394
+ public void stop () {
395
+ synchronized (this ) {
396
+ if (this .flushTask != null ) {
397
+ this .flushTask .cancel (true );
398
+ this .flushTask = null ;
399
+ }
400
+ }
401
+ Flusher flusher = new Flusher ();
402
+ flusher .run ();
403
+ boolean needInterrupt = this .fileStates .size () > 0 ;
404
+ int n = 0 ;
405
+ while (n ++ < 10 && this .fileStates .size () > 0 ) {
406
+ try {
407
+ Thread .sleep (1 );
408
+ }
409
+ catch (InterruptedException e ) {
410
+ // cancel the interrupt
411
+ }
412
+ flusher .run ();
413
+ }
414
+ if (this .fileStates .size () > 0 ) {
415
+ this .logger .error ("Failed to flush after multiple attempts, while stopping: " + this .fileStates .keySet ());
416
+ }
417
+ if (needInterrupt ) {
418
+ Thread .currentThread ().interrupt ();
398
419
}
399
- new Flusher ().run ();
400
420
}
401
421
402
422
@ Override
@@ -529,7 +549,7 @@ private File handleFileMessage(final File sourceFile, File tempFile, final File
529
549
}
530
550
531
551
private File handleInputStreamMessage (final InputStream sourceFileInputStream , File originalFile , File tempFile ,
532
- final File resultFile ) throws IOException {
552
+ final File resultFile ) throws IOException {
533
553
final boolean append = FileExistsMode .APPEND .equals (this .fileExistsMode )
534
554
|| FileExistsMode .APPEND_NO_FLUSH .equals (this .fileExistsMode );
535
555
@@ -796,10 +816,10 @@ else if (destinationDirectoryToUse instanceof File) {
796
816
797
817
private synchronized FileState getFileState (final File fileToWriteTo , boolean isString )
798
818
throws FileNotFoundException {
799
- String absolutePath = fileToWriteTo .getAbsolutePath ();
800
819
FileState state ;
801
820
boolean appendNoFlush = FileExistsMode .APPEND_NO_FLUSH .equals (this .fileExistsMode );
802
821
if (appendNoFlush ) {
822
+ String absolutePath = fileToWriteTo .getAbsolutePath ();
803
823
state = this .fileStates .get (absolutePath );
804
824
if (state != null && ((isString && state .stream != null ) || (!isString && state .writer != null ))) {
805
825
state .close ();
@@ -872,16 +892,19 @@ public void trigger(Message<?> message) {
872
892
* @param flushPredicate the {@link FlushPredicate}.
873
893
* @since 4.3
874
894
*/
875
- public synchronized void flushIfNeeded (FlushPredicate flushPredicate ) {
876
- Iterator <Entry <String , FileState >> iterator = FileWritingMessageHandler .this .fileStates .entrySet ().iterator ();
877
- while (iterator .hasNext ()) {
878
- Entry <String , FileState > entry = iterator .next ();
879
- FileState state = entry .getValue ();
880
- if (flushPredicate .shouldFlush (entry .getKey (), state .lastWrite )) {
881
- iterator .remove ();
882
- state .close ();
883
- }
884
- }
895
+ public void flushIfNeeded (final FlushPredicate flushPredicate ) {
896
+ flushIfNeeded (new MessageFlushPredicate () {
897
+
898
+ @ Override
899
+ public boolean shouldFlush (String fileAbsolutePath , long lastWrite ,
900
+ Message <?> filterMessage ) {
901
+
902
+ return flushPredicate .shouldFlush (fileAbsolutePath , lastWrite );
903
+
904
+ }
905
+
906
+ },
907
+ null );
885
908
}
886
909
887
910
/**
@@ -893,16 +916,24 @@ public synchronized void flushIfNeeded(FlushPredicate flushPredicate) {
893
916
* @param filterMessage an optional message passed into the predicate.
894
917
* @since 4.3
895
918
*/
896
- public synchronized void flushIfNeeded (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
897
- Iterator <Entry <String , FileState >> iterator = FileWritingMessageHandler .this .fileStates .entrySet ().iterator ();
898
- while (iterator .hasNext ()) {
899
- Entry <String , FileState > entry = iterator .next ();
900
- FileState state = entry .getValue ();
901
- if (flushPredicate .shouldFlush (entry .getKey (), state .lastWrite , filterMessage )) {
902
- iterator .remove ();
903
- state .close ();
919
+ public void flushIfNeeded (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
920
+ doFlush (findFilesToFlush (flushPredicate , filterMessage ));
921
+ }
922
+
923
+ private Map <String , FileState > findFilesToFlush (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
924
+ Map <String , FileState > toRemove = new HashMap <String , FileState >();
925
+ synchronized (this ) {
926
+ Iterator <Entry <String , FileState >> iterator = this .fileStates .entrySet ().iterator ();
927
+ while (iterator .hasNext ()) {
928
+ Entry <String , FileState > entry = iterator .next ();
929
+ FileState state = entry .getValue ();
930
+ if (flushPredicate .shouldFlush (entry .getKey (), state .lastWrite , filterMessage )) {
931
+ iterator .remove ();
932
+ toRemove .put (entry .getKey (), state );
933
+ }
904
934
}
905
935
}
936
+ return toRemove ;
906
937
}
907
938
908
939
private synchronized void clearState (final File fileToWriteTo , final FileState state ) {
@@ -911,6 +942,33 @@ private synchronized void clearState(final File fileToWriteTo, final FileState s
911
942
}
912
943
}
913
944
945
+ private void doFlush (Map <String , FileState > toRemove ) {
946
+ Map <String , FileState > toRestore = new HashMap <String , FileState >();
947
+ boolean interrupted = false ;
948
+ for (Entry <String , FileState > entry : toRemove .entrySet ()) {
949
+ if (!interrupted && entry .getValue ().close ()) {
950
+ if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
951
+ FileWritingMessageHandler .this .logger .debug ("Flushed: " + entry .getKey ());
952
+ }
953
+ }
954
+ else { // interrupted (stop), re-add
955
+ interrupted = true ;
956
+ toRestore .put (entry .getKey (), entry .getValue ());
957
+ }
958
+ }
959
+ if (interrupted ) {
960
+ if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
961
+ FileWritingMessageHandler .this .logger
962
+ .debug ("Interrupted during flush; not flushed: " + toRestore .keySet ());
963
+ }
964
+ synchronized (this ) {
965
+ for (Entry <String , FileState > entry : toRestore .entrySet ()) {
966
+ this .fileStates .putIfAbsent (entry .getKey (), entry .getValue ());
967
+ }
968
+ }
969
+ }
970
+ }
971
+
914
972
private static boolean rename (File source , File target ) throws IOException {
915
973
return (nioFilesPresent && filesMove (source , target )) || source .renameTo (target );
916
974
}
@@ -925,7 +983,7 @@ private static final class FileState {
925
983
926
984
private final BufferedWriter writer ;
927
985
928
- private final BufferedOutputStream stream ;
986
+ private final BufferedOutputStream stream ;
929
987
930
988
private final Lock lock ;
931
989
@@ -975,6 +1033,7 @@ private final class Flusher implements Runnable {
975
1033
976
1034
@ Override
977
1035
public void run () {
1036
+ Map <String , FileState > toRemove = new HashMap <String , FileState >();
978
1037
synchronized (FileWritingMessageHandler .this ) {
979
1038
long expired = FileWritingMessageHandler .this .flushTask == null ? Long .MAX_VALUE
980
1039
: (System .currentTimeMillis () - FileWritingMessageHandler .this .flushInterval );
@@ -984,18 +1043,12 @@ public void run() {
984
1043
FileState state = entry .getValue ();
985
1044
if (state .lastWrite < expired ||
986
1045
(!FileWritingMessageHandler .this .flushWhenIdle && state .firstWrite < expired )) {
987
- if (state .close ()) {
988
- if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
989
- FileWritingMessageHandler .this .logger .debug ("Flushed: " + entry .getKey ());
990
- }
991
- iterator .remove ();
992
- }
993
- else {
994
- break ; // interrupted
995
- }
1046
+ toRemove .put (entry .getKey (), state );
1047
+ iterator .remove ();
996
1048
}
997
1049
}
998
1050
}
1051
+ doFlush (toRemove );
999
1052
}
1000
1053
1001
1054
}
0 commit comments