转载

Eureka启动流程-源码小段分析

本文参考自朱荣鑫老师的书《Spring Cloud微服务架构进阶》,强烈建议阅读本书。

Eureka是Netflix开源的服务治理组件,内部网络间的微服务调用已不再使用IP地址,而使用微服务名称,所以需要有Eureka这样的的组件存在,负责维护服务的状态。Spring Cloud整合了Eureka,使用Spring生态可以做到对其开箱即用。

除了Eureka,Spring Cloud还整合了其他Netflix组件,统称为Spring Cloud Netflix,Spring Cloud Netflix包含了服务治理Eureka、路由Zuul、客户端负载均衡Ribbon、熔断器Hystrix。

除了上面这些,开发中常用的还有声明式Http客户端Feign,也是Netflix公司开源的。Spring Cloud在Feign的基础上支持了Spring MVC的注解,叫OpenFeign。

言归正传,微服务离不开服务治理,本章探讨Eureka的使用和源码以及集群的实现原理。

总览

Eureka启动流程-源码小段分析

上图是Eureka官方的架构图。这里面有如下角色。

  • Application Service:这是你的业务系统服务端(微服务)
  • Eureka Client:这是Eureka客户端,可以理解为一个jar包,嵌入在你的业务系统Application Service中,用于向Eureka服务端注册信息等
  • Application Client:这是你的业务系统客户端,嵌入了Eureka Client用于向Eureka服务端获取你要调用的Application Service信息,然后Application Client发起向Application Service的调用
  • Eureka Server:Eureka服务器,管理所有的微服务状态
  • us-east-1c:Eureka最初设计的目的是用于亚马逊云服务AWS的分布式系统的,所以引入了AWS的Region(区域)和Zone(Availability Zone可用区)的概念,一个Region包含多个Zone。上图中us-east-1c, us-east-1d, us-east-1e都属于Zone,这三个Zone属于us-east-1这个Region

Eureka Client提供了以下几个功能:

  • 向Eureka Server注册自己
  • 定时向Eureka Server续约
  • 客户端下线
  • 获取注册表

对应的,Eureka Server提供以下功能:

  • 提供服务注册
  • 接收服务心跳
  • 服务下线
  • 获取注册表中服务实例信息
  • 服务剔除
  • 集群同步

准备工作

为了方便跟踪问题,可以把netflix包的DEBUG日志打开。

logging:
  level:
    com.netflix: DEBUG
复制代码

同时,我们应该知道,Spring Boot的自动化配置原理是加载了类路径下的META-INF/spring.factories文件,如下图,eureka-client包中加载的自动化配置类如下:

Eureka启动流程-源码小段分析

分析步骤

通过Eureka DEBUG级别的日志打印,我们可以看到第一条有关Eureka的日志为:

2019-11-24 10:48:11.235  INFO 115648 --- [           main] com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1
2019-11-24 10:48:13.030  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson
2019-11-24 10:48:13.030  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson
2019-11-24 10:48:13.232  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml
复制代码

以上为类com.netflix.discovery.DisconveryClient打印的日志,在此处打一个断点,debug模式重新启动应用。在IDEA或Eclipse中查看调用栈,如下图:

Eureka启动流程-源码小段分析

从上图可以看出,该方法入口刚好吻合我们上面的分析,是Eureka的自动配置类EurekaClientAutoConfiguration触发的。其源码如下:

// 打印日志
logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
// 配置文件中的register-with-eureka和fetch-registry如果都为false则不注册和拉去服务列表了
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
	logger.info("Client configured to neither register nor query for data.");
	this.scheduler = null;
	this.heartbeatExecutor = null;
	this.cacheRefreshExecutor = null;
	this.eurekaTransport = null;
	this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
	DiscoveryManager.getInstance().setDiscoveryClient(this);
	DiscoveryManager.getInstance().setEurekaClientConfig(config);
	this.initTimestampMs = System.currentTimeMillis();
	logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
} else {
复制代码

同理,其他源码的分析也可以通过DEBUG日志和断点来分析。以下不再赘述。

Eureka Client源码解析

Eureka为了做到开箱即用,简化开发人员的开发工作,将很多与Eureka Server交互的工作隐藏起来,自主完成。在应用的不同阶段执行不同工作,如下图。

Eureka启动流程-源码小段分析

从上面代码

logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
复制代码

打断点逐步跟踪,可以发现Eureka Client的执行步骤如下:

1. 向Eureka服务器拉取全量注册信息

代码位于DiscoveryClient#fetchRegistry方法中。

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();

	label122: {
		boolean var4;
		try {
            Applications applications = this.getApplications();
            // 如果增量式拉取被禁止,或者Applications为null,进行全量拉取
			if (!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion() != -1L) {
				this.getAndUpdateDelta(applications);
			} else {
				logger.info("Disable delta property : {}", this.clientConfig.shouldDisableDelta());
				logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
				logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
				logger.info("Application is null : {}", applications == null);
				logger.info("Registered Applications size is zero : {}", applications.getRegisteredApplications().size() == 0);
                logger.info("Application version is -1: {}", applications.getVersion() == -1L);
                // 全量拉取注册表信息
				this.getAndStoreFullRegistry();
			}

			applications.setAppsHashCode(applications.getReconcileHashCode());
			this.logTotalInstances();
			break label122;
		} catch (Throwable var8) {
			logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}", new Object[]{this.appPathIdentifier, var8.getMessage(), var8});
			var4 = false;
		} finally {
			if (tracer != null) {
				tracer.stop();
			}

		}

		return var4;
	}

	this.onCacheRefreshed();
	this.updateInstanceRemoteStatus();
	return true;
}
复制代码

拉取注册表地址的代码为:

// 这里的urlPath传入的是/apps
WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath);
复制代码

以上的this.serviceUrl默认为http://localhost:8761/eureka所以注册表地址是http://localhost:8761/eureka//apps

为了避免本文篇幅太长,建议想了解这部分的原理请看朱荣鑫老师的书《Spring Cloud微服务架构进阶》

这里总结下Eureka Client启动的整个过程:

http://localhost:8761/eureka/apps
http://localhost:8761/eureka/apps/{app-name}
http://localhost:8761/eureka/apps/{app-name}/{instance-info-id}
http://localhost:8761/eureka/apps/{app-name}/{instance-info-id}

Eureka Server源码解析

Eureka Server作为一个开箱即用的服务注册中心,需要注意的是,Eureka Server同时也是一个Eureka Client,它会向它配置文件中的其他Eureka Server进行拉取注册表、服务注册和发送心跳等操作。

还是按照上面源码的分析步骤,将com.netflix包调为DEBUG级别,可以在日志中看到如下:

2019-11-25 11:14:23.816  INFO 237032 --- [           main] c.n.e.registry.AbstractInstanceRegistry  : Finished initializing remote region registries. All known remote regions: []
2019-11-25 11:14:23.817  INFO 237032 --- [           main] c.n.eureka.DefaultEurekaServerContext    : Initialized
2019-11-25 11:14:23.846  INFO 237032 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-11-25 11:14:23.866  INFO 237032 --- [           main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/health],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)
复制代码

查看方法AbstractInstanceRegistry#register,该方法是提供服务注册功能的基础。源码如下:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
	try {
        read.lock();
        // 根据appName对服务实例集群进行分类
		Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
		REGISTER.increment(isReplication);
		if (gMap == null) {
			final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
			gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
			if (gMap == null) {
				gMap = gNewMap;
			}
        }
        // 根据实例id获取实例的租约
		Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
		// Retain the last dirty timestamp without overwriting it, if there is already a lease
		if (existingLease != null && (existingLease.getHolder() != null)) {
			Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
			Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
			logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

			// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
			// InstanceInfo instead of the server local copy.
			if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
				logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
						" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
				logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
				registrant = existingLease.getHolder();
			}
		} else {
            // The lease does not exist and hence it is a new registration
            // 如果租约不存在,这是一个新的注册实例
			synchronized (lock) {
				if (this.expectedNumberOfRenewsPerMin > 0) {
					// Since the client wants to cancel it, reduce the threshold
					// (1
                    // for 30 seconds, 2 for a minute)
                    // 自我保护机制
					this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
					this.numberOfRenewsPerMinThreshold =
							(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
				}
			}
			logger.debug("No previous lease information found; it is new registration");
        }
        // 创建新的租约
		Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
		if (existingLease != null) {
			lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
		}
		gMap.put(registrant.getId(), lease);
		synchronized (recentRegisteredQueue) {
			recentRegisteredQueue.add(new Pair<Long, String>(
					System.currentTimeMillis(),
					registrant.getAppName() + "(" + registrant.getId() + ")"));
		}
		// This is where the initial state transfer of overridden status happens
		if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
			logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
							+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
			if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
				logger.info("Not found overridden id {} and hence adding it", registrant.getId());
				overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
			}
		}
		InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
		if (overriddenStatusFromMap != null) {
			logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
			registrant.setOverriddenStatus(overriddenStatusFromMap);
		}

		// Set the status based on the overridden status rules
		InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
		registrant.setStatusWithoutDirty(overriddenInstanceStatus);

		// If the lease is registered with UP status, set lease service up timestamp
		if (InstanceStatus.UP.equals(registrant.getStatus())) {
			lease.serviceUp();
		}
		registrant.setActionType(ActionType.ADDED);
		recentlyChangedQueue.add(new RecentlyChangedItem(lease));
		registrant.setLastUpdatedTimestamp();
		invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
		logger.info("Registered instance {}/{} with status {} (replication={})",
				registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
	} finally {
		read.unlock();
	}
}
复制代码

为了避免本文篇幅太长,建议想了解这部分的原理请看朱荣鑫老师的书《Spring Cloud微服务架构进阶》

这里总结下Eureka Server启动的整个过程:

  • 提供服务注册
  • 接收服务心跳
  • 服务剔除
  • 服务下线
  • 集群同步:Eureka Server启动时,从它的peer节点拉取注册表,每个Eureka Server对本地注册表进行操作时,它将遍历Eureka Server的peer节点,发送同步请求。
  • 提供获取注册表信息。

以上,本文结束。

原文  https://juejin.im/post/5ddb524ce51d45236665e6da
正文到此结束
Loading...