Skip to content

Commit 13a8623

Browse files
artembilangaryrussell
authored andcommitted
GH-3027: Decouple RotationPolicy
Fixes #3027 This is a polishing for PR #3029 * Move `KeyDirectory` in to the `RotationPolicy` since it looks like fully coupled concept with that abstraction and the class name is so bad for the public API * Remove `AbstractStandardRotationPolicy` in favor of a `StandardRotationPolicy` extendability * Change `ftp.adoc` and `sftp.adoc` to reflect a new API reality * Mantion these changes in the `whats-new.adoc`
1 parent a66b5ed commit 13a8623

File tree

8 files changed

+109
-131
lines changed

8 files changed

+109
-131
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020

2121
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
2222
import org.springframework.integration.core.MessageSource;
23-
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
2423
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
25-
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
2624
import org.springframework.messaging.Message;
2725
import org.springframework.util.Assert;
2826

@@ -45,20 +43,22 @@ public class RotatingServerAdvice extends AbstractMessageSourceAdvice {
4543
* Create an instance that rotates to the next server/directory if no message is
4644
* received.
4745
* @param factory the {@link DelegatingSessionFactory}.
48-
* @param keyDirectories a list of {@link KeyDirectory}.
46+
* @param keyDirectories a list of {@link RotationPolicy.KeyDirectory}.
4947
*/
50-
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories) {
48+
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<RotationPolicy.KeyDirectory> keyDirectories) {
5149
this(factory, keyDirectories, false);
5250
}
5351

5452
/**
5553
* Create an instance that rotates to the next server/directory depending on the fair
5654
* argument.
5755
* @param factory the {@link DelegatingSessionFactory}.
58-
* @param keyDirectories a list of {@link KeyDirectory}.
56+
* @param keyDirectories a list of {@link RotationPolicy.KeyDirectory}.
5957
* @param fair true to rotate on every poll, false to rotate when no message is received.
6058
*/
61-
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories, boolean fair) {
59+
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<RotationPolicy.KeyDirectory> keyDirectories,
60+
boolean fair) {
61+
6262
this(new StandardRotationPolicy(factory, keyDirectories, fair));
6363
}
6464

@@ -84,29 +84,4 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
8484
return result;
8585
}
8686

87-
public static class StandardRotationPolicy extends AbstractStandardRotationPolicy {
88-
89-
90-
public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
91-
boolean fair) {
92-
super(factory, keyDirectories, fair);
93-
}
94-
95-
@Override
96-
protected void onRotation(MessageSource<?> source) {
97-
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
98-
|| source instanceof AbstractRemoteFileStreamingMessageSource,
99-
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
100-
+ "AbstractRemoteFileStreamingMessageSource");
101-
102-
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
103-
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
104-
}
105-
else {
106-
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
107-
.setRemoteDirectory(getCurrent().getDirectory());
108-
}
109-
}
110-
111-
}
11287
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717
package org.springframework.integration.file.remote.aop;
1818

1919
import org.springframework.integration.core.MessageSource;
20+
import org.springframework.util.Assert;
2021

2122
/**
22-
* Implementations can reconfigure the message source before and/or after
23-
* a poll.
23+
* A strategy for rotating advices to allow reconfiguring
24+
* the message source before and/or after a poll.
2425
*
2526
* @author Gary Russell
2627
* @author Michael Forstner
2728
* @author Artem Bilan
2829
* @author David Turanski
2930
*
30-
* @since 5.0.7
31+
* @since 5.2
3132
*/
3233
public interface RotationPolicy {
3334

@@ -44,11 +45,42 @@ public interface RotationPolicy {
4445
*/
4546
void afterReceive(boolean messageReceived, MessageSource<?> source);
4647

47-
4848
/**
49-
*
49+
* Return the current {@link KeyDirectory}.
5050
* @return the current {@link KeyDirectory}
51+
* @since 5.2
5152
*/
5253
KeyDirectory getCurrent();
5354

55+
/**
56+
* A key for a thread-local store and its related directory pair.
57+
*/
58+
class KeyDirectory {
59+
60+
private final Object key;
61+
62+
private final String directory;
63+
64+
public KeyDirectory(Object key, String directory) {
65+
Assert.notNull(key, "key cannot be null");
66+
Assert.notNull(directory, "directory cannot be null");
67+
this.key = key;
68+
this.directory = directory;
69+
}
70+
71+
public Object getKey() {
72+
return this.key;
73+
}
74+
75+
public String getDirectory() {
76+
return this.directory;
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
82+
}
83+
84+
}
85+
5486
}
Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,14 +17,17 @@
1717
package org.springframework.integration.file.remote.aop;
1818

1919
import java.util.ArrayList;
20+
import java.util.Collections;
2021
import java.util.Iterator;
2122
import java.util.List;
2223

2324
import org.apache.commons.logging.Log;
2425
import org.apache.commons.logging.LogFactory;
2526

2627
import org.springframework.integration.core.MessageSource;
28+
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
2729
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
30+
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
2831
import org.springframework.util.Assert;
2932

3033
/**
@@ -41,13 +44,13 @@
4144
* @author Artem Bilan
4245
* @author David Turanski
4346
*
44-
* @since 5.1.8
47+
* @since 5.2
4548
*/
46-
public abstract class AbstractStandardRotationPolicy implements RotationPolicy {
49+
public class StandardRotationPolicy implements RotationPolicy {
4750

4851
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
4952

50-
private final DelegatingSessionFactory<?> factory; // NOSONAR final
53+
private final DelegatingSessionFactory<?> factory;
5154

5255
private final List<KeyDirectory> keyDirectories = new ArrayList<>();
5356

@@ -59,12 +62,12 @@ public abstract class AbstractStandardRotationPolicy implements RotationPolicy {
5962

6063
private volatile boolean initialized;
6164

62-
protected AbstractStandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
65+
public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
6366
boolean fair) {
6467

6568
Assert.notNull(factory, "factory cannot be null");
6669
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
67-
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
70+
Assert.isTrue(!keyDirectories.isEmpty(), "At least one KeyDirectory is required");
6871
this.factory = factory;
6972
this.keyDirectories.addAll(keyDirectories);
7073
this.fair = fair;
@@ -107,7 +110,7 @@ protected DelegatingSessionFactory<?> getFactory() {
107110
}
108111

109112
protected List<KeyDirectory> getKeyDirectories() {
110-
return this.keyDirectories;
113+
return Collections.unmodifiableList(this.keyDirectories);
111114
}
112115

113116
protected boolean isFair() {
@@ -123,14 +126,26 @@ protected boolean isInitialized() {
123126
}
124127

125128
protected void configureSource(MessageSource<?> source) {
126-
127129
if (!this.iterator.hasNext()) {
128130
this.iterator = this.keyDirectories.iterator();
129131
}
130132
this.current = this.iterator.next();
131-
132133
onRotation(source);
133134
}
134135

135-
protected abstract void onRotation(MessageSource<?> source);
136+
protected void onRotation(MessageSource<?> source) {
137+
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
138+
|| source instanceof AbstractRemoteFileStreamingMessageSource,
139+
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
140+
+ "AbstractRemoteFileStreamingMessageSource");
141+
142+
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
143+
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
144+
}
145+
else {
146+
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
147+
.setRemoteDirectory(getCurrent().getDirectory());
148+
}
149+
}
150+
136151
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import org.springframework.integration.dsl.MessageChannels;
4444
import org.springframework.integration.dsl.Pollers;
4545
import org.springframework.integration.dsl.StandardIntegrationFlow;
46-
import org.springframework.integration.file.remote.aop.KeyDirectory;
4746
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
47+
import org.springframework.integration.file.remote.aop.RotationPolicy;
4848
import org.springframework.integration.file.remote.session.CachingSessionFactory;
4949
import org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator;
5050
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
@@ -227,17 +227,17 @@ public DelegatingSessionFactory<FTPFile> sf() {
227227

228228
@Bean
229229
public RotatingServerAdvice advice() {
230-
List<KeyDirectory> keyDirectories = new ArrayList<>();
231-
keyDirectories.add(new KeyDirectory("one", "foo"));
232-
keyDirectories.add(new KeyDirectory("one", "bar"));
233-
keyDirectories.add(new KeyDirectory("two", "baz"));
234-
keyDirectories.add(new KeyDirectory("two", "qux"));
235-
keyDirectories.add(new KeyDirectory("three", "fiz"));
236-
keyDirectories.add(new KeyDirectory("three", "buz"));
230+
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
231+
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
232+
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
233+
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
234+
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
235+
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
236+
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
237237
return theAdvice(keyDirectories);
238238
}
239239

240-
protected RotatingServerAdvice theAdvice(List<KeyDirectory> keyDirectories) {
240+
protected RotatingServerAdvice theAdvice(List<RotationPolicy.KeyDirectory> keyDirectories) {
241241
return new RotatingServerAdvice(sf(), keyDirectories);
242242
}
243243

@@ -262,7 +262,7 @@ protected File localDir() {
262262
public static class FairConfig extends StandardConfig {
263263

264264
@Override
265-
protected RotatingServerAdvice theAdvice(List<KeyDirectory> keyDirectories) {
265+
protected RotatingServerAdvice theAdvice(List<RotationPolicy.KeyDirectory> keyDirectories) {
266266
return new RotatingServerAdvice(sf(), keyDirectories, true);
267267
}
268268

src/reference/asciidoc/ftp.adoc

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -706,20 +706,20 @@ Notice that, in this example, the message handler downstream of the transformer
706706
Starting with _version 5.0.7_, the `RotatingServerAdvice` is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories.
707707
Configure the advice and add it to the poller's advice chain as normal.
708708
A `DelegatingSessionFactory` is used to select the server see <<ftp-dsf>> for more information.
709-
The advice configuration consists of a list of `RotatingServerAdvice.KeyDirectory` objects.
709+
The advice configuration consists of a list of `RotationPolicy.KeyDirectory` objects.
710710

711711
.Example
712712
[source, java]
713713
----
714714
@Bean
715715
public RotatingServerAdvice advice() {
716-
List<KeyDirectory> keyDirectories = new ArrayList<>();
717-
keyDirectories.add(new KeyDirectory("one", "foo"));
718-
keyDirectories.add(new KeyDirectory("one", "bar"));
719-
keyDirectories.add(new KeyDirectory("two", "baz"));
720-
keyDirectories.add(new KeyDirectory("two", "qux"));
721-
keyDirectories.add(new KeyDirectory("three", "fiz"));
722-
keyDirectories.add(new KeyDirectory("three", "buz"));
716+
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
717+
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
718+
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
719+
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
720+
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
721+
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
722+
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
723723
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
724724
}
725725
----
@@ -740,7 +740,7 @@ public RotatingServerAdvice advice() {
740740

741741
In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.
742742

743-
Alternatively, you can provide your own `RotatingServerAdvice.RotationPolicy` to reconfigure the message source as needed:
743+
Alternatively, you can provide your own `RotationPolicy` to reconfigure the message source as needed:
744744

745745
.policy
746746
[source, java]

0 commit comments

Comments
 (0)