Spring Async 实战 & 源码

Spring Async 实战 & 源码

前言

什么是异步调用?什么时候需要异步调用?

相信大家日常开发过程中,大部分场景使用的是同步的方式来处理。但是当我遇到如下场景如:

执行一次请求需要执行

  1. A(调用第三方API获取信息数据a)消耗200ms,
  2. B(根据条件去数据库查询或RPC接口获取信息数据b)消耗150ms,
  3. C(组装a,b)消耗20ms。
  • 同步:C等待B执行完成,B等待A执行完成,结果为370ms
  • 异步:C等待A、B(并行)执行完成,结果为220ms

注:以上仅举例介绍异步在特殊场景下带来的性能优势,不考虑网络以及线程切换等带来的开销,数据无参考价值。

异步带来的性能优势不言而喻,当处理一些无需同步返回操作的时候尤为明显(不会阻塞当前线程、可并行等)。比如发送邮件(也可以选择MQ),写日志表等等。话不多说,接下来便进入实战环节~

快速开始

JDK Version:1.8

Spring Version::5.1.3

启用异步

@EnableAsync 启用异步(自动装配)

1
2
3
4
5
6
7
8
@EnableAsync
@SpringBootApplication
public class SamplesWithoutDbApplication {

public static void main(String[] args) {
SpringApplication.run(SamplesWithoutDbApplication.class, args);
}
}

编写代码

@Async 给方法赋予异步执行的能力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
@Service
public class AsyncService {

...

/**
* 异步执行:有返回值
*/
@Async
public Future<String> supplyAsync() {

String result = "fail";
try {
// sleep 2 seconds
TimeUnit.SECONDS.sleep(2);
log.info("[{}] - supplyAsync end at {}.", Thread.currentThread().getName(), LocalDateTime.now());
result = "hello world...";

} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<>(result);
}


/**
* 异步执行:无返回值
*/
@Async
public void runAsync() {

try {
// sleep 2 seconds
TimeUnit.SECONDS.sleep(2);
log.info("[{}] - runAsync end at {}.", Thread.currentThread().getName(), LocalDateTime.now());

} catch (InterruptedException e) {
e.printStackTrace();
}
}

...

}

暴露服务接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@RestController
public class AsyncController {

@Autowired
private AsyncService asyncService;

@RequestMapping("supplyAsync")
public String supplyAsync() throws Exception {

Future<String> asyncResult = asyncService.supplyAsync();
log.info("supply async has returned...");
return asyncResult.get(3, TimeUnit.SECONDS);
}


@RequestMapping("runAsync")
public void runAsync() {

asyncService.runAsync();
log.info("run async has returned...");
}

}

验证

1
2
2019-09-13 21:28:46.914  INFO 2064 --- [io-18080-exec-1] c.g.b.s.controller.AsyncController       : run async has returned...
2019-09-13 21:28:48.920 INFO 2064 --- [ task-1] c.g.b.samples.service.AsyncService : [task-1] - runAsync end at 2019-09-13T21:28:48.920.

通过日志我们能发现执行 AsyncService#runAsync 的线程名称为 task-1 ,而执行响应的线程为 io-18080-exec-1 ,异步调用成功。

源码解读

原理猜测

日常环节,在翻源码之前,我们不妨猜测一下Spring Async 的实现原理——动态代理,下面是猜测的伪代码:

1
2
3
executorService.execute(() -> {
method.invoke(obejct, args);
});

源码验证

我们带着猜测,从 @EnableAsync 注解入手:

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}

会心一笑,你看到了 @Import(AsyncConfigurationSelector.class) ,二话不说,直接点进 AsyncConfigurationSelector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}

这里提供了两种自动装配类,那么程序走的是 PROXY 还是 ASPECTJ 呢?AsyncConfigurationSelector#selectImports 上的Java Doc 给了我们提示,我们查看一下 EnableAsync#mode()

1
AdviceMode mode() default AdviceMode.PROXY;

默认是 AdviceMode.PROXY , 所以装配的是 ProxyAsyncConfiguration,而通过debug也能证实这一点,那么我们转移到 ProxyAsyncConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}

}

这个配置类给我们装配了一个 AsyncAnnotationBeanPostProcessor Bean,然后继续往下翻 -> AsyncAnnotationAdvisor -> AnnotationAsyncExecutionInterceptor , 而 AnnotationAsyncExecutionInterceptor 本身只是重写了一个 #getExecutorQualifier,我们将目光投向他的父类——AsyncExecutionInterceptor,在这个类中,我们看到了一个熟悉的方法名 AsyncExecutionInterceptor#invoke, 我们最开始猜测 Spring Async 是通过动态代理来实现的,看到这更一步确定了我们的猜测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

...

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}

Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};

return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

...

}

通过上诉代码我们可知,Spring Asyncinvocation.proceed() 封装到了 一个 Callable 中,然后调用了其父类的 AsyncExecutionAspectSupport#doSubmit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
}

这里是通过使用 JDK1.8 提共的 CompletableFuture#supplyAsync 来实现异步调用,所以我们如果不使用Spring Async ,可以借助 CompletableFuture 来实现异步调用,重构的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void runAsyncByThreadPool() {

CompletableFuture.runAsync(() -> {
try {
// sleep 2 seconds
TimeUnit.SECONDS.sleep(2);
log.info("[{}] - runAsyncByThreadPool end at {}.", Thread.currentThread().getName(), LocalDateTime.now());

} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

注:有返回值可以使用 CompletableFuture#supplyAsync

扩展

Spring 提供了 RequestContextHolder#getRequestAttributes() 静态方法供我们获取到当前上下文的 request,通过的是 ThreadLocal,但是我们执行方法异步调用的时候切换了线程,所以是无法通过 RequestContextHolder#getRequestAttributes()获取到 request ,如下会输出 "request is null"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Async
public void runAsyncWithRequest() {

ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();

if (requestAttributes != null) {

HttpServletRequest request = requestAttributes.getRequest();
log.info("[{}] - runAsyncWithRequest end at {}.", Thread.currentThread().getName(), LocalDateTime.now());
log.info("==> {}: {}", request.getMethod().toUpperCase(), request.getRequestURI());
} else {
log.info("request is null");
}
}

那么要如何处理呢?

我们知道 ThreadPoolTaskExecutor 提供了#setTaskDecorator 方法,可以围绕任务的调用设置一些执行上下文,通过之前的代码阅读,我们知道 Spring Async 通过 CompletableFuture#supplyAsync 实现异步的,而 CompletableFuture 是提供了重载方法方便我们指定 task 执行的线程池,那么我们只需要替换 Spring Async 指定的线程池即可

通过阅读源码发现, Spring Async 提供的 executor 是通过 AsyncExecutionAspectSupport#determineAsyncExecutor 通过调用this.executors.get(method)提供的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// qualifier 是 Async#value,我们没有设置,所以为null
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
// 从defaultExecutor获取一个 executor 赋值给 targetExecutor
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}

Spring 加载AsyncExecutionAspectSupport 的时候会初始化 executors ——一个 size0ConcurrentHashMap

1
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);

executors 只有当 this.executors.get(method) == null 的时候才会创建一个 executorput 进去,通过上述代码可知,当我们没有设置Async#value 的时候,程序会从defaultExecutor获取一个 executor 提供给CompletableFuture,而 defaultExecutor 是通过AsyncExecutionAspectSupport#getDefaultExecutor获取,代码如下

1
2
3
4
5
6
7
rotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
return beanFactory.getBean(TaskExecutor.class);
}
...
}

SpringTaskExecutionAutoConfiguration#applicationTaskExecutor 里面给我们提供了一个TaskExecutor Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder() {
TaskExecutionProperties.Pool pool = this.properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.taskExecutorCustomizers);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}

@Lazy
@Bean(name = APPLICATION_TASK_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}

通过@ConditionalOnMissingBean(Executor.class)我们发现当Spring 不存在 Executor 这个Bean 的时候才会创建,这样就方便我们扩展,我们可以自己顶一个Executor 去覆盖掉默认实现,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Bean
public Executor taskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setQueueCapacity(100);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("task-executor-");

executor.setTaskDecorator(runnable -> {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return () -> {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
}
};
});
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

最后启动程序,调用runAsyncWithRequest方法,输出日志如下

1
2
3
2019-09-13 23:22:46.001  INFO 8772 --- [io-18080-exec-1] c.g.b.s.controller.AsyncController       : runAsync with request has returned...
2019-09-13 23:22:46.007 INFO 8772 --- [task-executor-1] c.g.b.samples.service.AsyncService : [task-executor-1] - runAsyncWithRequest end at 2019-09-13T23:22:46.007.
2019-09-13 23:22:46.007 INFO 8772 --- [task-executor-1] c.g.b.samples.service.AsyncService : ==> POST: /runAsyncWithRequest

完结~

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×