转载

EurekaClient 源码浅析

背景: 最近在研究springCloud,对服务注册中心也非常好奇,然后就看了一下源码,而且以后面试也需要了解一下,因此记录一下

注意:EurekaClient的内容很多,我只分析主干部分

前提: 这里的springboot版本为2.1.5.RELEASE,spring-cloud版本为Greenwich.SR1。

eureka架构图

EurekaClient 源码浅析

从这里我们可以看到几个重要的概念:

Eureka可以向EurekaServer Register、Renew、Cancel、Get Registy,下面就分别这几个概念讨论吧。

根据springboot的自动配置特性,我们找到org.springframework.cloud.spring-cloud-netflix-eureka-client.2.1.1.RELEASE.spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar下面的META-INF/spring.factories。

EurekaClient 源码浅析

最重要的就是 EurekaClientAutoConfiguration 这个配置类了

打开配置类EurekaClientAutoConfiguration截取一下重要的配置:

EurekaClientConfigBean

@Bean  @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)  public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {     EurekaClientConfigBean client = new EurekaClientConfigBean();      if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {             // We don't register during bootstrap by default, but there will be another      // chance later.              client.setRegisterWithEureka(false);      }       return client;}
复制代码

这个bean是eureka的配置信息,ymal配置的前缀是 eureka.client 开始的。这个配置类中有大量的eureka client默认配置。

比如:

1.默认的eureka配置:

/** * Default Eureka URL. */public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";复制代码

也就是说,如果没有配置eureka server的url,它会默认注册到本地8761地址中。

2.默认从eureka server中拉取配置时间间隔

/** * Indicates how often(in seconds) to fetch the registry information from the eureka  * server. */private int registryFetchIntervalSeconds = 30;复制代码

也就是说默认每个30秒从eureka server中拉取一次所有服务的配置信息。

3.复制实例变化信息到eureka服务器所需要的时间间隔

/**  * Indicates how often(in seconds) to replicate instance changes to be replicated to * the eureka server. */private int instanceInfoReplicationIntervalSeconds = 30;复制代码

还有很多配置可以自己去了解一下。

EurekaInstanceConfigBean

@Bean@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,      ManagementMetadataProvider managementMetadataProvider) {   String hostname = getProperty("eureka.instance.hostname");   boolean preferIpAddress = Boolean         .parseBoolean(getProperty("eureka.instance.prefer-ip-address"));   String ipAddress = getProperty("eureka.instance.ip-address");   boolean isSecurePortEnabled = Boolean         .parseBoolean(getProperty("eureka.instance.secure-port-enabled"));   String serverContextPath = env.getProperty("server.servlet.context-path", "/");   int serverPort = Integer         .valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));   Integer managementPort = env.getProperty("management.server.port", Integer.class); // nullable.     String managementContextPath = env         .getProperty("management.server.servlet.context-path"); // nullable.                                                   // should   // be wrapped into   // optional   Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",         Integer.class); // nullable   EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);   instance.setNonSecurePort(serverPort);   instance.setInstanceId(getDefaultInstanceId(env));   instance.setPreferIpAddress(preferIpAddress);   instance.setSecurePortEnabled(isSecurePortEnabled);   if (StringUtils.hasText(ipAddress)) {      instance.setIpAddress(ipAddress);   }   if (isSecurePortEnabled) {      instance.setSecurePort(serverPort);   }   if (StringUtils.hasText(hostname)) {      instance.setHostname(hostname);   }   String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");   String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");   if (StringUtils.hasText(statusPageUrlPath)) {      instance.setStatusPageUrlPath(statusPageUrlPath);   }   if (StringUtils.hasText(healthCheckUrlPath)) {      instance.setHealthCheckUrlPath(healthCheckUrlPath);   }   ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,         serverContextPath, managementContextPath, managementPort);   if (metadata != null) {      instance.setStatusPageUrl(metadata.getStatusPageUrl());      instance.setHealthCheckUrl(metadata.getHealthCheckUrl());      if (instance.isSecurePortEnabled()) {         instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());      }      Map<String, String> metadataMap = instance.getMetadataMap();      metadataMap.computeIfAbsent("management.port",            k -> String.valueOf(metadata.getManagementPort()));   }   else {      // without the metadata the status and health check URLs will not be set      // and the status page and health check url paths will not include the      // context path so set them here      if (StringUtils.hasText(managementContextPath)) {         instance.setHealthCheckUrlPath(               managementContextPath + instance.getHealthCheckUrlPath());         instance.setStatusPageUrlPath(               managementContextPath + instance.getStatusPageUrlPath());      }   }   setupJmxPort(instance, jmxPort);   return instance;}复制代码

这个主要是eureka client 这个几点自身的一些配置信息。

接下来是两个最主要的bean : DiscoveryClient 和 EurekaServiceRegistry

@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,      EurekaClientConfig clientConfig) {   return new EurekaDiscoveryClient(client, clientConfig);}@Beanpublic EurekaServiceRegistry eurekaServiceRegistry() {   return new EurekaServiceRegistry();}复制代码

EurekaServiceRegistry 用于服务注册,DiscoveryClient 用于服务发现

先看 EurekaServiceRegistry

EurekaClient 源码浅析

我们看到这个类里面有几个方法:

register(EurekaRegistration reg) : 这个方法就是用于eureka client 注册到eureka server的,不过具体的注册逻辑不在这里。Eureka client的注册动作是在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中执行的,其实最终的注册是发生在 InstanceInfoReplicator类里面的。

maybeInitializeClient(EurekaRegistration reg): 初始化 eureka client,eureka client 等会再说。

deregister(EurekaRegistration reg):注销下线操作,同样具体下线操作不在这里,这里只是将本节点实例状态设置为下线而已。

setStatus(EurekaRegistration registration, String status):设置本节点实例的状态。

getStatus(EurekaRegistration registration):获取本节点实例状态。

最后看一下DiscoveryClient,发现有两个DiscoveryClient,一个是类,一个是接口:

EurekaClient 源码浅析

类是netflix提供的,接口时springcloud提供的。回到EurekaClientAutoConfiguration,它在声明DiscoveryClient这个bean的时候用的是EurekaDiscoveryClient

@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,      EurekaClientConfig clientConfig) {   return new EurekaDiscoveryClient(client, clientConfig);}复制代码

找到 EurekaDiscoveryClient

EurekaClient 源码浅析

发现EurekaDiscoveryClient里面包含netflix 里的EurekaClient接口,而这个接口的默认实现就是DiscoveryClient

EurekaClient 源码浅析

也就是说EurekaDiscoveryClient和DiscoveryClient是组合模式,最后调用的还是netflix的类DiscoveryClient。这个类才是服务注册发现的关键,接下来分析这个类。

DiscoveryClient

先看看源码解释:

EurekaClient 源码浅析

根据这么一段话,也就是说,这个类是用于和Eureka Server来交互,这个类的主要职责是:

a) <em>Registering</em> the instance with <tt>Eureka Server</tt> 服务注册

b) <em>Renewal</em>of the lease with <tt>Eureka Server</tt> 服务续约

c) <em>Cancellation</em> of the lease from <tt>Eureka Server</tt> during shutdown  服务下线

d) <em>Querying</em> the list of services/instances registered with <tt>Eureka Server</tt> 获取注册列表信息

DiscoveryClient构造器中的重要的方法 initScheduledTasks();

Fetch Registers : 获取注册列表信息

if(clientConfig.shouldFetchRegistry()){    //registry cache refresh timer    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();    scheduler.schedule(new TimedSupervisorTask("cacheRefresh"                    , scheduler, cacheRefreshExecutor,                    registryFetchIntervalSeconds,                    TimeUnit.SECONDS, expBackOffBound,                    new CacheRefreshThread()),            registryFetchIntervalSeconds,            TimeUnit.SECONDS);}复制代码

从上面的代码可以看出,eureka client 开启一个scheduler,每个一定的时间(默认是30秒,可以通过registryFetchIntervalSeconds配置)从eureka server 拉取eureka client的配置信息。进一步看到CacheRefreshThread这个runnable。

class CacheRefreshThread implements Runnable { 
public void run() { 
  refreshRegistry();//主要定时执行该方法 
  } 
}复制代码

进一步分析 refreshRegistry

void refreshRegistry() {    try {              ...                // 定时获取注册信息              boolean success = fetchRegistry(remoteRegionsModified);        if (success) {            registrySize = localRegionApps.get().size();            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();        }               ...    } catch (Throwable e) {        logger.error("Cannot fetch registry from server", e);    }}复制代码

继续分析 fetchRegistry(boolean forceFullRegistryFetch)方法,里面最主要的是getAndStoreFullRegistry();来发送HTTP请求到注册中心来获取注册信息,并缓存到本地

private void getAndStoreFullRegistry() throws Throwable {       ...    Applications apps = null;    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        apps = httpResponse.getEntity();    }    logger.info("The response status is {}", httpResponse.getStatusCode());    if (apps == null) {        logger.error("The application is null for some reason. Not storing this information");    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {             // 把拉取的信息缓存在 localRegionApps 中             localRegionApps.set(this.filterAndShuffle(apps));        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());    } else {        logger.warn("Not updating applications as another thread is updating it already");    }}复制代码

而那个localRegionApps 就是用于缓存拉取信息的

private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();复制代码

Renew 服务的续期

回到initScheduledTasks()方法,

if(clientConfig.shouldRegisterWithEureka()){    ...    // Heartbeat timer    scheduler.schedule(new TimedSupervisorTask("heartbeat",            scheduler,             heartbeatExecutor,            renewalIntervalInSecs,             TimeUnit.SECONDS,             expBackOffBound,            new HeartbeatThread()),            renewalIntervalInSecs,             TimeUnit.SECONDS);          ...}else{    logger.info("Not registering with Eureka server per configuration");}复制代码

进一步看 HeartbeatThread 这个Runnable

/** * The heartbeat task that renews the lease in the given intervals. */private class HeartbeatThread implements Runnable {    public void run() {        if (renew()) {            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();        }    }}复制代码
/** * Renew with the eureka service by making the appropriate REST call */boolean renew() {    EurekaHttpResponse<InstanceInfo> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {            REREGISTER_COUNTER.increment();            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 注册            boolean success = register();            if (success) {                instanceInfo.unsetIsDirty(timestamp);            }            return success;        }        return httpResponse.getStatusCode() == Status.OK.getStatusCode();    } catch (Throwable e) {        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);        return false;    }}复制代码

以上代码是每隔一定时间去发送心跳(默认30秒),如果返回的结果是404,就把client注册到Eureka Server上。

Register 服务注册

/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable {    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);    EurekaHttpResponse<Void> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    } catch (Exception e) {        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);        throw e;    }    if (logger.isInfoEnabled()) {        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());    }    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}复制代码

这个方法实在DiscoveryClient类中的。

在DiscoveryClient.initScheduledTasks()中,有一段这么代码

// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(        this,        instanceInfo,        clientConfig.getInstanceInfoReplicationIntervalSeconds(),        2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());复制代码

在InstanceInfoReplicator 构造方法中会创建一个scheduler。注册方法的调用是通过InstanceInfoReplicator.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())发送注册请求到注册中心

public void start(int initialDelayMs) {    if (started.compareAndSet(false, true)) {        instanceInfo.setIsDirty();  // for initial register        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);        scheduledPeriodicRef.set(next);    }}复制代码
public void run() {    try {        discoveryClient.refreshInstanceInfo();        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();        if (dirtyTimestamp != null) {            // 注册            discoveryClient.register();            instanceInfo.unsetIsDirty(dirtyTimestamp);        }    } catch (Throwable t) {        logger.warn("There was a problem with the instance info replicator", t);    } finally {        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);        scheduledPeriodicRef.set(next);    }}复制代码

Cancel: 服务下线:

这个方法实在DiscoveryClient类中的。

/** * unregister w/ the eureka service. */void unregister() {    // It can be null if shouldRegisterWithEureka == false    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {        try {            logger.info("Unregistering ...");            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());        } catch (Exception e) {            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);        }    }}复制代码

这个方法的调用是通过 shutdown() 方法的

/** * Shuts down Eureka Client. Also sends a deregistration request to the * eureka server. */@PreDestroy@Overridepublic synchronized void shutdown() {    if (isShutdown.compareAndSet(false, true)) {        logger.info("Shutting down DiscoveryClient ...");        if (statusChangeListener != null && applicationInfoManager != null) {            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());        }        cancelScheduledTasks();        // If APPINFO was registered        if (applicationInfoManager != null                && clientConfig.shouldRegisterWithEureka()                && clientConfig.shouldUnregisterOnShutdown()) {            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);            unregister();        }        if (eurekaTransport != null) {            eurekaTransport.shutdown();        }        heartbeatStalenessMonitor.shutdown();        registryStalenessMonitor.shutdown();        logger.info("Completed shut down of DiscoveryClient");    }}复制代码

这个方法上有个注解@PreDestroy,表示在对象销毁之前触发的。可以看到这个方法中关闭了定时任务,通知eureka server 下线。

以上就是我对Eureka client的理解,

总结:

1.springcloud 整合了netflix,通过组合模式, EurekaDiscoveryClient 包含 EurekaClient, 核心功能都是由 DiscoveryClient 来完成的。

2.Eureka client 主要有 Register、Renew、Cancel、Get Registy功能。

3.Eureka client和Server的交互通过大量的定时任务触发,交互通过http协议,设计核心类为 EurekaHttpClient。

4.在DiscoveryClient被创建的时候,在其构造方法中,启动了三个线程池,然后分别启动了三个定时器任务:注册当前服务到注册中心;持续发送心跳进行续约任务;定时刷新注册中心注册细信息到本地,所以可以说,在项目启动的时候这些任务就被执行了。

原文  https://juejin.im/post/5d5e6d22e51d4561eb0b269d
正文到此结束
Loading...