Skip to content

Commit d1b3046

Browse files
artembilangaryrussell
authored andcommitted
INT-4550: Disallow multi aggregators on same MGS (#2622)
* INT-4550: Disallow multi aggregators on same MGS JIRA: https://jira.spring.io/browse/INT-4550 **Cherry-pick to 5.0.x** * * Introduce `UniqueExpiryCallback` * Use `UniqueExpiryCallback` in the `AbstractCorrelatingMessageHandler` * Check for uniqueness in the `AbstractMessageGroupStore` * Remove duplicate code in the `ConfigurableMongoDbMessageStore` * * Fix tests according a new logic * * Address PR review * Change `Assert.isTrue` to the `logger.error` for backward compatibility * Revert changes in tests since we don't throw exception anymore * Fix language on doc * * Fix Checkstyle violation in the `AbstractMessageGroupStore` * * Ignore `testDontReapMessageOfOtherHandler()`
1 parent 1e8da3a commit d1b3046

File tree

6 files changed

+76
-125
lines changed

6 files changed

+76
-125
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Date;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.Set;
2625
import java.util.UUID;
2726
import java.util.concurrent.ConcurrentHashMap;
2827
import java.util.concurrent.ScheduledFuture;
@@ -51,6 +50,7 @@
5150
import org.springframework.integration.store.MessageStore;
5251
import org.springframework.integration.store.SimpleMessageGroup;
5352
import org.springframework.integration.store.SimpleMessageStore;
53+
import org.springframework.integration.store.UniqueExpiryCallback;
5454
import org.springframework.integration.support.locks.DefaultLockRegistry;
5555
import org.springframework.integration.support.locks.LockRegistry;
5656
import org.springframework.integration.util.UUIDConverter;
@@ -100,8 +100,6 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
100100

101101
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new ConcurrentHashMap<>();
102102

103-
private final Set<Object> groupIds = ConcurrentHashMap.newKeySet();
104-
105103
private MessageGroupProcessor outputProcessor;
106104

107105
private MessageGroupStore messageStore;
@@ -173,8 +171,9 @@ public void setLockRegistry(LockRegistry lockRegistry) {
173171

174172
public final void setMessageStore(MessageGroupStore store) {
175173
this.messageStore = store;
176-
store.registerMessageGroupExpiryCallback(
177-
(messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group));
174+
UniqueExpiryCallback expiryCallback =
175+
(messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group);
176+
store.registerMessageGroupExpiryCallback(expiryCallback);
178177
}
179178

180179
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
@@ -687,7 +686,6 @@ protected void forceComplete(MessageGroup group) {
687686
protected void remove(MessageGroup group) {
688687
Object correlationKey = group.getGroupId();
689688
this.messageStore.removeMessageGroup(correlationKey);
690-
this.groupIds.remove(group.getGroupId());
691689
}
692690

693691
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) {
@@ -696,7 +694,6 @@ protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<
696694
}
697695

698696
protected MessageGroup store(Object correlationKey, Message<?> message) {
699-
this.groupIds.add(correlationKey);
700697
return this.messageStore.addMessageToGroup(correlationKey, message);
701698
}
702699

@@ -866,9 +863,7 @@ private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor
866863

867864
@Override
868865
public Object processMessageGroup(MessageGroup group) {
869-
if (AbstractCorrelatingMessageHandler.this.groupIds.contains(group.getGroupId())) {
870-
forceComplete(group);
871-
}
866+
forceComplete(group);
872867
return null;
873868
}
874869

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG
4242

4343
protected final Log logger = LogFactory.getLog(getClass());
4444

45-
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<MessageGroupCallback>();
45+
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<>();
4646

4747
private final MessageGroupFactory persistentMessageGroupFactory =
4848
new SimpleMessageGroupFactory(SimpleMessageGroupFactory.GroupType.PERSISTENT);
@@ -72,13 +72,10 @@ protected MessageGroupFactory getMessageGroupFactory() {
7272
/**
7373
* Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply
7474
* be registered with the store using {@link #registerMessageGroupExpiryCallback(MessageGroupCallback)}.
75-
*
7675
* @param expiryCallbacks the expiry callbacks to add
7776
*/
7877
public void setExpiryCallbacks(Collection<MessageGroupCallback> expiryCallbacks) {
79-
for (MessageGroupCallback callback : expiryCallbacks) {
80-
registerMessageGroupExpiryCallback(callback);
81-
}
78+
expiryCallbacks.forEach(this::registerMessageGroupExpiryCallback);
8279
}
8380

8481
public boolean isTimeoutOnIdle() {
@@ -110,6 +107,17 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
110107

111108
@Override
112109
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
110+
if (callback instanceof UniqueExpiryCallback) {
111+
boolean uniqueExpiryCallbackPresent =
112+
this.expiryCallbacks.stream()
113+
.anyMatch(UniqueExpiryCallback.class::isInstance);
114+
115+
if (!uniqueExpiryCallbackPresent && this.logger.isErrorEnabled()) {
116+
this.logger.error("Only one instance of 'UniqueExpiryCallback' can be registered in the " +
117+
"'MessageGroupStore'. Use a separate 'MessageGroupStore' for each aggregator/resequencer.");
118+
}
119+
}
120+
113121
this.expiryCallbacks.add(callback);
114122
}
115123

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.store;
18+
19+
/**
20+
* A marker interface extension of the {@link MessageGroupStore.MessageGroupCallback}
21+
* for components which should be registered in the {@link MessageGroupStore} only once.
22+
* The {@link MessageGroupStore} implementation ensures that only once instance of this
23+
* class is present in the expire callbacks.
24+
*
25+
* @author Meherzad Lahewala
26+
* @author Artme Bilan
27+
*
28+
* @since 5.0.10
29+
*/
30+
@FunctionalInterface
31+
public interface UniqueExpiryCallback extends MessageGroupStore.MessageGroupCallback {
32+
33+
}

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.Executors;
3636
import java.util.concurrent.TimeUnit;
3737

38+
import org.junit.Ignore;
3839
import org.junit.Test;
3940

4041
import org.springframework.beans.DirectFieldAccessor;
@@ -422,7 +423,8 @@ public void testScheduleRemoveAnEmptyGroupAfterConfiguredDelay() throws Exceptio
422423
}
423424

424425
@Test
425-
public void testDontReapMessageOfOtherHandler() throws Exception {
426+
@Ignore("Until 5.2 with new 'owner' feature on groups")
427+
public void testDontReapMessageOfOtherHandler() {
426428
MessageGroupStore groupStore = new SimpleMessageStore();
427429

428430
AggregatingMessageHandler handler1 = new AggregatingMessageHandler(group -> group, groupStore);

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java

Lines changed: 12 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
package org.springframework.integration.mongodb.store;
1818

1919
import java.util.ArrayList;
20-
import java.util.Arrays;
2120
import java.util.Collection;
2221
import java.util.Iterator;
23-
import java.util.LinkedHashSet;
2422
import java.util.List;
2523
import java.util.UUID;
24+
import java.util.stream.Collectors;
25+
import java.util.stream.StreamSupport;
2626

2727
import org.springframework.data.domain.Sort;
2828
import org.springframework.data.mongodb.MongoDbFactory;
@@ -32,12 +32,10 @@
3232
import org.springframework.data.mongodb.core.query.Query;
3333
import org.springframework.data.mongodb.core.query.Update;
3434
import org.springframework.integration.store.MessageGroup;
35-
import org.springframework.integration.store.MessageGroupMetadata;
3635
import org.springframework.integration.store.MessageGroupStore;
3736
import org.springframework.integration.store.MessageStore;
3837
import org.springframework.integration.store.SimpleMessageGroup;
3938
import org.springframework.jmx.export.annotation.ManagedAttribute;
40-
import org.springframework.jmx.export.annotation.ManagedOperation;
4139
import org.springframework.messaging.Message;
4240
import org.springframework.util.Assert;
4341

@@ -58,11 +56,6 @@ public class ConfigurableMongoDbMessageStore extends AbstractConfigurableMongoDb
5856

5957
public final static String DEFAULT_COLLECTION_NAME = "configurableStoreMessages";
6058

61-
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<MessageGroupCallback>();
62-
63-
private volatile boolean timeoutOnIdle;
64-
65-
6659
public ConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate) {
6760
this(mongoTemplate, DEFAULT_COLLECTION_NAME);
6861
}
@@ -85,33 +78,8 @@ public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, String col
8578

8679
public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter,
8780
String collectionName) {
88-
super(mongoDbFactory, mappingMongoConverter, collectionName);
89-
}
90-
91-
/**
92-
* Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply
93-
* be registered with the store using {@link #registerMessageGroupExpiryCallback(MessageGroupCallback)}.
94-
* @param expiryCallbacks the expiry callbacks to add
95-
*/
96-
public void setExpiryCallbacks(Collection<MessageGroupCallback> expiryCallbacks) {
97-
for (MessageGroupCallback callback : expiryCallbacks) {
98-
registerMessageGroupExpiryCallback(callback);
99-
}
100-
}
101-
102-
public boolean isTimeoutOnIdle() {
103-
return this.timeoutOnIdle;
104-
}
10581

106-
/**
107-
* Allows you to override the rule for the timeout calculation. Typical timeout is based from the time
108-
* the {@link MessageGroup} was created. If you want the timeout to be based on the time
109-
* the {@link MessageGroup} was idling (e.g., inactive from the last update) invoke this method with 'true'.
110-
* Default is 'false'.
111-
* @param timeoutOnIdle The boolean.
112-
*/
113-
public void setTimeoutOnIdle(boolean timeoutOnIdle) {
114-
this.timeoutOnIdle = timeoutOnIdle;
82+
super(mongoDbFactory, mappingMongoConverter, collectionName);
11583
}
11684

11785
@Override
@@ -204,7 +172,7 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
204172
Assert.notNull(groupId, "'groupId' must not be null");
205173
Assert.notNull(messages, "'messageToRemove' must not be null");
206174

207-
Collection<UUID> ids = new ArrayList<UUID>();
175+
Collection<UUID> ids = new ArrayList<>();
208176
for (Message<?> messageToRemove : messages) {
209177
ids.add(messageToRemove.getHeaders().getId());
210178
if (ids.size() >= getRemoveBatchSize()) {
@@ -224,11 +192,6 @@ private void removeMessages(Object groupId, Collection<UUID> ids) {
224192
this.mongoTemplate.remove(query, this.collectionName);
225193
}
226194

227-
@Override
228-
public void removeMessagesFromGroup(Object groupId, Message<?>... messages) {
229-
removeMessagesFromGroup(groupId, Arrays.asList(messages));
230-
}
231-
232195
@Override
233196
public Message<?> pollMessageFromGroup(final Object groupId) {
234197
Assert.notNull(groupId, "'groupId' must not be null");
@@ -246,52 +209,24 @@ public Message<?> pollMessageFromGroup(final Object groupId) {
246209

247210
@Override
248211
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
249-
this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber));
212+
updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber));
250213
}
251214

252215
@Override
253216
public void completeGroup(Object groupId) {
254-
this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true));
217+
updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true));
255218
}
256219

257220
@Override
258221
public Iterator<MessageGroup> iterator() {
259-
List<MessageGroup> messageGroups = new ArrayList<MessageGroup>();
260-
261222
Query query = Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).exists(true));
262223
Iterable<String> groupIds = mongoTemplate.getCollection(collectionName)
263224
.distinct(MessageDocumentFields.GROUP_ID, query.getQueryObject(), String.class);
264225

265-
for (Object groupId : groupIds) {
266-
messageGroups.add(getMessageGroup(groupId));
267-
}
268-
269-
return messageGroups.iterator();
270-
}
271-
272-
@Override
273-
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
274-
this.expiryCallbacks.add(callback);
275-
}
276-
277-
@Override
278-
@ManagedOperation
279-
public int expireMessageGroups(long timeout) {
280-
int count = 0;
281-
long threshold = System.currentTimeMillis() - timeout;
282-
for (MessageGroup group : this) {
283-
284-
long timestamp = group.getTimestamp();
285-
if (this.isTimeoutOnIdle() && group.getLastModified() > 0) {
286-
timestamp = group.getLastModified();
287-
}
226+
return StreamSupport.stream(groupIds.spliterator(), false)
227+
.map(this::getMessageGroup)
228+
.iterator();
288229

289-
if (timestamp <= threshold) {
290-
count++;
291-
expire(group);
292-
}
293-
}
294-
return count;
295230
}
296231

297232
@Override
@@ -314,11 +249,6 @@ public int getMessageGroupCount() {
314249
.size();
315250
}
316251

317-
@Override
318-
public MessageGroupMetadata getGroupMetadata(Object groupId) {
319-
throw new UnsupportedOperationException("Not yet implemented for this store");
320-
}
321-
322252
@Override
323253
public Message<?> getOneMessageFromGroup(Object groupId) {
324254
Assert.notNull(groupId, "'groupId' must not be null");
@@ -337,36 +267,12 @@ public Collection<Message<?>> getMessagesForGroup(Object groupId) {
337267
Assert.notNull(groupId, "'groupId' must not be null");
338268
Query query = groupOrderQuery(groupId);
339269
List<MessageDocument> documents = this.mongoTemplate.find(query, MessageDocument.class, this.collectionName);
340-
List<Message<?>> messages = new ArrayList<Message<?>>();
341-
342-
for (MessageDocument document : documents) {
343-
messages.add(document.getMessage());
344-
}
345-
return messages;
346-
}
347-
348-
private void expire(MessageGroup group) {
349-
350-
RuntimeException exception = null;
351-
352-
for (MessageGroupCallback callback : this.expiryCallbacks) {
353-
try {
354-
callback.execute(this, group);
355-
}
356-
catch (RuntimeException e) {
357-
if (exception == null) {
358-
exception = e;
359-
}
360-
logger.error("Exception in expiry callback", e);
361-
}
362-
}
363270

364-
if (exception != null) {
365-
throw exception;
366-
}
271+
return documents.stream()
272+
.map(MessageDocument::getMessage)
273+
.collect(Collectors.toList());
367274
}
368275

369-
370276
private void updateGroup(Object groupId, Update update) {
371277
this.mongoTemplate.updateFirst(groupOrderQuery(groupId), update, this.collectionName);
372278
}

src/reference/asciidoc/aggregator.adoc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -814,9 +814,16 @@ by removing the group from the store entirely).
814814

815815
The `MessageGroupStore` maintains a list of these callbacks which it applies, on demand, to all messages whose timestamp is earlier than a time supplied as a parameter (see the `registerMessageGroupExpiryCallback(..)` and `expireMessageGroups(..)` methods above).
816816

817-
The `expireMessageGroups` method can be called with a timeout value.
818-
Any message older than the current time minus this value will be expired, and have the callbacks applied.
819-
Thus it is the user of the store that defines what is meant by message group "expiry".
817+
IMPORTANT: It is important not to use the same `MessageGroupStore` instance in different aggregator components, when you intend to rely on the `expireMessageGroups` functionality.
818+
Every `AbstractCorrelatingMessageHandler` registers its own `MessageGroupCallback` based on the `forceComplete()` callback.
819+
This way each group for expiration may be completed or discarded by the wrong aggregator.
820+
Starting with version 5.0.10, a `UniqueExpiryCallback` is used from the `AbstractCorrelatingMessageHandler` for the registration callback in the `MessageGroupStore`.
821+
The `MessageGroupStore`, in turn, checks for presence an instance of this class and logs an error with an appropriate message if one is already present in the callbacks set.
822+
This way the Framework disallows usage of the `MessageGroupStore` instance in different aggregators/resequencers to avoid the mentioned side effect of expiration the groups not created by the particular correlation handler.
823+
824+
You can call the `expireMessageGroups` method with a timeout value.
825+
Any message older than the current time minus this value is expired and has the callbacks applied.
826+
Thus, it is the user of the store that defines what is meant by message group "`expiry`".
820827

821828
As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a `MessageGroupStoreReaper`:
822829

0 commit comments

Comments
 (0)