Skip to content

Commit 806db14

Browse files
committed
INT-4497: Add RateLimiterRequestHandlerAdvice
JIRA: https://jira.spring.io/browse/INT-4497
1 parent a167290 commit 806db14

File tree

5 files changed

+292
-2
lines changed

5 files changed

+292
-2
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
buildscript {
1+
buildscript {
22
ext.kotlinVersion = '1.3.21'
33
repositories {
44
maven { url 'https://repo.spring.io/plugins-release' }
@@ -131,6 +131,7 @@ subprojects { subproject ->
131131
postgresVersion = '42.2.5'
132132
reactorNettyVersion = '0.8.5.RELEASE'
133133
reactorVersion = '3.2.6.RELEASE'
134+
resilience4jVersion = '0.13.2'
134135
romeToolsVersion = '1.12.0'
135136
servletApiVersion = '4.0.1'
136137
smackVersion = '4.3.1'
@@ -381,6 +382,7 @@ project('spring-integration-core') {
381382
compile("io.fastjson:boon:$boonVersion", optional)
382383
compile("com.esotericsoftware:kryo-shaded:$kryoShadedVersion", optional)
383384
compile("io.micrometer:micrometer-core:$micrometerVersion", optional)
385+
compile("io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion", optional)
384386

385387
testCompile ("org.aspectj:aspectjweaver:$aspectjVersion")
386388
testCompile "io.projectreactor:reactor-test:$reactorVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.context.ApplicationEventPublisher;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.messaging.MessagingException;
24+
import org.springframework.util.Assert;
25+
26+
import io.github.resilience4j.ratelimiter.RateLimiter;
27+
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
28+
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
29+
import io.vavr.CheckedFunction0;
30+
import io.vavr.control.Try;
31+
32+
/**
33+
* An {@link AbstractRequestHandlerAdvice} extension for a rate limiting to service method calls.
34+
* The implementation is based on the
35+
* <a href="https://github.com/resilience4j/resilience4j#ratelimiter">Resilience4j</a>.
36+
*
37+
* @author Artem Bilan
38+
*
39+
* @since 5.2
40+
*/
41+
public class RateLimiterRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
42+
43+
public static final String DEFAULT_NAME = "RateLimiterRequestHandlerAdvice";
44+
45+
private final RateLimiter rateLimiter;
46+
47+
private ApplicationEventPublisher applicationEventPublisher;
48+
49+
/**
50+
* Construct an instance based on default rate limiter options
51+
* and {@value #DEFAULT_NAME} as a rate limiter name.
52+
* @see RateLimiter#ofDefaults
53+
*/
54+
public RateLimiterRequestHandlerAdvice() {
55+
this(RateLimiter.ofDefaults(DEFAULT_NAME));
56+
}
57+
58+
/**
59+
* Construct an instance based on default rate limiter options and provided name.
60+
* @param name the name for the rate limiter.
61+
*/
62+
public RateLimiterRequestHandlerAdvice(String name) {
63+
this(RateLimiter.ofDefaults(name));
64+
Assert.hasText(name, "'name' must not be empty");
65+
}
66+
67+
/**
68+
* Construct an instance based on the provided {@link RateLimiter}.
69+
* @param rateLimiter the {@link RateLimiter} to use.
70+
*/
71+
public RateLimiterRequestHandlerAdvice(RateLimiter rateLimiter) {
72+
Assert.notNull(rateLimiter, "'rateLimiter' must not be null");
73+
this.rateLimiter = rateLimiter;
74+
}
75+
76+
/**
77+
* Construct an instance based on the provided {@link RateLimiterConfig}
78+
* and {@value #DEFAULT_NAME} as a rate limiter name.
79+
* @param rateLimiterConfig the {@link RateLimiterConfig} to use.
80+
*/
81+
public RateLimiterRequestHandlerAdvice(RateLimiterConfig rateLimiterConfig) {
82+
this(rateLimiterConfig, DEFAULT_NAME);
83+
}
84+
85+
/**
86+
* Construct an instance based on the provided {@link RateLimiterConfig} and name.
87+
* @param rateLimiterConfig the {@link RateLimiterConfig} to use.
88+
* @param name the name for the rate limiter.
89+
*/
90+
public RateLimiterRequestHandlerAdvice(RateLimiterConfig rateLimiterConfig, String name) {
91+
Assert.notNull(rateLimiterConfig, "'rateLimiterConfig' must not be null");
92+
Assert.hasText(name, "'name' must not be empty");
93+
this.rateLimiter = RateLimiter.of(name, rateLimiterConfig);
94+
}
95+
96+
/**
97+
* Change the {@code limitForPeriod} option of the {@link #rateLimiter}.
98+
* @param limitForPeriod the {@code limitForPeriod} to use.
99+
* @see RateLimiter#changeLimitForPeriod(int)
100+
*/
101+
public void setLimitForPeriod(int limitForPeriod) {
102+
this.rateLimiter.changeLimitForPeriod(limitForPeriod);
103+
}
104+
105+
/**
106+
* Change the {@code timeoutDuration} option of the {@link #rateLimiter}.
107+
* @param timeoutDuration the {@code timeoutDuration} to use.
108+
* @see RateLimiter#changeTimeoutDuration(Duration)
109+
*/
110+
public void setTimeoutDuration(Duration timeoutDuration) {
111+
this.rateLimiter.changeTimeoutDuration(timeoutDuration);
112+
}
113+
114+
/**
115+
* Obtain the metrics from the rate limiter.
116+
* @return the {@link RateLimiter.Metrics} from rate limiter.
117+
* @see RateLimiter#getMetrics()
118+
*/
119+
public RateLimiter.Metrics getMetrics() {
120+
return this.rateLimiter.getMetrics();
121+
}
122+
123+
@Override
124+
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
125+
CheckedFunction0<Object> restrictedCall =
126+
RateLimiter.decorateCheckedSupplier(this.rateLimiter, callback::execute);
127+
try {
128+
return Try.of(restrictedCall).get();
129+
}
130+
catch (RequestNotPermitted ex) {
131+
throw new RateLimitExceededException(message, "Rate limit exceeded for: " + target, ex);
132+
}
133+
}
134+
135+
136+
/**
137+
* A {@link MessagingException} wrapper for the {@link RequestNotPermitted}
138+
* with the {@code requestMessage} and {@code target} context.
139+
*/
140+
public static class RateLimitExceededException extends MessagingException {
141+
142+
private static final long serialVersionUID = 1L;
143+
144+
RateLimitExceededException(Message<?> message, String description, RequestNotPermitted cause) {
145+
super(message, description, cause);
146+
}
147+
148+
}
149+
150+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
22+
import java.time.Duration;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.integration.annotation.ServiceActivator;
30+
import org.springframework.integration.channel.QueueChannel;
31+
import org.springframework.integration.config.EnableIntegration;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.MessageChannel;
34+
import org.springframework.messaging.MessagingException;
35+
import org.springframework.messaging.PollableChannel;
36+
import org.springframework.messaging.support.GenericMessage;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
38+
39+
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
40+
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
41+
42+
/**
43+
* @author Artem Bilan
44+
*
45+
* @since 5.2
46+
*/
47+
@SpringJUnitConfig
48+
public class RateLimiterRequestHandlerAdviceTests {
49+
50+
@Autowired
51+
private MessageChannel requestChannel;
52+
53+
@Autowired
54+
private PollableChannel resultChannel;
55+
56+
@Test
57+
void testRateLimiter() throws InterruptedException {
58+
Message<?> testMessage = new GenericMessage<>("test");
59+
this.requestChannel.send(testMessage);
60+
61+
assertThatExceptionOfType(MessagingException.class)
62+
.isThrownBy(() -> this.requestChannel.send(testMessage))
63+
.withCauseInstanceOf(RequestNotPermitted.class)
64+
.withMessageContaining("Rate limit exceeded for: ");
65+
66+
Thread.sleep(200);
67+
68+
this.requestChannel.send(testMessage);
69+
70+
assertThat(this.resultChannel.receive(10_000)).isNotNull();
71+
assertThat(this.resultChannel.receive(10_000)).isNotNull();
72+
}
73+
74+
@Configuration
75+
@EnableIntegration
76+
public static class ContextConfiguration {
77+
78+
@Bean
79+
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
80+
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
81+
.timeoutDuration(Duration.ofMillis(100))
82+
.limitRefreshPeriod(Duration.ofMillis(500))
83+
.limitForPeriod(1)
84+
.build());
85+
}
86+
87+
@Bean
88+
public PollableChannel resultChannel() {
89+
return new QueueChannel();
90+
}
91+
92+
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
93+
adviceChain = "rateLimiterRequestHandlerAdvice")
94+
public String handleRequest(String payload) {
95+
return payload;
96+
}
97+
98+
}
99+
100+
}

src/reference/asciidoc/handler-advice.adoc

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ For chains that produce a reply, every child element can be advised.
4949
[[advice-classes]]
5050
==== Provided Advice Classes
5151

52-
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides three standard advice classes:
52+
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
5353

5454
* `RequestHandlerRetryAdvice` (described in <<retry-advice>>)
5555
* `RequestHandlerCircuitBreakerAdvice` (described in <<circuit-breaker-advice>>)
5656
* `ExpressionEvaluatingRequestHandlerAdvice` (described in <<expression-advice>>)
57+
* `RateLimiterRequestHandlerAdvice` (described in <<rate-limiter-advice>>)
5758

5859
[[retry-advice]]
5960
===== Retry Advice
@@ -464,6 +465,37 @@ public class EerhaApplication {
464465
----
465466
====
466467

468+
[[rate-limiter-advice]]
469+
===== Rate Limiter Advice
470+
471+
The Rate Limiter advice (`RateLimiterRequestHandlerAdvice`) allows to ensure that an endpoint does not get overloaded with requests.
472+
When the rate limit is breached the request will go in a blocked state.
473+
474+
A typical use case for this advice might be an external service provider not allowing more than `n` number of request per minute.
475+
476+
The `RateLimiterRequestHandlerAdvice` implementation is fully based on the https://github.com/resilience4j/resilience4j#ratelimiter[Resilience4j] project and requires either `RateLimiter` or `RateLimiterConfig` injections.
477+
Can also be configured with defaults and/or custom name.
478+
479+
The following example configures a rate limiter advice with one request per 1 second:
480+
====
481+
[source, java]
482+
----
483+
@Bean
484+
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
485+
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
486+
.limitRefreshPeriod(Duration.ofSeconds(1))
487+
.limitForPeriod(1)
488+
.build());
489+
}
490+
491+
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
492+
adviceChain = "rateLimiterRequestHandlerAdvice")
493+
public String handleRequest(String payload) {
494+
...
495+
}
496+
----
497+
====
498+
467499
[[custom-advice]]
468500
==== Custom Advice Classes
469501

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ If you are interested in more details, see the Issue Tracker tickets that were r
77
[[x5.2-new-components]]
88
=== New Components
99

10+
[[x5.2-rateLimitAdvice]]
11+
=== Rate Limit Advice Support
12+
13+
The `RateLimiterRequestHandlerAdvice` is now available for limiting requests rate on handlers.
14+
See <<rate-limiter-advice>> for more information.
15+
1016
[[x5.2-general]]
1117
=== General Changes
1218

0 commit comments

Comments
 (0)