Eureka源码之二:服务注册

Eureka源码之二:服务注册

点击上方蓝色字关注我们~

Eureka源码剖析之一:初始化-启动

EurekaClient在启动时会进行一系列初始化操作,本篇文章主要解析EurekaClient端向EurekaServer端发起注册请求的具体过程,具体分为EurekaClient端发送请求和EurekaServer端接收请求。在较新的版本看到代码进行了优化,所以还是以某一版本为准进行剖析。这里是1.X版本最新版本1.9.15。

〓Eureka Client发出注册请求

    // DiscoveryClient构造器中:
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
//配置了开启注册和强制注册初始化时,则会立马调用注册方法
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}

-----------------------------------
// 实例信息复制器初始化,并且启动。
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
// InstanceInfoReplicator.java实现了RUNNABLE,接着看start方法
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
// 30秒后执行当前对象任务
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

public void run() {
try {
// 刷新实例信息
discoveryClient.refreshInstanceInfo();
// 判断实例信息是否发生改变
// isInstanceInfoDirty默认为false,实例信息发生改变时会被设为true
// 被设为ture,则dirtyTimestamp不为null,则发起注册请求
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 40秒后执行
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

------------------------------------
/**
* 通过REST调用注册eureka服务
*/

boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
// 根据响应状态码204判断是否注册成功
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

Eureka Client启动时会进行注册:

①根据配置是否立即进行注册请求;

②启动实例信息复制器时会延迟30秒进行注册操作,之后每隔40秒延迟之后判断实例信息是否改变而继续执行。


〓Eureka Server接收注册请求

// 从eureka-core包看EurekaBootStrap可以看出这是服务端启动的入口,
// 不出意外看到EurekaBootStrap继承了ServletContextListener,
// 从而得知随着Web容器的启动,Eureka服务端也同时启动,并且进行了一系列的初始化动作,
// 下面看下PeerAwareInstanceRegistryImpl这个类,这是后端服务的核心类。
// 查看其register方法,跟踪进去可以发现是被ApplicationResource调用,
// 也说明ApplicationResource提供了给客户端进行注册的http接口,跟上面client端请求http接口对应上了。
public class ApplicationResource {
...

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION
) String isReplication)
{
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 参数校验
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}

// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 服务端处理注册的入口
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
...
}

// 服务端注册 peerAwareInstanceRegistryImpl#register
public void register(final InstanceInfo info, final boolean isReplication) {
// 默认租约时间90秒
// 自己设置的租约有效时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册
super.register(info, leaseDuration, isReplication);
// 注册信息复制到其它eureka节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

// 父类注册:AbstractInstanceRegistry#register,指定续约时间进行注册
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 获得读
read.lock();
// 从注册表中获取实例信息映射
// 根据实例名称获取租约信息
// registry是ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>存储结构
// 由于同一名称的实例有可能有多个,所以registry的Key设计为实例的名称,具体的实例信息用Value里的Id区分
// Key为实例集合的名称,Value是一个Map,Value的Key是实例的Id,Value的Value是实例的租约信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 注册计数
REGISTER.increment(isReplication);
// 首次注册,初始化对应注册表信息
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 获得存在的续约实例信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 如果EurekaServer端已存在实例信息且时间戳大于新传入实例信息中的时间戳,则用EurekaServer端的数据替换新传入的实例信息
// 防止过期的实例信息注册到EurekaServer端
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// 续约信息不存在
synchronized (lock) {
// 自我保护机制
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// 客户端发送续约数增1,后更新其阈值
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 包装租赁实例
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将租约实例放在内存中
gMap.put(registrant.getId(), lease);
// 将注册信息添加到近期注册队列
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
// 这是发生覆盖状态的初始状态转移的地方,将覆盖状态进行存储到内存中,即最终状态
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
// 更新注册者的覆盖状态
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}

// Set the status based on the overridden status rules
// 基于覆盖状态规则设置状态
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);

// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
// 设置服务启动时间
lease.serviceUp();
}
// 设置行为类型为新增
registrant.setActionType(ActionType.ADDED);
// 最近变化续约队列增加当前续约实例信息
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置最后更新时间戳
registrant.setLastUpdatedTimestamp();
// 使响应缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
// 释放读锁
read.unlock();
}
}

/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
* 复制全部eureka操作复制到其它对等节点,除了当前节点本身
*/

private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication
)
{
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
// 复制计数
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// 已经复制过则无需再次复制
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 遍历eureka节点
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
// 排除复制自身
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 复制实例行为到对等节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}

/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/

private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node
)
{
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
// 复制行为
switch (action) {
case Cancel: // 取消
node.cancel(appName, id);
break;
case Heartbeat: // 心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register: // 注册
node.register(info);
break;
case StatusUpdate: // 状态更新
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride: // 删除
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}

// 节点注册:PeerEurekaNode#register
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// 任务批次分发器
batchingDispatcher.process(
// 生成任务id
taskId("register", info),
// 复制任务
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
// 使用HTTP调用注册接口
return replicationClient.register(info);
}
},
expiryTime
);
}

// 使用Jersey调用注册接口:AbstractJerseyEurekaHttpClient#register
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

总结





Eureka Client



1)注册1:
DisoverClient构造器里会根据是否开启立即调用注册Eureka Server接口进行注册;

2)注册2:
实例信息复制器启动时会使用定时调度30秒延迟调用注册方法,如果实例信息发生改变,则之后会每隔40秒进行重新注册;

3)注册方式:
使用Jersey HTTP向Eureka Server发起注册请求。





Eureka Server



1)启动:
EurekaBootStrap启动入口,继承ServletContextListener。
核心类:
PeerAwareInstanceRegistryImpl;

2)Eureka Server注册逻辑:
服务端本地缓存(队列)进行处理注册,比如:
状态的更新、添加到近期变化队列等;

3)复制:
集群环境下,节点之间需要进行注册信息的同步
除了本身节点,将会复制注册信息到其它对等节点,最后同样是使用Jersey进行请求同步注册。

原文 

http://mp.weixin.qq.com/s?__biz=MzA5NTUzNTA2Mw==&mid=2454934154&idx=1&sn=e8187c0192b7d81bac288eccbed6a34c

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Eureka源码之二:服务注册

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址