转载

Spring Cloud Kubernetes Ribbon源码分析

在项目中使用 Ribbon 的目的是在客户端(服务消费端)实现负载均衡。

在上一篇《 Spring Cloud OpenFeign 源码分析》中我们分析了为什么使用 OpenFeign 时,不配置 url ,且不导入 Ribbon 的依赖会报错。本篇继续分析 OpenFeign 是如何与 Ribbon 整合、 Ribbon 是如何实现负载均衡的、 Ribbon 是如何从注册中心获取服务的。

OpenFeignRibbon 整合后的接口调用流程

OpenFeignRibbon 整合实现负载均衡调用接口的流程如下:

spring-cloud-openfeign-core 模块:

  • 1、调用 LoadBalancerFeignClientexecute 调用远程方法;
  • 2、调用 FeignLoadBalancerexecuteWithLoadBalancer 方法实现负载均衡调用。

ribbon-core 模块:

  • 3、调用 LoadBalancerCommandsubmit 方法实现异步调用同步阻塞等待结果。
  • 4、调用 LoadBalancerCommandselectServer 方法从多个服务提供者中负载均衡选择一个调用;

ribbon-loadbalancer 模块:

  • 5、调用 ILoadBalancerchooseServer 方法选择服务;
  • 6、调用 IRulechoose 方法按某种算法选择一个服务,如随机算法、轮询算法;

OpenFeign 是如何与 Ribbon 整合的

sck-demo 项目项目地址: https://github.com/wujiuye/share-projects/tree/master/sck-demo

当我们使用 openfeign 时,如果不配置 @FeignClienturl 属性,那么就需要导入 spring-cloud-starter-kubernetes-ribbon 的依赖,使用 LoadBalancerFeignClient 调用接口。如果我们不需要使用 Ribbon 来实现负载均衡,那么我们可以直接将 @FeignClienturl 属性配置为: http://{serviceId} ,而不用添加 Ribbon 的依赖。

sck-demo 项目中添加 spring-cloud-starter-kubernetes-ribbon 依赖,非 Spring Cloud Kubernetes 项目只需添加 spring-cloud-starter-netflix-ribbon

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-kubernetes-ribbon</artifactId>
</dependency>
复制代码

当我们在项目中添加 spring-cloud-starter-kubernetes-ribbon 依赖配置时,会将 spring-cloud-starter-netflix-ribbonspring-cloud-kubernetes-ribbon 都会导入到项目中,如下图所示。

Spring Cloud Kubernetes Ribbon源码分析

当项目中使用 openfeign 并添加 spring-cloud-starter-netflix-ribbon 后, Ribbon 就能通过自动配置与 openfeign 整合,为项目注入 ILoadBalancer 的实现类实例。默认使用的是 ZoneAwareLoadBalancer ,这是 spring-cloud-netflix-ribbon 下的类。

spring-cloud-netflix-ribbonMETA-INF 目录下的 spring.factories 文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=/
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
复制代码

可以说 spring-cloud-netflix-ribbonspring-cloud-commonsloadbalancer 接口的实现。

RibbonAutoConfiguration 会注入一个 LoadBalancerClientLoadBalancerClientspring-cloud-commons 定义的负载均衡接口, RibbonLoadBalancerClientRibbon 实现 spring-cloud-commons 负载均衡接口 LoadBalancerClient 的实现类,是提供给代码中使用 @LoadBalanced 注解使用的。

@Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}
复制代码

在创建 RibbonLoadBalancerClient 时调用 springClientFactory 方法创建 SpringClientFactory

@Bean
	public SpringClientFactory springClientFactory() {
		SpringClientFactory factory = new SpringClientFactory();
		factory.setConfigurations(this.configurations);
		return factory;
	}
复制代码

SpringClientFactoryNamedContextFactory 的子类,其构建方法调用父类构造方法时传入了一个配置类 RibbonClientConfiguration.class ,这是 RibbonClientConfiguration 配置类生效的原因。

public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

	static final String NAMESPACE = "ribbon";

	public SpringClientFactory() {
		super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
	}
}
复制代码

SpringClientFactory 会为每个服务提供者创建一个 ApplicationContext ,实现 bean 的隔离,解决 bean 名称冲突问题,以及实现使用不同配置。

Spring Cloud Kubernetes Ribbon源码分析

在创建 ApplicationContext 时会注册 defaultConfigTypebean 工厂,该 defaultConfigType 就是构造方法传递进来的 RibbonClientConfiguration.class

protected AnnotationConfigApplicationContext createContext(String name) {
        // 创建ApplicationContext
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
		......
		// 注册多个Configuration类
		context.register(PropertyPlaceholderAutoConfiguration.class,
		                this.defaultConfigType);
		......
		// 调用ApplicationContext的refresh方法
		context.refresh();
		return context;
	}
复制代码

createContext 方法是什么时候被调用的呢?

sck-demo 中服务消费者调用服务提供者接口为例:

@Service
public class DemoInvokeServiceImpl implements DemoInvokeService {

    @Resource
    private DemoService demoService;

    @Override
    public ListGenericResponse<DemoDto> invokeDemo() {
        return demoService.getServices();
    }

}
复制代码

DemoService 是被 @FeignClient 注解声明的接口,当我们调用 DemoService 的某个方法时,经过《 Spring Cloud OpenFeign 源码分析》我们知道,最终会调用到 LoadBalancerFeignClientexecute 方法时。

public class LoadBalancerFeignClient implements Client {
    //............
	private SpringClientFactory clientFactory;

       // 后面再分析execute方法
	@Override
	public Response execute(Request request, Request.Options options) throws IOException{
	    // .....
	    IClientConfig requestConfig = getClientConfig(options, clientName);
	    // .....
	}
}
复制代码

execute 方法中需要调用 getClientConfig 方法从 SpringClientFactory 获取 IClientConfig 实例,即获取客户端配置。 getClientConfig 方法就是要从服务提供者的 ApplicationContext 工厂中获取实现了 IClientConfig 接口的 bean

当首次调用某个服务提供者的接口时,由于并未初始化 AnnotationConfigApplicationContext ,因此会先调用 createContext 方法创建 ApplicationContext ,该方法将 RibbonClientConfiguration 类注册到 ApplicationContext ,最后调用 context.refresh(); 时就会调用到 RibbonClientConfiguration 的被 @Bean 注释的方法。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
		RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    // ........
    
	@RibbonClientName
	private String name = "client";

	@Autowired
	private PropertiesFactory propertiesFactory;

    // IClientConfig实例,配置client的连接超时、读超时等
	@Bean
	@ConditionalOnMissingBean
	public IClientConfig ribbonClientConfig() {
		DefaultClientConfigImpl config = new DefaultClientConfigImpl();
		config.loadProperties(this.name);
		config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
		config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
		config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
		return config;
	}

    // 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}

    // 配置服务更新器,定时从注册中心拉去服务,由ILoadBalancer启动
	@Bean
	@ConditionalOnMissingBean
	public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
		return new PollingServerListUpdater(config);
	}

    // 配置ribbon的负载均衡器,默认使用ZoneAwareLoadBalancer
	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}
	
	// ......其它的暂时不去理解
}
复制代码

ILoadBalancerRibbon 定义的负载均衡接口。 ZoneAwareLoadBalancerDynamicServerListLoadBalancer 的子类, DynamicServerListLoadBalancer 封装了服务更新逻辑。

Spring Cloud Kubernetes Ribbon源码分析

DynamicServerListLoadBalancer 在构造方法中调用 enableAndInitLearnNewServersFeature 方法开启服务更新器 ServerListUpdaterServerListUpdater 定时从注册中心拉取可用的服务更新服务列表缓存。

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    protected volatile ServerListUpdater serverListUpdater;
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    public void enableAndInitLearnNewServersFeature(){
        serverListUpdater.start(updateAction);
    }
    // 调用ServerList获取服务
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            // 如果需要过滤
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        updateAllServerList(servers);
    }
}
复制代码

Ribbon 是如何实现负载均衡的

ServerList 我们后面再讲解,先搞清楚 openfeginribbon 整合后的整个调用链路。我们继续从 LoadBalancerFeignClientexecute 方法继续分析。( LoadBalancerFeignClient 是由 FeignRibbonClientAutoConfiguration 自动配置类配置的,如果忘记的话可以再看下上一篇。)

public class LoadBalancerFeignClient implements Client {
    //............
	private SpringClientFactory clientFactory;

	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
		try {
			URI asUri = URI.create(request.url());
			// 拿到的是服务的名称,如:sck-demo-prodiver
			String clientName = asUri.getHost();
			URI uriWithoutHost = cleanUrl(request.url(), clientName);
			// delegate是:class Default implements Client {}
			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
					this.delegate, request, uriWithoutHost);
			// 首先获取客户端配置
			IClientConfig requestConfig = getClientConfig(options, clientName);
			return
			    // 负载均衡选择一个服务提供者
			    lbClient(clientName)
			    // 调用接口获取响应结果
				.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (ClientException e) {
			IOException io = findIOException(e);
			if (io != null) {
				throw io;
			}
			throw new RuntimeException(e);
		}
	}
}
复制代码

lbClient 创建一个 FeignLoadBalancer 对象,调用 FeignLoadBalancerexecuteWithLoadBalancer 方法实现负载均衡调用接口,最终会调用到 FeignLoadBalancerexecute 方法。 Ribbon 使用 RxJava 实现异步调用转同步阻塞获取结果。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        // command也封装了负载均衡的实现逻辑
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            // 调用FeignLoadBalancer的execute方法
                            return Observable.just(
                            AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
                            );
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
           .....
        }
        
    }
复制代码

LoadBalancerCommandsubmit 方法代码比较多,逻辑也比较复杂,因此就不展开说明了。

public Observable<T> submit(final ServerOperation<T> operation) {
    // Use the load balancer
    Observable<T> o = (server == null ? selectServer() : Observable.just(server))
}
复制代码

selectServer 方法返回一个 Observable<Server>ObservableRxJavaAPI ,我们跳过这部分。

private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // 调用LoadBalancerContext的getServerFromLoadBalancer方法
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
复制代码

selectServer 方法中调用 LoadBalancerContextgetServerFromLoadBalancer 方法获取一个服务提供者,此 LoadBalancerContext 实际是 FeignLoadBalancer (在 buildLoadBalancerCommand 方法中可以找到答案)。

Spring Cloud Kubernetes Ribbon源码分析

getServerFromLoadBalancer 方法部分代码如下:

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        if (lb != null){
            Server svc = lb.chooseServer(loadBalancerKey);
            return svc;
        } 
        // .....
    }
}
复制代码

由于 Ribbon 默认使用的 ILoadBalancerZoneAwareLoadBalancer ,因此 getLoadBalancer 方法返回的是 ZoneAwareLoadBalancer 。获取到负载均衡器后调用负载均衡器的 chooseServer 选择一个服务提供者。

Spring Cloud Kubernetes Ribbon源码分析

ZoneAwareLoadBalancerchooseServer 方法:

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
    }
复制代码

if 条件成立时,调用的是父类 BaseLoadBalancerchooseServer 方法:

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
        
        protected IRule rule = DEFAULT_RULE;
        
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // 调用IRule的choose方法,rule是在创建ZoneAwareLoadBalancer时通过构造方法注入的
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
}
复制代码

IRule 是服务选择器、是负载均衡算法的实现。在 RibbonAutoConfiguration 配置类中注入。

// 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}
复制代码

在创建 ZoneAwareLoadBalancer 时通过构造方法注入。

@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
	        // ......
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}
复制代码

至于 ZoneAvoidanceRule 是怎么从多个服务提供者中选择一个调用的,这就是负载均衡算法的实现,本篇不做分析。

Ribbon 是如何从注册中心获取服务提供者的

前面我们分析到, ZoneAwareLoadBalancerDynamicServerListLoadBalancer 的子类, DynamicServerListLoadBalancer 封装了服务更新逻辑,定时调用 ServerListgetUpdatedListOfServers 方法从注册中心拉取服务。

ServerListribbon-loadbalancer 包下的类,并不是 spring-cloud 的接口,所以与 spring-cloud 的服务发现接口是没有关系的。

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 
     */
    public List<T> getUpdatedListOfServers();   

}
复制代码

在分析 RibbonClientConfiguration 时,我们发现有一个方法会注册一个 ServerList<Server> ,但这个方法必不会执行到。

public class RibbonClientConfiguration {
    
    @Bean
	@ConditionalOnMissingBean
	public ServerList<Server> ribbonServerList(IClientConfig config) {
		if (this.propertiesFactory.isSet(ServerList.class, name)) {
			return this.propertiesFactory.get(ServerList.class, config, name);
		}
		ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
		serverList.initWithNiwsConfig(config);
		return serverList;
	}
	
}
复制代码

因为我们在 sck-demo 项目中使用的是 spring-cloud-starter-kubernetes-ribbon ,所以我们现在来看下 spring-cloud-kubernetes-ribbon 负责做什么。首先从 spring-cloud-starter-kubernetes-ribbonspring.factories 文件中找到自动配置类。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=/
org.springframework.cloud.kubernetes.ribbon.RibbonKubernetesAutoConfiguration
复制代码

自动配置类 RibbonKubernetesAutoConfiguration 的源码如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnProperty(value = "spring.cloud.kubernetes.ribbon.enabled",matchIfMissing = true)
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = KubernetesRibbonClientConfiguration.class)
public class RibbonKubernetesAutoConfiguration {
}
复制代码

SpringClientFactory 我们分析过了, RibbonAutoConfiguration 我们也分析过了,只剩下 KubernetesRibbonClientConfiguration 这个配置类。

KubernetesRibbonClientConfiguration 是使用 @RibbonClients 注解导入的配置类,也就是通过 ImportBeanDefinitionRegistrar 注册的。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KubernetesRibbonProperties.class)
public class KubernetesRibbonClientConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(KubernetesClient client, IClientConfig config,
			KubernetesRibbonProperties properties) {
		KubernetesServerList serverList;
		if (properties.getMode() == KubernetesRibbonMode.SERVICE) {
			serverList = new KubernetesServicesServerList(client, properties);
		}
		else {
			serverList = new KubernetesEndpointsServerList(client, properties);
		}
		serverList.initWithNiwsConfig(config);
		return serverList;
	}

}
复制代码

看到这我们就明白了, spring-cloud-kubernetes-ribbon 负责实现 ribbon 的服务列表接口 ServerList<Server> 。当 spring.cloud.kubernetes.ribbon.mode 配置为 SERVICE 时,使用 KubernetesServicesServerList ,否则使用 KubernetesEndpointsServerList 。默认 modePOD

@ConfigurationProperties(prefix = "spring.cloud.kubernetes.ribbon")
public class KubernetesRibbonProperties {
	/**
	 * Ribbon enabled,default true.
	 */
	private Boolean enabled = true;
	/**
	 * {@link KubernetesRibbonMode} setting ribbon server list with ip of pod or service
	 * name. default value is POD.
	 */
	private KubernetesRibbonMode mode = KubernetesRibbonMode.POD;
	/**
	 * cluster domain.
	 */
	private String clusterDomain = "cluster.local";
}
复制代码

KubernetesRibbonMode 是个枚举类,支持 podservice

public enum KubernetesRibbonMode {

	/**
	 * using pod ip and port.
	 */
	POD,
	/**
	 * using kubernetes service name and port.
	 */
	SERVICE

}
复制代码

什么意思呢? 当 modeservice 时,就是获取服务提供者在 kubernetes 中的 service 的名称和端口,使用这种模式会导致 Ribbon 的负载均衡失效,转而使用 kubernetes 的负载均衡。而当 modepod 时,就是获取服务提供者的 podip 和端口,该 ipkubernetes 集群的内部 ip ,只要服务消费者是部署在同一个 kubernetes 集群内就能通过 podip 和服务提供者暴露的端口访问 pod 上的服务提供者。

Spring Cloud Kubernetes Ribbon源码分析

如果我们不想使用 Ribbon 实现负载均衡,那么我们可以在配置文件中添加如下配置项:

spring:
  cloud:
    kubernetes:
      ribbon:
        mode: SERVICE
复制代码

你学会了吗?下一篇我们了解 Spring Cloud Kubernetes 的服务注册。

原文  https://juejin.im/post/5ef70b026fb9a07e7f6bc5bb
正文到此结束
Loading...