转载

Eureka 源码解析 —— 应用实例注册发现 (二)之续租

Eureka 源码解析 —— 应用实例注册发现 (二)之续租

������关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 续租应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

Eureka 源码解析 —— 应用实例注册发现 (二)之续租
  • 蓝框 部分,为本文重点。
  • 蓝框 部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍:

  • 请支持正版。下载盗版, 等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

2. Eureka-Client 发起续租

Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。

Eureka-Client 固定间隔 向 Eureka-Server 发起 续租 ( renew ),避免租约过期。

默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。

2.1 初始化定时任务

Eureka-Client 在初始化过程中,创建 心跳 线程, 固定间隔 向 Eureka-Server 发起 续租 ( renew )。实现代码如下:

// DiscoveryClient.java

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
 Provider<BackupRegistry> backupRegistryProvider) {
 // ... 省略无关代码
 scheduler = Executors.newScheduledThreadPool(2,
 new ThreadFactoryBuilder()
 .setNameFormat("DiscoveryClient-%d")
 .setDaemon(true)
 .build());

 heartbeatExecutor = new ThreadPoolExecutor(
 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>(),
 new ThreadFactoryBuilder()
 .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
 .setDaemon(true)
 .build()
 ); // use direct handoff
 
 // ... 省略无关代码
 
 // 【3.2.14】初始化定时任务
 initScheduledTasks();
}

private void initScheduledTasks(){

 // 向 Eureka-Server 心跳(续租)执行器
 if (clientConfig.shouldRegisterWithEureka()) {
 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
 logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

 // Heartbeat timer
 scheduler.schedule(
 new TimedSupervisorTask(
 "heartbeat",
 scheduler,
 heartbeatExecutor,
 renewalIntervalInSecs,
 TimeUnit.SECONDS,
 expBackOffBound,
 new HeartbeatThread()
 ),
 renewalIntervalInSecs, TimeUnit.SECONDS);
 
 // ... 省略无关代码
 }
 // ... 省略无关代码
}
  • scheduler ,定时任务服务,用于定时触发心跳( 续租 )。细心如你,会发现任务提交的方式是 ScheduledExecutorService#schedule(...) 方法, 只延迟执行一次心跳,说好的固定频率执行心跳呢 !!!答案在「2.3 TimedSupervisorTask」揭晓。
  • heartbeatExecutor ,心跳任务执行线程池。为什么有 scheduler 的情况下,还有 heartbeatExecutor ???答案也在「2.3 TimedSupervisorTask」揭晓。
  • HeartbeatThread,心跳线程,在「2.2 TimedSupervisorTask」详细解析。

2.2 HeartbeatThread

com.netflix.discovery.DiscoveryClient.HeartbeatThread ,心跳线程, 实现 执行 Eureka-Client 向 Eureka-Server 发起 续租 ( renew )请求。实现代码如下:

// DiscoveryClient.java
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable{

 public void run(){
 if (renew()) {
 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
 }
 }
}
  • 调用 #renew 方法,执行续租逻辑。实现代码如下:

    // DiscoveryClient.java
    boolean renew(){
     EurekaHttpResponse<InstanceInfo> httpResponse;
     try {
     httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
     logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
     if (httpResponse.getStatusCode() == 404) {
     REREGISTER_COUNTER.increment();
     logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
     long timestamp = instanceInfo.setIsDirtyWithTime();
     // 发起注册
     boolean success = register();
     if (success) {
     instanceInfo.unsetIsDirty(timestamp);
     }
     return success;
     }
     return httpResponse.getStatusCode() == 200;
     } catch (Throwable e) {
     logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
     return false;
     }
    }
    
    • 调用 AbstractJerseyEurekaHttpClient#sendHeartBeat(...) 方法,发起 续租请求 ,实现代码如下:

      // AbstractJerseyEurekaHttpClient.java
      @Override
      public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus){
       String urlPath = "apps/" + appName + '/' + id;
       ClientResponse response = null;
       try {
       WebResource webResource = jerseyClient.resource(serviceUrl)
       .path(urlPath)
       .queryParam("status", info.getStatus().toString())
       .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
       if (overriddenStatus != null) {
       webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
       }
       Builder requestBuilder = webResource.getRequestBuilder();
       addExtraHeaders(requestBuilder);
       response = requestBuilder.put(ClientResponse.class);
       EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
       if (response.hasEntity()) {
       eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
       }
       return eurekaResponseBuilder.build();
       } finally {
       if (logger.isDebugEnabled()) {
       logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
       }
       if (response != null) {
       response.close();
       }
       }
      }
      
      • PUT 请求 Eureka-Server 的 apps/${APP_NAME}/${INSTANCE_INFO_ID} 接口,参数为 statuslastDirtyTimestampoverriddenstatus ,实现续租。
    • 调用 AbstractJerseyEurekaHttpClient#register(...) 方法,当 Eureka-Server 不存在租约 时,重新发起注册,在 《Eureka 源码解析 —— 应用实例注册发现 (一)之注册》 有详细解析。

2.3 TimedSupervisorTask

com.netflix.discovery.TimedSupervisorTask ,监管 定时任务 的任务。

A supervisor task that schedules subtasks while enforce a timeout.

创建 TimedSupervisorTask 代码如下:

public class TimedSupervisorTask extends TimerTask{

 private final Counter timeoutCounter;
 private final Counter rejectedCounter;
 private final Counter throwableCounter;
 private final LongGauge threadPoolLevelGauge;

 /**
* 定时任务服务
*/
 private final ScheduledExecutorService scheduler;
 /**
* 执行子任务线程池
*/
 private final ThreadPoolExecutor executor;
 /**
* 子任务执行超时时间
*/
 private final long timeoutMillis;
 /**
* 子任务
*/
 private final Runnable task;
 /**
* 当前任子务执行频率
*/
 private final AtomicLong delay;
 /**
* 最大子任务执行频率
*
* 子任务执行超时情况下使用
*/
 private final long maxDelay;

 public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task){
 this.scheduler = scheduler;
 this.executor = executor;
 this.timeoutMillis = timeUnit.toMillis(timeout);
 this.task = task;
 this.delay = new AtomicLong(timeoutMillis);
 this.maxDelay = timeoutMillis * expBackOffBound;

 // Initialize the counters and register.
 timeoutCounter = Monitors.newCounter("timeouts");
 rejectedCounter = Monitors.newCounter("rejectedExecutions");
 throwableCounter = Monitors.newCounter("throwables");
 threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
 Monitors.registerObject(name, this);
 }

}
  • scheduler ,定时任务服务,用于定时【 发起 】子任务。
  • executor ,执行子任务线程池,用于【 提交 】子任务执行。
  • task ,子任务。
  • timeoutMillis ,子任务执行超时时间,单位:毫秒。
  • delay ,当前子任务执行频率,单位:毫秒。值等于 timeout 参数。
  • maxDelay最大 子任务执行频率,单位:毫秒。值等于 timeout * expBackOffBound 参数。

Eureka 源码解析 —— 应用实例注册发现 (二)之续租

  • scheduler 初始化延迟执行 TimedSupervisorTask 。
  • TimedSupervisorTask 执行时,提交 taskexecutor 执行任务。
    • task 执行正常,TimedSupervisorTask 再次 提交 自己scheduler 延迟 timeoutMillis 执行。
    • task 执行超时,重新计算延迟时间( 不允许超过 maxDelay ), 再次 提交 自己scheduler 延迟执行。

实现代码如下:

// TimedSupervisorTask.java
 1: @Override
 2: public void run(){
 3: Future<?> future = null;
 4: try {
 5: // 提交 任务
 6: future = executor.submit(task);
 7: //
 8: threadPoolLevelGauge.set((long) executor.getActiveCount());
 9: // 等待任务 执行完成 或 超时
 10: future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
 11: // 设置 下一次任务执行频率
 12: delay.set(timeoutMillis);
 13: //
 14: threadPoolLevelGauge.set((long) executor.getActiveCount());
 15: } catch (TimeoutException e) {
 16: logger.error("task supervisor timed out", e);
 17: timeoutCounter.increment(); //
 18: 
 19: // 设置 下一次任务执行频率
 20: long currentDelay = delay.get();
 21: long newDelay = Math.min(maxDelay, currentDelay * 2);
 22: delay.compareAndSet(currentDelay, newDelay);
 23: 
 24: } catch (RejectedExecutionException e) {
 25: if (executor.isShutdown() || scheduler.isShutdown()) {
 26: logger.warn("task supervisor shutting down, reject the task", e);
 27: } else {
 28: logger.error("task supervisor rejected the task", e);
 29: }
 30: 
 31: rejectedCounter.increment(); //
 32: } catch (Throwable e) {
 33: if (executor.isShutdown() || scheduler.isShutdown()) {
 34: logger.warn("task supervisor shutting down, can't accept the task");
 35: } else {
 36: logger.error("task supervisor threw an exception", e);
 37: }
 38: 
 39: throwableCounter.increment(); //
 40: } finally {
 41: // 取消 未完成的任务
 42: if (future != null) {
 43: future.cancel(true);
 44: }
 45: 
 46: // 调度 下次任务
 47: if (!scheduler.isShutdown()) {
 48: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
 49: }
 50: }
 51: }
  • 第 5 至 6 行 :提交子任务 task 到执行子任务线程池 executor
  • 第 9 至 10 行 :等待子任务 task 执行完成或执行超时。
  • 第 11 至 12 行 :子任务 task 执行完成,设置下一次执行延迟 delay
  • 第 19 至 22 行 :子任务 task 执行超时,重新计算下一次执行延迟 delay 。计算公式为 Math.min(maxDelay, currentDelay * 2) 。如果多次超时,超时时间不断乘以 2 ,不允许超过最大延迟时间( maxDelay )。
  • 第 41 至 44 行 : 强制 取消未完成的子任务。
  • 第 46 至 49 行 :调度下一次 TimedSupervisorTask 。

3. Eureka-Server 接收续租

3.1 接收续租请求

com.netflix.eureka.resources.InstanceResource ,处理 单个 应用实例信息的请求操作的 Resource ( Controller )。

续租应用实例信息的请求,映射 InstanceResource#renewLease() 方法,实现代码如下:

 1: @PUT
 2: public Response renewLease(
3: @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
4: @QueryParam("overriddenstatus") String overriddenStatus,
5: @QueryParam("status") String status,
6: @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp){
 7: boolean isFromReplicaNode = "true".equals(isReplication);
 8: // 续租
 9: boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10: 
11: // 续租失败
12: // Not found in the registry, immediately ask for a register
13: if (!isSuccess) {
14: logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
15: return Response.status(Status.NOT_FOUND).build();
16: }
17: 
18: // 比较 InstanceInfo 的 lastDirtyTimestamp 属性
19: // Check if we need to sync based on dirty time stamp, the client
20: // instance might have changed some value
21: Response response = null;
22: if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
23: response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
24: // Store the overridden status since the validation found out the node that replicates wins
25: if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
26: && (overriddenStatus != null)
27: && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
28: && isFromReplicaNode) {
29: registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
30: }
31: } else { // 成功
32: response = Response.ok().build();
33: }
34: logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
35: return response;
36: }
  • 第 8 至 9 行 :调用 PeerAwareInstanceRegistryImpl#renew(...) 方法,续租。实现代码如下:

    // PeerAwareInstanceRegistryImpl.java
    public boolean renew(final String appName, final String id, final boolean isReplication){
     if (super.renew(appName, id, isReplication)) { // 续租
     // Eureka-Server 复制
     replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
     return true;
     }
     return false;
    }
    
    • 调用父类 AbstractInstanceRegistry#renew(...) 方法,注册应用实例信息。
  • 第 11 至 16 行 :续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起 InstanceInfo 的注册。

  • 第 18 至 30 行 :比较请求的 lastDirtyTimestamp 和 Server 的 InstanceInfo 的 lastDirtyTimestamp 属性差异,需要配置 eureka.syncWhenTimestampDiffers = true ( 默认开启 )。

    • 第 23 行 :调用 #validateDirtyTimestamp(...) 方法,比较 lastDirtyTimestamp 的差异。实现代码如下:

      // InstanceResource.java
       1: private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication){
       2: // 获取 InstanceInfo
       3: InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
       4: if (appInfo != null) {
       5: if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
       6: Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
       7: // 请求 的 较大
       8: if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
       9: logger.debug("Time to sync, since the last dirty timestamp differs -"
       10: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
       11: return Response.status(Status.NOT_FOUND).build();
       12: // Server 的 较大
       13: } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
       14: // In the case of replication, send the current instance info in the registry for the
       15: // replicating node to sync itself with this one.
       16: if (isReplication) {
       17: logger.debug(
       18: "Time to sync, since the last dirty timestamp differs -"
       19: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
       20: args);
       21: return Response.status(Status.CONFLICT).entity(appInfo).build();
       22: } else {
       23: return Response.ok().build();
       24: }
       25: }
       26: }
       27: 
       28: }
       29: return Response.ok().build();
       30: }
      
      • 第 7 至 11 行 :请求的 lastDirtyTimestamp 较大, 意味着请求方( 可能是 Eureka-Client ,也可能是 Eureka-Server 集群内的其他 Server )存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的数据不一致,返回 404 响应。请求方收到 404 响应后重新发起注册
      • 第 16 至 21 行 :TODO[0021]:集群同步
      • 第 22 至 24 行 :Server 的 lastDirtyTimestamp 较大,并且请求方为 Eureka-Client,续租成功,返回 200 成功响应。
      • 第 29 行 : lastDirtyTimestamp 一致,返回 200 成功响应。
      • 第 24 至 30 行 :TODO[0022]:状态更新。
  • 第 31 至 33 行 :续租成功,返回 200 成功响应。

3.2 续租应用实例信息

调用 AbstractInstanceRegistry#renew(...) 方法,续租应用实例信息,实现代码如下:

 1: public boolean renew(String appName, String id, boolean isReplication){
 2: // 增加 续租次数 到 监控
 3: RENEW.increment(isReplication);
 4: // 获得 租约
 5: Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 6: Lease<InstanceInfo> leaseToRenew = null;
 7: if (gMap != null) {
 8: leaseToRenew = gMap.get(id);
 9: }
10: // 租约不存在
11: if (leaseToRenew == null) {
12: RENEW_NOT_FOUND.increment(isReplication);
13: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
14: return false;
15: } else {
16: InstanceInfo instanceInfo = leaseToRenew.getHolder();
17: if (instanceInfo != null) {
18: // touchASGCache(instanceInfo.getASGName());
19: // TODO 芋艿:over status
20: InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
21: instanceInfo, leaseToRenew, isReplication);
22: if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
23: logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
24: + "; re-register required", instanceInfo.getId());
25: RENEW_NOT_FOUND.increment(isReplication);
26: return false;
27: }
28: if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
29: Object[] args = {
30: instanceInfo.getStatus().name(),
31: instanceInfo.getOverriddenStatus().name(),
32: instanceInfo.getId()
33: };
34: logger.info(
35: "The instance status {} is different from overridden instance status {} for instance {}. "
36: + "Hence setting the status to overridden status", args);
37: instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
38: }
39: }
40: // TODO 芋艿:可能和自我保护
41: renewsLastMin.increment();
42: // 设置 租约最后更新时间(续租)
43: leaseToRenew.renew();
44: return true;
45: }
46: }
  • 第 2 至 3 行 :增加续租次数到监控。配合 Netflix Servo 实现监控信息采集。
  • 第 4 至 9 行 :获得租约( Lease )。
  • 第 10 至 14 行 :租约不存在,返回续租失败( false )。
  • 第 19 至 39 行 :TODO[0022]:状态更新。
  • 第 40 至 41 行 :TODO[0023]:自我保护。
  • 第 42 至 43 行 :调用 Lease#renew() 方法,设置租约最后更新时间( 续租 ),实现代码如下:

    public void renew(){
     lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
    
  • 第 44 行 :返回续租成功( true )。

  • 整个过程修改的租约的过期时间,即使并发请求,也不会对数据的一致性产生不一致的影响,因此像注册操作一样加锁

666. 彩蛋

效率比想象的低一些,加油继续更新下一篇。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

原文  http://www.iocoder.cn/Eureka/instance-registry-renew/
正文到此结束
Loading...