前言 什么是异步调用?什么时候需要异步调用?
相信大家日常开发过程中,大部分场景使用的是同步的方式来处理。但是当我遇到如下场景如:
执行一次请求需要执行
A(调用第三方API获取信息数据a)消耗200ms,
B(根据条件去数据库查询或RPC接口获取信息数据b)消耗150ms,
C(组装a,b)消耗20ms。
同步:C等待B执行完成,B等待A执行完成,结果为370ms
异步:C等待A、B(并行)执行完成,结果为220ms
注:以上仅举例介绍异步在特殊场景下带来的性能优势,不考虑网络以及线程切换等带来的开销,数据无参考价值。
异步带来的性能优势不言而喻,当处理一些无需同步返回操作的时候尤为明显(不会阻塞当前线程、可并行等)。比如发送邮件(也可以选择MQ),写日志表等等。话不多说,接下来便进入实战环节~
快速开始
JDK Version:1.8
Spring Version::5.1.3
启用异步 @EnableAsync
启用异步(自动装配)
1 2 3 4 5 6 7 8 @EnableAsync @SpringBootApplication public class SamplesWithoutDbApplication { public static void main (String[] args) { SpringApplication.run(SamplesWithoutDbApplication.class, args); } }
编写代码 @Async
给方法赋予异步执行的能力
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf 4j@Service public class AsyncService { ... @Async public Future<String> supplyAsync () { String result = "fail" ; try { TimeUnit.SECONDS.sleep(2 ); log.info("[{}] - supplyAsync end at {}." , Thread.currentThread().getName(), LocalDateTime.now()); result = "hello world..." ; } catch (InterruptedException e) { e.printStackTrace(); } return new AsyncResult<>(result); } @Async public void runAsync () { try { TimeUnit.SECONDS.sleep(2 ); log.info("[{}] - runAsync end at {}." , Thread.currentThread().getName(), LocalDateTime.now()); } catch (InterruptedException e) { e.printStackTrace(); } } ... }
暴露服务接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf 4j@RestController public class AsyncController { @Autowired private AsyncService asyncService; @RequestMapping ("supplyAsync" ) public String supplyAsync () throws Exception { Future<String> asyncResult = asyncService.supplyAsync(); log.info("supply async has returned..." ); return asyncResult.get(3 , TimeUnit.SECONDS); } @RequestMapping ("runAsync" ) public void runAsync () { asyncService.runAsync(); log.info("run async has returned..." ); } }
验证 1 2 2019-09-13 21:28:46.914 INFO 2064 --- [io-18080-exec-1] c.g.b.s.controller.AsyncController : run async has returned... 2019-09-13 21:28:48.920 INFO 2064 --- [ task-1] c.g.b.samples.service.AsyncService : [task-1] - runAsync end at 2019-09-13T21:28:48.920.
通过日志我们能发现执行 AsyncService#runAsync
的线程名称为 task-1
,而执行响应的线程为 io-18080-exec-1
,异步调用成功。
源码解读 原理猜测 日常环节,在翻源码之前,我们不妨猜测一下Spring Async
的实现原理——动态代理,下面是猜测的伪代码:
1 2 3 executorService.execute(() -> { method.invoke(obejct, args); });
源码验证 我们带着猜测,从 @EnableAsync
注解入手:
1 2 3 4 5 6 7 @Target (ElementType.TYPE)@Retention (RetentionPolicy.RUNTIME)@Documented @Import (AsyncConfigurationSelector.class)public @interface EnableAsync { ... }
会心一笑,你看到了 @Import(AsyncConfigurationSelector.class)
,二话不说,直接点进 AsyncConfigurationSelector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class AsyncConfigurationSelector extends AdviceModeImportSelector <EnableAsync > { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration" ; @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default : return null ; } } }
这里提供了两种自动装配类,那么程序走的是 PROXY
还是 ASPECTJ
呢?AsyncConfigurationSelector#selectImports
上的Java Doc
给了我们提示,我们查看一下 EnableAsync#mode()
,
1 AdviceMode mode () default AdviceMode.PROXY ;
默认是 AdviceMode.PROXY
, 所以装配的是 ProxyAsyncConfiguration
,而通过debug
也能证实这一点,那么我们转移到 ProxyAsyncConfiguration
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration @Role (BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean (name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role (BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor () { Assert.notNull(this .enableAsync, "@EnableAsync annotation metadata was not injected" ); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this .executor, this .exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this .enableAsync.getClass("annotation" ); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation" )) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this .enableAsync.getBoolean("proxyTargetClass" )); bpp.setOrder(this .enableAsync.<Integer>getNumber("order" )); return bpp; } }
这个配置类给我们装配了一个 AsyncAnnotationBeanPostProcessor
Bean,然后继续往下翻 -> AsyncAnnotationAdvisor
-> AnnotationAsyncExecutionInterceptor
, 而 AnnotationAsyncExecutionInterceptor
本身只是重写了一个 #getExecutorQualifier
,我们将目光投向他的父类——AsyncExecutionInterceptor
,在这个类中,我们看到了一个熟悉的方法名 AsyncExecutionInterceptor#invoke
, 我们最开始猜测 Spring Async
是通过动态代理来实现的,看到这更一步确定了我们的猜测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor , Ordered { ... @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); } ... }
通过上诉代码我们可知,Spring Async
将 invocation.proceed()
封装到了 一个 Callable
中,然后调用了其父类的 AsyncExecutionAspectSupport#doSubmit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { @Nullable protected Object doSubmit (Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null ; } } }
这里是通过使用 JDK1.8 提共的 CompletableFuture#supplyAsync
来实现异步调用,所以我们如果不使用Spring Async
,可以借助 CompletableFuture
来实现异步调用,重构的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void runAsyncByThreadPool () { CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); log.info("[{}] - runAsyncByThreadPool end at {}." , Thread.currentThread().getName(), LocalDateTime.now()); } catch (InterruptedException e) { e.printStackTrace(); } }); }
注:有返回值可以使用 CompletableFuture#supplyAsync
扩展 Spring
提供了 RequestContextHolder#getRequestAttributes()
静态方法供我们获取到当前上下文的 request
,通过的是 ThreadLocal
,但是我们执行方法异步调用的时候切换了线程,所以是无法通过 RequestContextHolder#getRequestAttributes()
获取到 request
,如下会输出 "request is null"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Async public void runAsyncWithRequest () { ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if (requestAttributes != null ) { HttpServletRequest request = requestAttributes.getRequest(); log.info("[{}] - runAsyncWithRequest end at {}." , Thread.currentThread().getName(), LocalDateTime.now()); log.info("==> {}: {}" , request.getMethod().toUpperCase(), request.getRequestURI()); } else { log.info("request is null" ); } }
那么要如何处理呢?
我们知道 ThreadPoolTaskExecutor
提供了#setTaskDecorator
方法,可以围绕任务的调用设置一些执行上下文,通过之前的代码阅读,我们知道 Spring Async
通过 CompletableFuture#supplyAsync
实现异步的,而 CompletableFuture
是提供了重载方法方便我们指定 task
执行的线程池,那么我们只需要替换 Spring Async
指定的线程池即可
通过阅读源码发现, Spring Async
提供的 executor 是通过 AsyncExecutionAspectSupport#determineAsyncExecutor
通过调用this.executors.get(method)
提供的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 protected AsyncTaskExecutor determineAsyncExecutor (Method method) { AsyncTaskExecutor executor = this .executors.get(method); if (executor == null ) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this .beanFactory, qualifier); } else { targetExecutor = this .defaultExecutor.get(); } if (targetExecutor == null ) { return null ; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this .executors.put(method, executor); } return executor; }
当Spring
加载AsyncExecutionAspectSupport
的时候会初始化 executors
——一个 size
为0
的 ConcurrentHashMap
1 private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16 );
而 executors
只有当 this.executors.get(method) == null
的时候才会创建一个 executor
并 put
进去,通过上述代码可知,当我们没有设置Async#value
的时候,程序会从defaultExecutor
获取一个 executor
提供给CompletableFuture
,而 defaultExecutor
是通过AsyncExecutionAspectSupport#getDefaultExecutor
获取,代码如下
1 2 3 4 5 6 7 rotected Executor getDefaultExecutor (@Nullable BeanFactory beanFactory) { if (beanFactory != null ) { try { return beanFactory.getBean(TaskExecutor.class); } ... }
Spring
在 TaskExecutionAutoConfiguration#applicationTaskExecutor
里面给我们提供了一个TaskExecutor
Bean
,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder () { TaskExecutionProperties.Pool pool = this .properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); builder = builder.threadNamePrefix(this .properties.getThreadNamePrefix()); builder = builder.customizers(this .taskExecutorCustomizers); builder = builder.taskDecorator(this .taskDecorator.getIfUnique()); return builder; } @Lazy @Bean (name = APPLICATION_TASK_EXECUTOR_BEAN_NAME)@ConditionalOnMissingBean (Executor.class)public ThreadPoolTaskExecutor applicationTaskExecutor (TaskExecutorBuilder builder) { return builder.build(); }
通过@ConditionalOnMissingBean(Executor.class)
我们发现当Spring
不存在 Executor
这个Bean
的时候才会创建,这样就方便我们扩展,我们可以自己顶一个Executor
去覆盖掉默认实现,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Bean public Executor taskExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setQueueCapacity(100 ); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setAllowCoreThreadTimeOut(true ); executor.setKeepAliveSeconds(60 ); executor.setThreadNamePrefix("task-executor-" ); executor.setTaskDecorator(runnable -> { RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(requestAttributes); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; }); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
最后启动程序,调用runAsyncWithRequest
方法,输出日志如下
1 2 3 2019-09-13 23:22:46.001 INFO 8772 --- [io-18080-exec-1] c.g.b.s.controller.AsyncController : runAsync with request has returned... 2019-09-13 23:22:46.007 INFO 8772 --- [task-executor-1] c.g.b.samples.service.AsyncService : [task-executor-1] - runAsyncWithRequest end at 2019-09-13T23:22:46.007. 2019-09-13 23:22:46.007 INFO 8772 --- [task-executor-1] c.g.b.samples.service.AsyncService : ==> POST: /runAsyncWithRequest
完结~