Spring Retry 实战 & 源码

Spring Retry 实战 & 源码

前言

To make processing more robust and less prone to failure, it sometimes helps to automatically retry a failed operation, in case it might succeed on a subsequent attempt. Errors that are susceptible to this kind of treatment are transient in nature. For example, a remote call to a web service or an RMI service that fails because of a network glitch or a DeadLockLoserException in a database update may resolve itself after a short wait. To automate the retry of such operations, Spring Retry has the RetryOperations strategy.

日常开发中,我们无法避免在调用远程Web服务时候因为网络故障或数据库死锁等原因导致的调用失败,有时候这些故障会在短暂的等待后自行恢复,而为了自动重试这些操作,我们可以选择引入 Spring Retry .

快速开始

本文引入的 spring-retry 版本为 1.2.2RELEASEaspectjweaver 版本为1.9.2

引入依赖

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

启用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 启动类开启Spring Retry自动装备: @EnableRetry
@EnableRetry
@SpringBootApplication
public class SamplesWithoutDbApplication {

public static void main(String[] args) {
SpringApplication.run(SamplesWithoutDbApplication.class, args);
}
}

// 方法开启Spring Retry
@Retryable(value = Exception.class, maxAttempts = 2,
backoff = @Backoff(delay = 2000L, multiplier = 1.5))
@Override
public void sendMessage(String message) {
...
}

@Retryable 简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Retryable {

/**
* 重试侦听器bean名称,与其他属性互斥。
*/
String interceptor() default "";

/**
* 可重试的异常类型,includes的同义词。默认为空(如果excludes也为空,则重试所有异常)。
*/
Class<? extends Throwable>[] value() default {};

/**
* 可重试的异常类型,默认为空(如果excludes也为空,则重试所有异常)。
*/
Class<? extends Throwable>[] include() default {};

/**
* 无需重试的异常类型。默认为空(如果includes也为空,则重试所有异常)。
*/
Class<? extends Throwable>[] exclude() default {};

...

/**
* 最大尝试次数(包括第一次失败),默认为3
*/
int maxAttempts() default 3;

/**
* 计算最大尝试次数表达式(包括第一次失败)
*/
String maxAttemptsExpression() default "";

/**
* 重试等待策略
*/
Backoff backoff() default @Backoff();

...

}

@Backoff 简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(RetryConfiguration.class)
@Documented
public @interface Backoff {

/**
* 同 delay
*/
long value() default 1000;

/**
* 以毫秒为单位的延迟时间。
*/
long delay() default 0;

/**
* 两次重试之间的最大等待时间(毫秒)。如果小于delay(),则应用默认值30000。
*/
long maxDelay() default 0;

/**
* 如果为正,则用作生成下一个延迟的倍数。
*/
double multiplier() default 0;

...

/**
* 当 multiplier() > 0,将此值设置为true,使延迟随机化,
* 以便最大延迟是前一个延迟的倍数,并且两个值之间的分布是均匀的。
*/
boolean random() default false;

}

测试

1
2
3
4
5
6
7
...
2019-09-08 17:53:14.052 INFO 2200 --- [io-18080-exec-1] c.g.b.s.service.impl.UserServiceImpl : 2019-09-08T17:53:14.052 :start send message...
2019-09-08 17:53:16.054 INFO 2200 --- [io-18080-exec-1] c.g.b.s.service.impl.UserServiceImpl : 2019-09-08T17:53:16.054 :start send message...
2019-09-08 17:53:16.062 ERROR 2200 --- [io-18080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalArgumentException: Send message is not allowed to be empty] with root cause

java.lang.IllegalArgumentException: Send message is not allowed to be empty
...

因为代码里面maxAttempts 设置为2,所以在重试2次后依旧异常的情况下,抛出异常。

源码解读

在翻源码之前,我们可以大胆的猜测一下,Spring Retry的实现原理——动态代理,下面是猜测的伪代码:

1
2
3
4
5
6
7
try {
method.invoke(obejct, args);
} catch (Exception e) {
if (canRetry()) {
doRetry();
}
}

然后我们带着猜测,从我们引入依赖后敲的第一个 Spring Retry 的注解 @EnableRetry 入手

1
2
3
4
5
6
7
8
9
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {

boolean proxyTargetClass() default false;
}

熟悉Spring Boot的小伙伴可能此时会心一笑,没错Spring Boot中大量使用的EnableXXX注解通过@Import(XXXConfiguration.class) 来实现装配。

所以我们进入到 RetryConfiguration 中,先看这样一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@PostConstruct
public void init() {
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
retryableAnnotationTypes.add(Retryable.class);
this.pointcut = buildPointcut(retryableAnnotationTypes);
this.advice = buildAdvice();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}

...

protected Advice buildAdvice() {
AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
...
return interceptor;
}

@PostConstruct 用于在完成依赖注入以执行任何初始化之后需要执行的方法。所以上面这段代码我们理解为在

RetryConfiguration初始化后完成对 RetryConfiguration#advice 的初始化。

RetryConfiguration#advice 指向的是 AnnotationAwareRetryOperationsInterceptor ,是切面(或者代理)具体要执行的操作。

而在AnnotationAwareRetryOperationsInterceptor 中我们看到了一个熟悉的方法——invoke

1
2
3
4
5
6
7
8
9
10
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
}
else {
return invocation.proceed();
}
}

delegate 不为空的时候执行 RetryOperationsInterceptor#invoke ,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public Object invoke(final MethodInvocation invocation) throws Throwable {

String name;
if (StringUtils.hasText(label)) {
name = label;
} else {
name = invocation.getMethod().toGenericString();
}
final String label = name;

RetryCallback<Object, Throwable> retryCallback = new RetryCallback<Object, Throwable>() {

public Object doWithRetry(RetryContext context) throws Exception {

context.setAttribute(RetryContext.NAME, label);

if (invocation instanceof ProxyMethodInvocation) {
try {
return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
}
catch (Exception e) {
throw e;
}
catch (Error e) {
throw e;
}
catch (Throwable e) {
throw new IllegalStateException(e);
}
} else {
throw new IllegalStateException("MethodInvocation of the wrong type detected - this should not happen with Spring AOP, so please raise an issue if you see this exception");
}
}

};

if (recoverer != null) {
ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(
invocation.getArguments(), recoverer);
return this.retryOperations.execute(retryCallback, recoveryCallback);
}

return this.retryOperations.execute(retryCallback);

}

上述代码创建了一个重试回调方法,然后执行重试操作(RetryTemplate#execute),而RetryTemplate#execute又调用了 RetryTemplate#doExecute 这是真正执行重试流程的地方,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state)
throws E, ExhaustedRetryException {

RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;

// 初始化重试策略上下文,如重试次数、上一次执行结果的异常信息等...
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}

// 利用ThreadLocal确保上下文能获取到RetryContext
RetrySynchronizationManager.register(context);

Throwable lastException = null;

boolean exhausted = false;
try {

...

// 获取backoff上下文
BackOffContext backOffContext = ...;

// 重试流程
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

try {
// 重置最后一次异常(确保本次执行结果不会受到上一次结果的影响)
lastException = null;
// 执行重试流程
return retryCallback.doWithRetry(context);
} catch (Throwable e) {

// 记录异常信息
lastException = e;
try {
registerThrowable(retryPolicy, state, context, e);
} catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable",
ex);
} finally {
// 执行异常拦截器
doOnErrorInterceptors(retryCallback, context, e);
}

if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 如果还有重试机会,则进行休眠、等待下一次重试
backOffPolicy.backOff(backOffContext);
} catch (BackOffInterruptedException ex) {
lastException = e;
// 被其他线程阻止,则重试失败
throw ex;
}
}

if (shouldRethrow(retryPolicy, context, state)) {
// 确定重试失败,则抛出异常
throw RetryTemplate.<E>wrapIfNecessary(e);
}

}

// 可以通过像circuit breaker 或者 rollback classifier 来退出重试
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
...

exhausted = true;
// 恢复回调,清理缓存等
return handleRetryExhausted(recoveryCallback, context, state);

} catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
} finally {
close(retryPolicy, context, state, lastException == null || exhausted);
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}

}

总结

上述简单的介绍了 Spring Retry 执行重试的操作的流程,原理也符合我们预先的猜测——动态代理。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×