Skip to content

Commit 5bf6161

Browse files
artembilangaryrussell
authored andcommitted
INT-4550: Disallow multi aggregators on same MGS (#2622)
* INT-4550: Disallow multi aggregators on same MGS JIRA: https://jira.spring.io/browse/INT-4550 **Cherry-pick to 5.0.x** * * Introduce `UniqueExpiryCallback` * Use `UniqueExpiryCallback` in the `AbstractCorrelatingMessageHandler` * Check for uniqueness in the `AbstractMessageGroupStore` * Remove duplicate code in the `ConfigurableMongoDbMessageStore` * * Fix tests according a new logic * * Address PR review * Change `Assert.isTrue` to the `logger.error` for backward compatibility * Revert changes in tests since we don't throw exception anymore * Fix language on doc * * Fix Checkstyle violation in the `AbstractMessageGroupStore` * * Ignore `testDontReapMessageOfOtherHandler()`
1 parent 3d696ef commit 5bf6161

File tree

6 files changed

+71
-120
lines changed

6 files changed

+71
-120
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Date;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.Set;
2625
import java.util.UUID;
2726
import java.util.concurrent.ConcurrentHashMap;
2827
import java.util.concurrent.ScheduledFuture;
@@ -51,6 +50,7 @@
5150
import org.springframework.integration.store.MessageStore;
5251
import org.springframework.integration.store.SimpleMessageGroup;
5352
import org.springframework.integration.store.SimpleMessageStore;
53+
import org.springframework.integration.store.UniqueExpiryCallback;
5454
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
5555
import org.springframework.integration.support.MessageBuilder;
5656
import org.springframework.integration.support.locks.DefaultLockRegistry;
@@ -102,8 +102,6 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
102102

103103
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new ConcurrentHashMap<>();
104104

105-
private final Set<Object> groupIds = ConcurrentHashMap.newKeySet();
106-
107105
private MessageGroupProcessor outputProcessor;
108106

109107
private MessageGroupStore messageStore;
@@ -188,8 +186,9 @@ public void setLockRegistry(LockRegistry lockRegistry) {
188186

189187
public final void setMessageStore(MessageGroupStore store) {
190188
this.messageStore = store;
191-
store.registerMessageGroupExpiryCallback(
192-
(messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group));
189+
UniqueExpiryCallback expiryCallback =
190+
(messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group);
191+
store.registerMessageGroupExpiryCallback(expiryCallback);
193192
}
194193

195194
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
@@ -746,7 +745,6 @@ protected void forceComplete(MessageGroup group) {
746745
protected void remove(MessageGroup group) {
747746
Object correlationKey = group.getGroupId();
748747
this.messageStore.removeMessageGroup(correlationKey);
749-
this.groupIds.remove(group.getGroupId());
750748
}
751749

752750
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) {
@@ -755,7 +753,6 @@ protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<
755753
}
756754

757755
protected MessageGroup store(Object correlationKey, Message<?> message) {
758-
this.groupIds.add(correlationKey);
759756
return this.messageStore.addMessageToGroup(correlationKey, message);
760757
}
761758

@@ -949,9 +946,7 @@ private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor
949946

950947
@Override
951948
public Object processMessageGroup(MessageGroup group) {
952-
if (AbstractCorrelatingMessageHandler.this.groupIds.contains(group.getGroupId())) {
953-
forceComplete(group);
954-
}
949+
forceComplete(group);
955950
return null;
956951
}
957952

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG
4242

4343
protected final Log logger = LogFactory.getLog(getClass());
4444

45-
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<MessageGroupCallback>();
45+
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<>();
4646

4747
private final MessageGroupFactory persistentMessageGroupFactory =
4848
new SimpleMessageGroupFactory(SimpleMessageGroupFactory.GroupType.PERSISTENT);
@@ -72,13 +72,10 @@ protected MessageGroupFactory getMessageGroupFactory() {
7272
/**
7373
* Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply
7474
* be registered with the store using {@link #registerMessageGroupExpiryCallback(MessageGroupCallback)}.
75-
*
7675
* @param expiryCallbacks the expiry callbacks to add
7776
*/
7877
public void setExpiryCallbacks(Collection<MessageGroupCallback> expiryCallbacks) {
79-
for (MessageGroupCallback callback : expiryCallbacks) {
80-
registerMessageGroupExpiryCallback(callback);
81-
}
78+
expiryCallbacks.forEach(this::registerMessageGroupExpiryCallback);
8279
}
8380

8481
public boolean isTimeoutOnIdle() {
@@ -110,6 +107,17 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
110107

111108
@Override
112109
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
110+
if (callback instanceof UniqueExpiryCallback) {
111+
boolean uniqueExpiryCallbackPresent =
112+
this.expiryCallbacks.stream()
113+
.anyMatch(UniqueExpiryCallback.class::isInstance);
114+
115+
if (!uniqueExpiryCallbackPresent && this.logger.isErrorEnabled()) {
116+
this.logger.error("Only one instance of 'UniqueExpiryCallback' can be registered in the " +
117+
"'MessageGroupStore'. Use a separate 'MessageGroupStore' for each aggregator/resequencer.");
118+
}
119+
}
120+
113121
this.expiryCallbacks.add(callback);
114122
}
115123

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.store;
18+
19+
/**
20+
* A marker interface extension of the {@link MessageGroupStore.MessageGroupCallback}
21+
* for components which should be registered in the {@link MessageGroupStore} only once.
22+
* The {@link MessageGroupStore} implementation ensures that only once instance of this
23+
* class is present in the expire callbacks.
24+
*
25+
* @author Meherzad Lahewala
26+
* @author Artme Bilan
27+
*
28+
* @since 5.0.10
29+
*/
30+
@FunctionalInterface
31+
public interface UniqueExpiryCallback extends MessageGroupStore.MessageGroupCallback {
32+
33+
}

spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ public void testScheduleRemoveAnEmptyGroupAfterConfiguredDelay() throws Exceptio
425425
}
426426

427427
@Test
428+
@Ignore("Until 5.2 with new 'owner' feature on groups")
428429
public void testDontReapMessageOfOtherHandler() {
429430
MessageGroupStore groupStore = new SimpleMessageStore();
430431

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java

Lines changed: 12 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
package org.springframework.integration.mongodb.store;
1818

1919
import java.util.ArrayList;
20-
import java.util.Arrays;
2120
import java.util.Collection;
2221
import java.util.Iterator;
23-
import java.util.LinkedHashSet;
2422
import java.util.List;
2523
import java.util.UUID;
24+
import java.util.stream.Collectors;
25+
import java.util.stream.StreamSupport;
2626

2727
import org.springframework.data.domain.Sort;
2828
import org.springframework.data.mongodb.MongoDbFactory;
@@ -32,12 +32,10 @@
3232
import org.springframework.data.mongodb.core.query.Query;
3333
import org.springframework.data.mongodb.core.query.Update;
3434
import org.springframework.integration.store.MessageGroup;
35-
import org.springframework.integration.store.MessageGroupMetadata;
3635
import org.springframework.integration.store.MessageGroupStore;
3736
import org.springframework.integration.store.MessageStore;
3837
import org.springframework.integration.store.SimpleMessageGroup;
3938
import org.springframework.jmx.export.annotation.ManagedAttribute;
40-
import org.springframework.jmx.export.annotation.ManagedOperation;
4139
import org.springframework.messaging.Message;
4240
import org.springframework.util.Assert;
4341

@@ -59,10 +57,6 @@ public class ConfigurableMongoDbMessageStore extends AbstractConfigurableMongoDb
5957

6058
public final static String DEFAULT_COLLECTION_NAME = "configurableStoreMessages";
6159

62-
private final Collection<MessageGroupCallback> expiryCallbacks = new LinkedHashSet<>();
63-
64-
private volatile boolean timeoutOnIdle;
65-
6660

6761
public ConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate) {
6862
this(mongoTemplate, DEFAULT_COLLECTION_NAME);
@@ -86,33 +80,8 @@ public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, String col
8680

8781
public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter,
8882
String collectionName) {
89-
super(mongoDbFactory, mappingMongoConverter, collectionName);
90-
}
91-
92-
/**
93-
* Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply
94-
* be registered with the store using {@link #registerMessageGroupExpiryCallback(MessageGroupCallback)}.
95-
* @param expiryCallbacks the expiry callbacks to add
96-
*/
97-
public void setExpiryCallbacks(Collection<MessageGroupCallback> expiryCallbacks) {
98-
for (MessageGroupCallback callback : expiryCallbacks) {
99-
registerMessageGroupExpiryCallback(callback);
100-
}
101-
}
10283

103-
public boolean isTimeoutOnIdle() {
104-
return this.timeoutOnIdle;
105-
}
106-
107-
/**
108-
* Allows you to override the rule for the timeout calculation. Typical timeout is based from the time
109-
* the {@link MessageGroup} was created. If you want the timeout to be based on the time
110-
* the {@link MessageGroup} was idling (e.g., inactive from the last update) invoke this method with 'true'.
111-
* Default is 'false'.
112-
* @param timeoutOnIdle The boolean.
113-
*/
114-
public void setTimeoutOnIdle(boolean timeoutOnIdle) {
115-
this.timeoutOnIdle = timeoutOnIdle;
84+
super(mongoDbFactory, mappingMongoConverter, collectionName);
11685
}
11786

11887
@Override
@@ -205,7 +174,7 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
205174
Assert.notNull(groupId, "'groupId' must not be null");
206175
Assert.notNull(messages, "'messageToRemove' must not be null");
207176

208-
Collection<UUID> ids = new ArrayList<UUID>();
177+
Collection<UUID> ids = new ArrayList<>();
209178
for (Message<?> messageToRemove : messages) {
210179
ids.add(messageToRemove.getHeaders().getId());
211180
if (ids.size() >= getRemoveBatchSize()) {
@@ -225,11 +194,6 @@ private void removeMessages(Object groupId, Collection<UUID> ids) {
225194
this.mongoTemplate.remove(query, this.collectionName);
226195
}
227196

228-
@Override
229-
public void removeMessagesFromGroup(Object groupId, Message<?>... messages) {
230-
removeMessagesFromGroup(groupId, Arrays.asList(messages));
231-
}
232-
233197
@Override
234198
public Message<?> pollMessageFromGroup(final Object groupId) {
235199
Assert.notNull(groupId, "'groupId' must not be null");
@@ -247,52 +211,24 @@ public Message<?> pollMessageFromGroup(final Object groupId) {
247211

248212
@Override
249213
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
250-
this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber));
214+
updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber));
251215
}
252216

253217
@Override
254218
public void completeGroup(Object groupId) {
255-
this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true));
219+
updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true));
256220
}
257221

258222
@Override
259223
public Iterator<MessageGroup> iterator() {
260-
List<MessageGroup> messageGroups = new ArrayList<MessageGroup>();
261-
262224
Query query = Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).exists(true));
263225
Iterable<String> groupIds = mongoTemplate.getCollection(collectionName)
264226
.distinct(MessageDocumentFields.GROUP_ID, query.getQueryObject(), String.class);
265227

266-
for (Object groupId : groupIds) {
267-
messageGroups.add(getMessageGroup(groupId));
268-
}
269-
270-
return messageGroups.iterator();
271-
}
228+
return StreamSupport.stream(groupIds.spliterator(), false)
229+
.map(this::getMessageGroup)
230+
.iterator();
272231

273-
@Override
274-
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
275-
this.expiryCallbacks.add(callback);
276-
}
277-
278-
@Override
279-
@ManagedOperation
280-
public int expireMessageGroups(long timeout) {
281-
int count = 0;
282-
long threshold = System.currentTimeMillis() - timeout;
283-
for (MessageGroup group : this) {
284-
285-
long timestamp = group.getTimestamp();
286-
if (this.isTimeoutOnIdle() && group.getLastModified() > 0) {
287-
timestamp = group.getLastModified();
288-
}
289-
290-
if (timestamp <= threshold) {
291-
count++;
292-
expire(group);
293-
}
294-
}
295-
return count;
296232
}
297233

298234
@Override
@@ -315,11 +251,6 @@ public int getMessageGroupCount() {
315251
.size();
316252
}
317253

318-
@Override
319-
public MessageGroupMetadata getGroupMetadata(Object groupId) {
320-
throw new UnsupportedOperationException("Not yet implemented for this store");
321-
}
322-
323254
@Override
324255
public Message<?> getOneMessageFromGroup(Object groupId) {
325256
Assert.notNull(groupId, "'groupId' must not be null");
@@ -338,36 +269,12 @@ public Collection<Message<?>> getMessagesForGroup(Object groupId) {
338269
Assert.notNull(groupId, "'groupId' must not be null");
339270
Query query = groupOrderQuery(groupId);
340271
List<MessageDocument> documents = this.mongoTemplate.find(query, MessageDocument.class, this.collectionName);
341-
List<Message<?>> messages = new ArrayList<Message<?>>();
342272

343-
for (MessageDocument document : documents) {
344-
messages.add(document.getMessage());
345-
}
346-
return messages;
273+
return documents.stream()
274+
.map(MessageDocument::getMessage)
275+
.collect(Collectors.toList());
347276
}
348277

349-
private void expire(MessageGroup group) {
350-
351-
RuntimeException exception = null;
352-
353-
for (MessageGroupCallback callback : this.expiryCallbacks) {
354-
try {
355-
callback.execute(this, group);
356-
}
357-
catch (RuntimeException e) {
358-
if (exception == null) {
359-
exception = e;
360-
}
361-
logger.error("Exception in expiry callback", e);
362-
}
363-
}
364-
365-
if (exception != null) {
366-
throw exception;
367-
}
368-
}
369-
370-
371278
private void updateGroup(Object groupId, Update update) {
372279
this.mongoTemplate.updateFirst(groupOrderQuery(groupId), update, this.collectionName);
373280
}

src/reference/asciidoc/aggregator.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,13 @@ The callback has direct access to the store and the message group so that it can
812812
The `MessageGroupStore` maintains a list of these callbacks, which it applies, on demand, to all messages whose timestamps are earlier than a time supplied as a parameter (see the `registerMessageGroupExpiryCallback(..)` and `expireMessageGroups(..)` methods, described earlier).
813813
For more detail, see <<reaper>>.
814814
815+
IMPORTANT: It is important not to use the same `MessageGroupStore` instance in different aggregator components, when you intend to rely on the `expireMessageGroups` functionality.
816+
Every `AbstractCorrelatingMessageHandler` registers its own `MessageGroupCallback` based on the `forceComplete()` callback.
817+
This way each group for expiration may be completed or discarded by the wrong aggregator.
818+
Starting with version 5.0.10, a `UniqueExpiryCallback` is used from the `AbstractCorrelatingMessageHandler` for the registration callback in the `MessageGroupStore`.
819+
The `MessageGroupStore`, in turn, checks for presence an instance of this class and logs an error with an appropriate message if one is already present in the callbacks set.
820+
This way the Framework disallows usage of the `MessageGroupStore` instance in different aggregators/resequencers to avoid the mentioned side effect of expiration the groups not created by the particular correlation handler.
821+
815822
You can call the `expireMessageGroups` method with a timeout value.
816823
Any message older than the current time minus this value is expired and has the callbacks applied.
817824
Thus, it is the user of the store that defines what is meant by message group "`expiry`".

0 commit comments

Comments
 (0)