在《 OpenFeign 与 Ribbon 源码分析总结》这篇文章中,我们只是简单地了解 Ribbon 的重试机制的实现原理,本篇我们再对 Ribbon 的重试机制地实现做详细分析,从源码分析找出我们想要地答案,即如何配置 Ribbon 实现调用每个服务的接口使用不一样的重试策略,如配置失败重试多少次,以及自定义重试策略 RetryHandler 。
Ribbon Ribbon RetryHandler
LoadBalancerFeignClient : OpenFeign 整合 Ribbon 时使用的 Client ( OpenFeign 使用 Client 发送请求); FeignLoadBalancer : OpenFeign 整合 Ribbon 的桥接器,由 LoadBalancerFeignClient 创建; LoadBalancerCommand : Ribbon 将请求转为 RxJava API 调用的实现,由 FeignLoadBalancer 调用; CachingSpringLoadBalancerFactory : OpenFeign 整合 Ribbon 用于创建 FeignLoadBalancer 桥接器的带缓存功能的 FeignLoadBalancer 工厂。 RibbonLoadBalancerClient : Ribbon 提供的实现 Spring Cloud 负载均衡接口( LoadBalancerClient )的类; RibbonAutoConfiguration : Ribbon 的自动配置类,注册 RibbonLoadBalancerClient 到 Spring 容器。 SpringClientFactory : Ribbon 自己管理一群 ApplicationContext , Ribbon 会为每个 Client 创建一个 ApplicationContext ; RibbonClientConfiguration : Ribbon 为每个 Client 提供 ApplicationContext 实现环境隔离,这是 Ribbon 为每个 Client 创建 ApplicationContext 时都使用的配置类,用于注册 Ribbon 的各种功能组件,如负载均衡器 ILoadBalancer ; RequestSpecificRetryHandler : RetryHandler 接口的实现类, OpenFeign 整合 Ribbon 使用的默认失败重试策略处理器; Ribbon 重试机制地实现源码分析 Ribbon 的重试机制使用了 RxJava 的 API ,而重试次数以及是否重试的决策由 RetryHandler 实现。 Ribbon 提供两个 RetryHandler 的实现类,如下图所示。
现在我们要找出 Ribbon 使用的是哪个 RetryHandler ,我们只分析 OpenFeign 与 Ribbon 整合的使用, Spring Cloud 的 @LoadBalanced 注解方式使用我们不做分析。
spring-cloud-netflix-ribbon 的 spring.factories 文件导入的自动配置类是 RibbonAutoConfiguration ,该配置类向 Spring 容器注入了一个 RibbonLoadBalancerClient , RibbonLoadBalancerClient 正是 Ribbon 为 Spring Cloud 的负载均衡接口提供的实现类。
在创建 RibbonLoadBalancerClient 时给构造方法传入了一个 SpringClientFactory ,源码如下。
@Configuration
public class RibbonAutoConfiguration{
// 创建RibbonLoadBalancerClient
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
}
复制代码
SpringClientFactory 是 Ribbon 使用的 ApplicationContext , Ribbon 会为每个 Client 都创建一个 AnnotationConfigApplicationContext ,用作环境隔离。
SpringClientFactory 在调用父类构造方法时传入了一个配置类: RibbonClientConfiguration ,源码如下。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>{
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
}
复制代码
RibbonClientConfiguration 配置类在每个 Client 对应的 AnnotationConfigApplicationContext 初始化时生效,在第一次调用服务的接口时 AnnotationConfigApplicationContext 才被创建。创建 ApplicationContext 并且调用 register 方法注册 RibbonClientConfiguration 配置类以及其它一些配置类,最后调用其 refresh 方法初始化该 ApplicationContext 。
RibbonClientConfiguration 负责为每个 Client 对应的 ApplicationContext 注入服务列表 ServerList<Server> 、服务列表更新器 ServerListUpdater 、负载均衡器 ILoadBalancer 、负载均衡算法 IRule 、客户端配置 IClientConfig 、重试决策处理器 RetryHandler 等。
ServerList<Server> :从注册中心获取可用服务提供者节点; ServerListUpdater :定时更新本地缓存的服务列表,调用 ServerList 从注册中心获取; IRule :实现各种负载均衡算法,如随机、轮询等; ILoadBalancer :调用负载均衡算法 IRule 选择一个服务提供者节点调用; RetryHandler :决定本次失败是否重试; 由于 RibbonClientConfiguration 注册的 Bean 是注册在 Client 隔离的 ApplicationContext 中的, 所以调用每个服务提供者的接口将可以使用不同的客户端配置( IClientConfig )、重试决策处理器( RetryHandler )等。这也是我们能够为 Ribbon 配置调用每个服务的接口使用不一样的重试策略的前提条件,不过这也不是充分必要条件。
RibbonClientConfiguration 配置类会注册一个重试决策处理器 RetryHandler ,但这个 RetryHandler 并未被使用,也可能是别的地方使用。
@Configuration
public class RibbonClientConfiguration{
// 未使用
@Bean
@ConditionalOnMissingBean
public RetryHandler retryHandler(IClientConfig config) {
return new DefaultLoadBalancerRetryHandler(config);
}
}
复制代码
OpenFeign 整合 Ribbon 时,真正使用的 RetryHandler 是 RequestSpecificRetryHandler 。前面我们分析 OpenFeign 整合 Ribbon 源码时提到一个启到桥接作用的类: FeignLoadBalancer 。
当 OpenFeign 整合 Ribbon 使用时, OpenFeigin 使用的 Client 是 LoadBalancerFeignClient ,由 LoadBalancerFeignClient 创建 FeignLoadBalancer ,并调用 FeignLoadBalancer 的 executeWithLoadBalancer 方法实现负载均衡调用。
executeWithLoadBalancer 方法实际是 FeignLoadBalancer 的父类 AbstractLoadBalancerAwareClient 提供的方法,其源码如下(有删减)。
public abstract class AbstractLoadBalancerAwareClient{
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit({....})
.toBlocking()
.single();
} catch (Exception e) {
}
}
}
复制代码
executeWithLoadBalancer 方法中会创建一个 LoadBalancerCommand ,然后调用 LoadBalancerCommand 的 submit 方法提交请求, submit 方法源码如下(有删减):
public Observable<T> submit(final ServerOperation<T> operation) {
// .......
// 获取重试次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(Server server) {
//.......
// 相同节点的重试
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
// 不同节点的重试
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(...);
}
复制代码
submit 方法中调用 retryHandler 的 getMaxRetriesOnSameServer 方法和 getMaxRetriesOnNextServer 方法分别获取配置 maxRetrysSame 、 maxRetrysNext 。 maxRetrysSame 表示调用相同节点的重试次数,默认为 0 ; maxRetrysNext 表示调用不同节点的重试次数,默认为 1 。
retryPolicy 方法返回的是一个包装 RetryHandler 重试决策者的 RxJava API 的对象,最终由该 RetryHandler 决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在 retryPolicy 返回的 Func2 中完成,这是 RxJava 的 API , retryPolicy 方法的源码如下。
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
return new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer tryCount, Throwable e) {
if (e instanceof AbortExecutionException) {
return false;
}
// 大于最大重试次数
if (tryCount > maxRetrys) {
return false;
}
if (e.getCause() != null && e instanceof RuntimeException) {
e = e.getCause();
}
// 调用RetryHandler判断是否重试
return retryHandler.isRetriableException(e, same);
}
};
}
复制代码
那么这个 retryHandler 是怎么来的呢?
FeignLoadBalancer 的 executeWithLoadBalancer 方法中调用 buildLoadBalancerCommand 方法构造 LoadBalancerCommand 对象时创建的, buildLoadBalancerCommand 方法源码如下。
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
// 获取RetryHandler
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
// 使用Builder构造者模式构造LoadBalancerCommand
LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
// 传入RetryHandler
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri());
return builder.build();
}
复制代码
从源码中可以看出, Ribbon 使用的 RetryHandler 是 RequestSpecificRetryHandler 。这里还用到了 Builder 构造者模式。
FeignLoadBalancer 的 getRequestSpecificRetryHandler 方法源码如下:
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
RibbonRequest request, IClientConfig requestConfig) {
//.....
if (!request.toRequest().httpMethod().name().equals("GET")) {
// 调用this.getRetryHandler()方法获取一次RetryHandler
return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
requestConfig);
}
else {
// 调用this.getRetryHandler()方法获取一次RetryHandler
return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
requestConfig);
}
}
复制代码
RequestSpecificRetryHandler 的构造方法可以传入一个 RetryHandler ,这有点像类加载器 ClassLoader 实现的双亲委派模型。比如当 RequestSpecificRetryHandler 配置的重试次数为 0 时,则会获取父 RetryHandler 配置的重试次数。
this.getRetryHandler 方法获取到的又是哪个 RetryHandler ?(源码在 FeignLoadBalancer 的祖父类 LoadBalancerContext 中)
[FeignLoadBalancer的父类的父类LoadBalancerContext]
public class LoadBalancerContext{
protected RetryHandler defaultRetryHandler = new DefaultLoadBalancerRetryHandler();
public final RetryHandler getRetryHandler() {
return defaultRetryHandler;
}
}
[FeignLoadBalancer]
public class FeignLoadBalancer extends
AbstractLoadBalancerAwareClient{
public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig,
ServerIntrospector serverIntrospector) {
super(lb, clientConfig);
// 使用DefaultLoadBalancerRetryHandler
this.setRetryHandler(RetryHandler.DEFAULT);
this.clientConfig = clientConfig;
// IClientConfig,RibbonClientConfiguration配置类注入的
this.ribbon = RibbonProperties.from(clientConfig);
RibbonProperties ribbon = this.ribbon;
// 从IClientConfig中读取超时参数配置
this.connectTimeout = ribbon.getConnectTimeout();
this.readTimeout = ribbon.getReadTimeout();
this.serverIntrospector = serverIntrospector;
}
}
复制代码
从 FeignLoadBalancer 的构造方法中可以看出, RequestSpecificRetryHandler 的父 RetryHandler 是 DefaultLoadBalancerRetryHandler 。
RetryHandler 接口的定义如下图所示。
RetryHandler 接口方法说明:
isRetriableException方法 :该异常是否可重试; isCircuitTrippingException 方法:是否是 Circuit 熔断类型异常; getMaxRetriesOnSameServer 方法:调用同一节点的最大重试次数; getMaxRetriesOnNextServer 方法:调用不同节点的最大重试次数; Ribbon 的重试策略配置 FeignLoadBalancer 在创建 RequestSpecificRetryHandler 时传入了 IClientConfig ,这个 IClientConfig 是从哪里创建的我们稍会再分析。 RequestSpecificRetryHandler 在构造方法中从这个 IClientConfig 中获取调用同服务节点的最大重试次数和调用不同服务节点的最大重试次数,源码如下。
public class RequestSpecificRetryHandler implements RetryHandler {
public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors,
boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
// .....
// 从 IClientConfig中获取两种最大重试次数的配置
if (requestConfig != null) {
if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
// 获取同节点调用最大重试次数
this.retrySameServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
}
if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
// 获取不同节点调用最大重试次数
this.retryNextServer = (Integer)requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
}
}
}
}
复制代码
requestConfig 是在 LoadBalancerFeignClient 创建 FeignLoadBalancer 时,从 SpringClientFactory 中获取的,也正是 RibbonClientConfiguration 自动配置类注入的。
public FeignLoadBalancer create(String clientName) {
FeignLoadBalancer client = this.cache.get(clientName);
if (client != null) {
return client;
}
// this.factory就是SpringClientFactory
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,ServerIntrospector.class);
// 创建FeignLoadBalancer
client = this.loadBalancedRetryFactory != null
? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,this.loadBalancedRetryFactory)
: new FeignLoadBalancer(lb, config, serverIntrospector);
// 缓存FeignLoadBalancer
this.cache.put(clientName, client);
return client;
}
复制代码
IClientConfig 是在 RibbonClientConfiguration 中配置的,其源码如下:
public class RibbonClientConfiguration {
// 默认连接超时
public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
// 默认读超时
public static final int DEFAULT_READ_TIMEOUT = 1000;
// 自动注入,${ribbon.client.name}
@RibbonClientName
private String name;
// 注册IClientConfig实例,使用DefaultClientConfigImpl
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
// 配置连接超时
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
// 配置读超时
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
}
复制代码
那么我们要怎么修改配置呢?
如何在 application 配置文件中配置 Ribbon 的重试次数等参数。
我们可以在 RibbonClientConfiguration 这个配置类的 ribbonClientConfig 方法下断点调试,如下图所示。
从图中可以看出,配置参数 key 的格式为:
<服务提供者的名称(serverId)>:<ribbon>:<参数名>=<value> 复制代码
假设我们针对服务提供者 sck-demo-provider 配置最大同节点重试次数为 10 ,配置最大不同节点重试次数为 12 ,配置连接超时为 15 秒,那么我们需要在 application-[环境].yaml 配置文件中添加如下配置。
sck-demo-provider:
ribbon:
MaxAutoRetries: 10
MaxAutoRetriesNextServer: 12
ConnectTimeout: 15000
复制代码
其中 MaxAutoRetries 和 MaxAutoRetriesNextServer 都能生效,但是 ConnectTimeout 配置是不生效的,原因是在 RibbonClientConfiguration 中创建 DefaultClientConfigImpl 时,先调用 loadProperties 方法(传入的 name 参数就是服务名称)从配置文件获取配置,再调用 set 方法覆盖了三个配置:连接超时配置、读超时配置、是否开启 gzip 压缩配置。所以这种方式配置连接超是不生效的。
代码配置就是我们手动注册 IClientConfig ,而不使用 RibbonClientConfiguration 自动注册的。 RibbonClientConfiguration 自动注册 IClientConfig 的方法上添加了 @ConditionalOnMissingBean 条件注解,正因为如此,我们才可以自己注册 IClientConfig 。
但要注意一点, RibbonClientConfiguration 是在 Ribbon 为每个 Client 创建的 ApplicationContext 中生效的,所以我们需要创建一个配置类( Configuration ),并将其注册到 SpringClientFactory 。这样,在 SpringClientFactory 为 Client 创建 ApplicationContext 时,就会将配置类注册到 ApplicationContext ,向 SpringClientFactory 注册的配置类也就成了创建的 ApplicationContext 的配置类。
@Configuration
public class RibbonConfiguration implements InitializingBean {
@Resource
private SpringClientFactory springClientFactory;
@Override
public void afterPropertiesSet() throws Exception {
List<RibbonClientSpecification> cfgs = new ArrayList<>();
RibbonClientSpecification configuration = new RibbonClientSpecification();
// 针对哪个服务提供者配置
configuration.setName(ProviderConstant.SERVICE_NAME);
// 注册的配置类
configuration.setConfiguration(new Class[]{RibbonClientCfg.class});
cfgs.add(configuration);
springClientFactory.setConfigurations(cfgs);
}
// 指定在RibbonClientConfiguration之后生效
@AutoConfigureBefore(RibbonClientConfiguration.class)
public static class RibbonClientCfg {
@Bean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.setClientName("随便填,不影响,用不到");
config.set(CommonClientConfigKey.MaxAutoRetries, 1);
config.setProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 3);
config.set(CommonClientConfigKey.ConnectTimeout, 15000);
config.set(CommonClientConfigKey.ReadTimeout, 15000);
return config;
}
}
}
复制代码
因为 Ribbon 是在第一次调用接口时才会创建 ApplicationContext ,所以我们在应用程序的 Spring 容器初始化阶段获取 SpringClientFactory 并为其添加自定义配置类能够生效。
RibbonClientCfg 声明在 RibbonClientConfiguration 之前生效,这样 RibbonClientConfiguration 就不会向容器中注册 IClientConfig 了。
RetryHandler ? OpenFeign 整合 Ribbon 使用时,默认使用的是 FeignLoadBalancer 的 getRequestSpecificRetryHandler 方法创建的 RequestSpecificRetryHandler ,笔者也看了一圈源码,实在找不到怎么替换 RetryHandler ,可能 OpenFeign 就是不想给我们替换吧。这种情况我们只能另寻辟径了。
既然使用的是 FeignLoadBalancer 的 getRequestSpecificRetryHandler 方法返回的 RetryHandler ,那么我们是不是可以继承 FeignLoadBalancer 并重写 getRequestSpecificRetryHandler 方法来替换 RetryHandler 呢?答案是可以的。
自定义的 FeignLoadBalancer 代码如下:
/**
* 自定义FeignLoadBalancer,替换默认的RequestSpecificRetryHandler
*/
public static class MyFeignLoadBalancer extends FeignLoadBalancer {
public MyFeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig, ServerIntrospector serverIntrospector) {
super(lb, clientConfig, serverIntrospector);
}
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonRequest request, IClientConfig requestConfig) {
// 返回自定义的RequestSpecificRetryHandler
// 参数一:是否连接异常重试时重试
// 参数二:是否所有异常都重试
return new RequestSpecificRetryHandler(false, false,
getRetryHandler(), requestConfig) {
/**
* @param e 抛出的异常
* @param sameServer 是否同节点服务的重试
* @return
*/
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
if (e instanceof ClientException) {
// 连接异常重试
if (((ClientException) e).getErrorType() == ClientException.ErrorType.CONNECT_EXCEPTION) {
return true;
}
// 连接超时重试
if (((ClientException) e).getErrorType() == ClientException.ErrorType.SOCKET_TIMEOUT_EXCEPTION) {
return true;
}
// 读超时重试,读超时重试只允许不同服务节点的重试
// 所以同节点的重试不支持,读超时了就不要重新请求同一个节点了。
if (((ClientException) e).getErrorType() == ClientException.ErrorType.READ_TIMEOUT_EXCEPTION) {
return !sameServer;
}
// 服务端异常
// 服务端异常切换新节点重试
if (((ClientException) e).getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
return !sameServer;
}
}
// 连接异常时重试
return isConnectionException(e);
}
};
}
}
复制代码
由于 FeignLoadBalancer 是在 OpenFeign 的 LoadBalancerFeignClient 中调用一个 CachingSpringLoadBalancerFactory 创建的,所以我们还需要替换 OpenFeign 的 FeignRibbonClientAutoConfiguration 配置类注册的 CachingSpringLoadBalancerFactory ,并且重写 CachingSpringLoadBalancerFactory 的 create 方法,代码如下。
@Configuration
public class RibbonConfiguration {
/**
* 使用自定义FeignLoadBalancer缓存工厂
*
* @return
*/
@Bean
public CachingSpringLoadBalancerFactory cachingSpringLoadBalancerFactory() {
return new CachingSpringLoadBalancerFactory(springClientFactory) {
private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();
@Override
public FeignLoadBalancer create(String clientName) {
FeignLoadBalancer client = this.cache.get(clientName);
if (client != null) {
return client;
}
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
ServerIntrospector.class);
// 使用自定义的FeignLoadBalancer
client = new MyFeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
};
}
}
复制代码