在项目中使用 Ribbon 的目的是在客户端(服务消费端)实现负载均衡。
在上一篇《 Spring Cloud OpenFeign 源码分析》中我们分析了为什么使用 OpenFeign 时,不配置 url ,且不导入 Ribbon 的依赖会报错。本篇继续分析 OpenFeign 是如何与 Ribbon 整合、 Ribbon 是如何实现负载均衡的、 Ribbon 是如何从注册中心获取服务的。
OpenFeign 与 Ribbon 整合后的接口调用流程 OpenFeign 与 Ribbon 整合实现负载均衡调用接口的流程如下:
spring-cloud-openfeign-core 模块:
LoadBalancerFeignClient 的 execute 调用远程方法; FeignLoadBalancer 的 executeWithLoadBalancer 方法实现负载均衡调用。 ribbon-core 模块:
LoadBalancerCommand 的 submit 方法实现异步调用同步阻塞等待结果。 LoadBalancerCommand 的 selectServer 方法从多个服务提供者中负载均衡选择一个调用; ribbon-loadbalancer 模块:
ILoadBalancer 的 chooseServer 方法选择服务; IRule 的 choose 方法按某种算法选择一个服务,如随机算法、轮询算法; OpenFeign 是如何与 Ribbon 整合的 sck-demo 项目项目地址: https://github.com/wujiuye/share-projects/tree/master/sck-demo 。
当我们使用 openfeign 时,如果不配置 @FeignClient 的 url 属性,那么就需要导入 spring-cloud-starter-kubernetes-ribbon 的依赖,使用 LoadBalancerFeignClient 调用接口。如果我们不需要使用 Ribbon 来实现负载均衡,那么我们可以直接将 @FeignClient 的 url 属性配置为: http://{serviceId} ,而不用添加 Ribbon 的依赖。
sck-demo 项目中添加 spring-cloud-starter-kubernetes-ribbon 依赖,非 Spring Cloud Kubernetes 项目只需添加 spring-cloud-starter-netflix-ribbon 。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-kubernetes-ribbon</artifactId>
</dependency>
复制代码
当我们在项目中添加 spring-cloud-starter-kubernetes-ribbon 依赖配置时,会将 spring-cloud-starter-netflix-ribbon 和 spring-cloud-kubernetes-ribbon 都会导入到项目中,如下图所示。
当项目中使用 openfeign 并添加 spring-cloud-starter-netflix-ribbon 后, Ribbon 就能通过自动配置与 openfeign 整合,为项目注入 ILoadBalancer 的实现类实例。默认使用的是 ZoneAwareLoadBalancer ,这是 spring-cloud-netflix-ribbon 下的类。
spring-cloud-netflix-ribbon 的 META-INF 目录下的 spring.factories 文件内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration 复制代码
可以说 spring-cloud-netflix-ribbon 是 spring-cloud-commons 的 loadbalancer 接口的实现。
RibbonAutoConfiguration 会注入一个 LoadBalancerClient , LoadBalancerClient 是 spring-cloud-commons 定义的负载均衡接口, RibbonLoadBalancerClient 是 Ribbon 实现 spring-cloud-commons 负载均衡接口 LoadBalancerClient 的实现类,是提供给代码中使用 @LoadBalanced 注解使用的。
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
复制代码
在创建 RibbonLoadBalancerClient 时调用 springClientFactory 方法创建 SpringClientFactory :
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
复制代码
SpringClientFactory 是 NamedContextFactory 的子类,其构建方法调用父类构造方法时传入了一个配置类 RibbonClientConfiguration.class ,这是 RibbonClientConfiguration 配置类生效的原因。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
static final String NAMESPACE = "ribbon";
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
}
复制代码
SpringClientFactory 会为每个服务提供者创建一个 ApplicationContext ,实现 bean 的隔离,解决 bean 名称冲突问题,以及实现使用不同配置。
在创建 ApplicationContext 时会注册 defaultConfigType 到 bean 工厂,该 defaultConfigType 就是构造方法传递进来的 RibbonClientConfiguration.class 。
protected AnnotationConfigApplicationContext createContext(String name) {
// 创建ApplicationContext
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
......
// 注册多个Configuration类
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
......
// 调用ApplicationContext的refresh方法
context.refresh();
return context;
}
复制代码
那 createContext 方法是什么时候被调用的呢?
以 sck-demo 中服务消费者调用服务提供者接口为例:
@Service
public class DemoInvokeServiceImpl implements DemoInvokeService {
@Resource
private DemoService demoService;
@Override
public ListGenericResponse<DemoDto> invokeDemo() {
return demoService.getServices();
}
}
复制代码
DemoService 是被 @FeignClient 注解声明的接口,当我们调用 DemoService 的某个方法时,经过《 Spring Cloud OpenFeign 源码分析》我们知道,最终会调用到 LoadBalancerFeignClient 的 execute 方法时。
public class LoadBalancerFeignClient implements Client {
//............
private SpringClientFactory clientFactory;
// 后面再分析execute方法
@Override
public Response execute(Request request, Request.Options options) throws IOException{
// .....
IClientConfig requestConfig = getClientConfig(options, clientName);
// .....
}
}
复制代码
execute 方法中需要调用 getClientConfig 方法从 SpringClientFactory 获取 IClientConfig 实例,即获取客户端配置。 getClientConfig 方法就是要从服务提供者的 ApplicationContext 工厂中获取实现了 IClientConfig 接口的 bean 。
当首次调用某个服务提供者的接口时,由于并未初始化 AnnotationConfigApplicationContext ,因此会先调用 createContext 方法创建 ApplicationContext ,该方法将 RibbonClientConfiguration 类注册到 ApplicationContext ,最后调用 context.refresh(); 时就会调用到 RibbonClientConfiguration 的被 @Bean 注释的方法。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
// ........
@RibbonClientName
private String name = "client";
@Autowired
private PropertiesFactory propertiesFactory;
// IClientConfig实例,配置client的连接超时、读超时等
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
// 配置服务更新器,定时从注册中心拉去服务,由ILoadBalancer启动
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
// 配置ribbon的负载均衡器,默认使用ZoneAwareLoadBalancer
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// ......其它的暂时不去理解
}
复制代码
ILoadBalancer 是 Ribbon 定义的负载均衡接口。 ZoneAwareLoadBalancer 是 DynamicServerListLoadBalancer 的子类, DynamicServerListLoadBalancer 封装了服务更新逻辑。
DynamicServerListLoadBalancer 在构造方法中调用 enableAndInitLearnNewServersFeature 方法开启服务更新器 ServerListUpdater , ServerListUpdater 定时从注册中心拉取可用的服务更新服务列表缓存。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected volatile ServerListUpdater serverListUpdater;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void enableAndInitLearnNewServersFeature(){
serverListUpdater.start(updateAction);
}
// 调用ServerList获取服务
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
// 如果需要过滤
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
}
复制代码
Ribbon 是如何实现负载均衡的 ServerList 我们后面再讲解,先搞清楚 openfegin 与 ribbon 整合后的整个调用链路。我们继续从 LoadBalancerFeignClient 的 execute 方法继续分析。( LoadBalancerFeignClient 是由 FeignRibbonClientAutoConfiguration 自动配置类配置的,如果忘记的话可以再看下上一篇。)
public class LoadBalancerFeignClient implements Client {
//............
private SpringClientFactory clientFactory;
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
// 拿到的是服务的名称,如:sck-demo-prodiver
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
// delegate是:class Default implements Client {}
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// 首先获取客户端配置
IClientConfig requestConfig = getClientConfig(options, clientName);
return
// 负载均衡选择一个服务提供者
lbClient(clientName)
// 调用接口获取响应结果
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
}
复制代码
lbClient 创建一个 FeignLoadBalancer 对象,调用 FeignLoadBalancer 的 executeWithLoadBalancer 方法实现负载均衡调用接口,最终会调用到 FeignLoadBalancer 的 execute 方法。 Ribbon 使用 RxJava 实现异步调用转同步阻塞获取结果。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// command也封装了负载均衡的实现逻辑
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 调用FeignLoadBalancer的execute方法
return Observable.just(
AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
);
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
.....
}
}
复制代码
LoadBalancerCommand 的 submit 方法代码比较多,逻辑也比较复杂,因此就不展开说明了。
public Observable<T> submit(final ServerOperation<T> operation) {
// Use the load balancer
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
}
复制代码
selectServer 方法返回一个 Observable<Server> , Observable 是 RxJava 的 API ,我们跳过这部分。
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// 调用LoadBalancerContext的getServerFromLoadBalancer方法
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
复制代码
selectServer 方法中调用 LoadBalancerContext 的 getServerFromLoadBalancer 方法获取一个服务提供者,此 LoadBalancerContext 实际是 FeignLoadBalancer (在 buildLoadBalancerCommand 方法中可以找到答案)。
getServerFromLoadBalancer 方法部分代码如下:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
return svc;
}
// .....
}
}
复制代码
由于 Ribbon 默认使用的 ILoadBalancer 是 ZoneAwareLoadBalancer ,因此 getLoadBalancer 方法返回的是 ZoneAwareLoadBalancer 。获取到负载均衡器后调用负载均衡器的 chooseServer 选择一个服务提供者。
ZoneAwareLoadBalancer 的 chooseServer 方法:
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
}
复制代码
if 条件成立时,调用的是父类 BaseLoadBalancer 的 chooseServer 方法:
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 调用IRule的choose方法,rule是在创建ZoneAwareLoadBalancer时通过构造方法注入的
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}
复制代码
IRule 是服务选择器、是负载均衡算法的实现。在 RibbonAutoConfiguration 配置类中注入。
// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
复制代码
在创建 ZoneAwareLoadBalancer 时通过构造方法注入。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
// ......
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
复制代码
至于 ZoneAvoidanceRule 是怎么从多个服务提供者中选择一个调用的,这就是负载均衡算法的实现,本篇不做分析。
Ribbon 是如何从注册中心获取服务提供者的 前面我们分析到, ZoneAwareLoadBalancer 是 DynamicServerListLoadBalancer 的子类, DynamicServerListLoadBalancer 封装了服务更新逻辑,定时调用 ServerList 的 getUpdatedListOfServers 方法从注册中心拉取服务。
ServerList 是 ribbon-loadbalancer 包下的类,并不是 spring-cloud 的接口,所以与 spring-cloud 的服务发现接口是没有关系的。
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
复制代码
在分析 RibbonClientConfiguration 时,我们发现有一个方法会注册一个 ServerList<Server> ,但这个方法必不会执行到。
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
}
复制代码
因为我们在 sck-demo 项目中使用的是 spring-cloud-starter-kubernetes-ribbon ,所以我们现在来看下 spring-cloud-kubernetes-ribbon 负责做什么。首先从 spring-cloud-starter-kubernetes-ribbon 的 spring.factories 文件中找到自动配置类。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ org.springframework.cloud.kubernetes.ribbon.RibbonKubernetesAutoConfiguration 复制代码
自动配置类 RibbonKubernetesAutoConfiguration 的源码如下:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnProperty(value = "spring.cloud.kubernetes.ribbon.enabled",matchIfMissing = true)
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = KubernetesRibbonClientConfiguration.class)
public class RibbonKubernetesAutoConfiguration {
}
复制代码
SpringClientFactory 我们分析过了, RibbonAutoConfiguration 我们也分析过了,只剩下 KubernetesRibbonClientConfiguration 这个配置类。
KubernetesRibbonClientConfiguration 是使用 @RibbonClients 注解导入的配置类,也就是通过 ImportBeanDefinitionRegistrar 注册的。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KubernetesRibbonProperties.class)
public class KubernetesRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(KubernetesClient client, IClientConfig config,
KubernetesRibbonProperties properties) {
KubernetesServerList serverList;
if (properties.getMode() == KubernetesRibbonMode.SERVICE) {
serverList = new KubernetesServicesServerList(client, properties);
}
else {
serverList = new KubernetesEndpointsServerList(client, properties);
}
serverList.initWithNiwsConfig(config);
return serverList;
}
}
复制代码
看到这我们就明白了, spring-cloud-kubernetes-ribbon 负责实现 ribbon 的服务列表接口 ServerList<Server> 。当 spring.cloud.kubernetes.ribbon.mode 配置为 SERVICE 时,使用 KubernetesServicesServerList ,否则使用 KubernetesEndpointsServerList 。默认 mode 是 POD 。
@ConfigurationProperties(prefix = "spring.cloud.kubernetes.ribbon")
public class KubernetesRibbonProperties {
/**
* Ribbon enabled,default true.
*/
private Boolean enabled = true;
/**
* {@link KubernetesRibbonMode} setting ribbon server list with ip of pod or service
* name. default value is POD.
*/
private KubernetesRibbonMode mode = KubernetesRibbonMode.POD;
/**
* cluster domain.
*/
private String clusterDomain = "cluster.local";
}
复制代码
KubernetesRibbonMode 是个枚举类,支持 pod 和 service 。
public enum KubernetesRibbonMode {
/**
* using pod ip and port.
*/
POD,
/**
* using kubernetes service name and port.
*/
SERVICE
}
复制代码
什么意思呢? 当 mode 为 service 时,就是获取服务提供者在 kubernetes 中的 service 的名称和端口,使用这种模式会导致 Ribbon 的负载均衡失效,转而使用 kubernetes 的负载均衡。而当 mode 为 pod 时,就是获取服务提供者的 pod 的 ip 和端口,该 ip 是 kubernetes 集群的内部 ip ,只要服务消费者是部署在同一个 kubernetes 集群内就能通过 pod 的 ip 和服务提供者暴露的端口访问 pod 上的服务提供者。
如果我们不想使用 Ribbon 实现负载均衡,那么我们可以在配置文件中添加如下配置项:
spring:
cloud:
kubernetes:
ribbon:
mode: SERVICE
复制代码
你学会了吗?下一篇我们了解 Spring Cloud Kubernetes 的服务注册。