Skip to content

Commit 7ea8ecb

Browse files
committed
Warn when SimpleAsyncTaskExecutor is used
Issue: SPR-16203
1 parent 8c1bc63 commit 7ea8ecb

File tree

6 files changed

+101
-37
lines changed

6 files changed

+101
-37
lines changed

spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import org.springframework.core.task.AsyncTaskExecutor;
3232
import org.springframework.core.task.SimpleAsyncTaskExecutor;
33+
import org.springframework.core.task.SyncTaskExecutor;
3334
import org.springframework.lang.Nullable;
3435
import org.springframework.util.Assert;
3536
import org.springframework.web.context.request.RequestAttributes;
@@ -62,6 +63,9 @@ public final class WebAsyncManager {
6263

6364
private static final Object RESULT_NONE = new Object();
6465

66+
private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR =
67+
new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());
68+
6569
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
6670

6771
private static final UrlPathHelper urlPathHelper = new UrlPathHelper();
@@ -72,10 +76,12 @@ public final class WebAsyncManager {
7276
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
7377
new TimeoutDeferredResultProcessingInterceptor();
7478

79+
private static Boolean taskExecutorWarning = true;
80+
7581

7682
private AsyncWebRequest asyncWebRequest;
7783

78-
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());
84+
private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR;
7985

8086
private volatile Object concurrentResult = RESULT_NONE;
8187

@@ -280,6 +286,9 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
280286
if (executor != null) {
281287
this.taskExecutor = executor;
282288
}
289+
else {
290+
logExecutorWarning();
291+
}
283292

284293
List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
285294
interceptors.add(webAsyncTask.getInterceptor());
@@ -333,6 +342,33 @@ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object..
333342
}
334343
}
335344

345+
@SuppressWarnings("ConstantConditions")
346+
private void logExecutorWarning() {
347+
if (taskExecutorWarning && logger.isWarnEnabled()) {
348+
synchronized (DEFAULT_TASK_EXECUTOR) {
349+
AsyncTaskExecutor executor = this.taskExecutor;
350+
if (taskExecutorWarning &&
351+
(executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) {
352+
String executorTypeName = executor.getClass().getSimpleName();
353+
logger.warn("\n!!!\n" +
354+
"An Executor is required to handle java.util.concurrent.Callable return values.\n" +
355+
"Please, configure a TaskExecutor in the MVC config under \"async support\".\n" +
356+
"The " + executorTypeName + " currently in use is not suitable under load.\n" +
357+
"-------------------------------\n" +
358+
"Request URI: '" + formatRequestUri() + "'\n" +
359+
"!!!");
360+
taskExecutorWarning = false;
361+
}
362+
}
363+
}
364+
}
365+
366+
private String formatRequestUri() {
367+
HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
368+
return request != null ? request.getRequestURI() : "servlet container";
369+
}
370+
371+
336372
private void setConcurrentResultAndDispatch(Object result) {
337373
synchronized (WebAsyncManager.this) {
338374
if (this.concurrentResult != RESULT_NONE) {

spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,7 +115,7 @@ public void startCallableProcessing() throws Exception {
115115
verifyDefaultAsyncScenario();
116116
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, task);
117117
verify(interceptor).preProcess(this.asyncWebRequest, task);
118-
verify(interceptor).postProcess(this.asyncWebRequest, task, new Integer(concurrentResult));
118+
verify(interceptor).postProcess(this.asyncWebRequest, task, concurrentResult);
119119
}
120120

121121
@Test
@@ -161,9 +161,9 @@ public void startCallableProcessingBeforeConcurrentHandlingException() throws Ex
161161

162162
assertFalse(this.asyncManager.hasConcurrentResult());
163163

164-
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
165-
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
166-
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
164+
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
165+
verify(this.asyncWebRequest).addErrorHandler(notNull());
166+
verify(this.asyncWebRequest).addCompletionHandler(notNull());
167167
}
168168

169169
@Test
@@ -303,9 +303,9 @@ public void startDeferredResultProcessingBeforeConcurrentHandlingException() thr
303303

304304
assertFalse(this.asyncManager.hasConcurrentResult());
305305

306-
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
307-
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
308-
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
306+
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
307+
verify(this.asyncWebRequest).addErrorHandler(notNull());
308+
verify(this.asyncWebRequest).addCompletionHandler(notNull());
309309
}
310310

311311
@Test
@@ -353,7 +353,7 @@ public void startDeferredResultProcessingPostProcessException() throws Exception
353353
@Test
354354
public void startDeferredResultProcessingNullInput() throws Exception {
355355
try {
356-
this.asyncManager.startDeferredResultProcessing((DeferredResult<?>) null);
356+
this.asyncManager.startDeferredResultProcessing(null);
357357
fail("Expected exception");
358358
}
359359
catch (IllegalArgumentException ex) {
@@ -368,9 +368,9 @@ private void setupDefaultAsyncScenario() {
368368

369369
@SuppressWarnings("unchecked")
370370
private void verifyDefaultAsyncScenario() {
371-
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
372-
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
373-
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
371+
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
372+
verify(this.asyncWebRequest).addErrorHandler(notNull());
373+
verify(this.asyncWebRequest).addCompletionHandler(notNull());
374374
verify(this.asyncWebRequest).startAsync();
375375
verify(this.asyncWebRequest).dispatch();
376376
}

spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,10 +24,10 @@
2424
import org.springframework.core.task.AsyncTaskExecutor;
2525
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2626
import org.springframework.lang.Nullable;
27+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2728
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
2829
import org.springframework.web.context.request.async.DeferredResult;
2930
import org.springframework.web.context.request.async.DeferredResultProcessingInterceptor;
30-
import org.springframework.web.context.request.async.WebAsyncTask;
3131

3232
/**
3333
* Helps with configuring options for asynchronous request processing.
@@ -49,16 +49,15 @@ public class AsyncSupportConfigurer {
4949

5050

5151
/**
52-
* Set the default {@link AsyncTaskExecutor} to use when a controller method
53-
* returns a {@link Callable}. Controller methods can override this default on
54-
* a per-request basis by returning a {@link WebAsyncTask}.
55-
* <p>By default a {@link SimpleAsyncTaskExecutor} instance is used, and it's
56-
* highly recommended to change that default in production since the simple
57-
* executor does not re-use threads.
58-
* <p>As of 5.0 this executor is also used when a controller returns a reactive
59-
* type that does streaming (e.g. "text/event-stream" or
60-
* "application/stream+json") for the blocking writes to the
61-
* {@link javax.servlet.ServletOutputStream}.
52+
* The provided task executor is used to:
53+
* <ol>
54+
* <li>Handle {@link Callable} controller method return values.
55+
* <li>Perform blocking writes when streaming to the response
56+
* through a reactive (e.g. Reactor, RxJava) controller method return value.
57+
* </ol>
58+
* <p>By default only a {@link SimpleAsyncTaskExecutor} is used. However when
59+
* using the above two use cases, it's recommended to configure an executor
60+
* backed by a thread pool such as {@link ThreadPoolTaskExecutor}.
6261
* @param taskExecutor the task executor instance to use by default
6362
*/
6463
public AsyncSupportConfigurer setTaskExecutor(AsyncTaskExecutor taskExecutor) {

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.core.ReactiveAdapter;
3636
import org.springframework.core.ReactiveAdapterRegistry;
3737
import org.springframework.core.ResolvableType;
38+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3839
import org.springframework.core.task.SyncTaskExecutor;
3940
import org.springframework.core.task.TaskExecutor;
4041
import org.springframework.http.MediaType;
@@ -79,6 +80,8 @@ class ReactiveTypeHandler {
7980

8081
private final TaskExecutor taskExecutor;
8182

83+
private Boolean taskExecutorWarning;
84+
8285
private final ContentNegotiationManager contentNegotiationManager;
8386

8487

@@ -92,6 +95,7 @@ public ReactiveTypeHandler() {
9295
Assert.notNull(manager, "ContentNegotiationManager is required");
9396
this.reactiveRegistry = registry;
9497
this.taskExecutor = executor;
98+
this.taskExecutorWarning = executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor;
9599
this.contentNegotiationManager = manager;
96100
}
97101

@@ -127,16 +131,19 @@ public ResponseBodyEmitter handleValue(Object returnValue, MethodParameter retur
127131
if (adapter.isMultiValue()) {
128132
if (mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) ||
129133
ServerSentEvent.class.isAssignableFrom(elementClass)) {
134+
logExecutorWarning(returnType);
130135
SseEmitter emitter = new SseEmitter(STREAMING_TIMEOUT_VALUE);
131136
new SseEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
132137
return emitter;
133138
}
134139
if (CharSequence.class.isAssignableFrom(elementClass)) {
140+
logExecutorWarning(returnType);
135141
ResponseBodyEmitter emitter = getEmitter(mediaType.orElse(MediaType.TEXT_PLAIN));
136142
new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
137143
return emitter;
138144
}
139145
if (mediaTypes.stream().anyMatch(MediaType.APPLICATION_STREAM_JSON::includes)) {
146+
logExecutorWarning(returnType);
140147
ResponseBodyEmitter emitter = getEmitter(MediaType.APPLICATION_STREAM_JSON);
141148
new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
142149
return emitter;
@@ -171,6 +178,27 @@ protected void extendResponse(ServerHttpResponse outputMessage) {
171178
};
172179
}
173180

181+
@SuppressWarnings("ConstantConditions")
182+
private void logExecutorWarning(MethodParameter returnType) {
183+
if (this.taskExecutorWarning && logger.isWarnEnabled()) {
184+
synchronized (this) {
185+
if (this.taskExecutorWarning) {
186+
String executorTypeName = this.taskExecutor.getClass().getSimpleName();
187+
logger.warn("\n!!!\n" +
188+
"Streaming through a reactive type requires an Executor to write to the response.\n" +
189+
"Please, configure a TaskExecutor in the MVC config under \"async support\".\n" +
190+
"The " + executorTypeName + " currently in use is not suitable under load.\n" +
191+
"-------------------------------\n" +
192+
"Controller:\t" + returnType.getContainingClass().getName() + "\n" +
193+
"Method:\t\t" + returnType.getMethod().getName() + "\n" +
194+
"Returning:\t" + ResolvableType.forMethodParameter(returnType).toString() + "\n" +
195+
"!!!");
196+
this.taskExecutorWarning = false;
197+
}
198+
}
199+
}
200+
}
201+
174202

175203
private abstract static class AbstractEmitterSubscriber implements Subscriber<Object>, Runnable {
176204

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public void setup() throws Exception {
8080
ContentNegotiationManagerFactoryBean factoryBean = new ContentNegotiationManagerFactoryBean();
8181
factoryBean.afterPropertiesSet();
8282
ContentNegotiationManager manager = factoryBean.getObject();
83-
this.handler = new ReactiveTypeHandler(ReactiveAdapterRegistry.getSharedInstance(), new SyncTaskExecutor(), manager);
83+
ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
84+
this.handler = new ReactiveTypeHandler(adapterRegistry, new SyncTaskExecutor(), manager);
8485
resetRequest();
8586
}
8687

@@ -89,8 +90,8 @@ private void resetRequest() {
8990
this.servletResponse = new MockHttpServletResponse();
9091
this.webRequest = new ServletWebRequest(this.servletRequest, this.servletResponse);
9192

92-
AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse);
93-
WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(asyncWebRequest);
93+
AsyncWebRequest webRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse);
94+
WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(webRequest);
9495
this.servletRequest.setAsyncSupported(true);
9596
}
9697

@@ -121,7 +122,8 @@ public void deferredResultSubscriberWithOneValue() throws Exception {
121122
// RxJava 1 Single
122123
AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
123124
Single<String> single = Single.fromEmitter(ref::set);
124-
testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onSuccess("foo"), "foo");
125+
testDeferredResultSubscriber(single, Single.class, forClass(String.class),
126+
() -> ref.get().onSuccess("foo"), "foo");
125127

126128
// RxJava 2 Single
127129
AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();

src/docs/asciidoc/web/webmvc.adoc

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3640,14 +3640,13 @@ Spring MVC supports Reactor and RxJava through the
36403640
`spring-core` which allows it to adapt from multiple reactive libraries.
36413641
====
36423642

3643-
When streaming to the response via reactive types, Spring MVC supports reactive back
3644-
pressure, but still needs to use blocking I/O to perform actual writes. This is done
3645-
through the <<mvc-ann-async-configuration-spring-mvc,configured>> MVC `TaskExecutor` on
3646-
a separate thread in order to avoid blocking the upstream source (e.g. a `Flux` returned
3647-
from the `WebClient`). By default a `SyncTaskExecutor` is used which is not suitable for
3648-
production. https://jira.spring.io/browse/SPR-16203[SPR-16203] will provide better
3649-
defaults in Spring Framework 5.1. In the mean time please configure the executor through
3650-
the <<mvc-ann-async-configuration-spring-mvc,MVC config>>.
3643+
For streaming to the response, reactive back pressure is supported, but writes to the
3644+
response are still blocking, and are executed on a separate thread through the
3645+
<<mvc-ann-async-configuration-spring-mvc,configured>> `TaskExecutor` in order to avoid
3646+
blocking the upstream source (e.g. a `Flux` returned from the `WebClient`).
3647+
By default `SimpleAsyncTaskExecutor` is used for the blocking writes but that is not
3648+
suitable under load. If you plan to stream with a reactive type, please use the
3649+
<<mvc-ann-async-configuration-spring-mvc,MVC config>> to configure a task executor.
36513650

36523651

36533652

0 commit comments

Comments
 (0)