转载

Eureka源码剖析之一:初始化-启动

Eureka源码剖析之一:初始化-启动

点击上方蓝色字关注我们~

Eureka启动的过程有client端和server端, Eureka client端入口是DiscoveryClient类, Eureka server端入口是EurekaBootStrap类, 接下来我们就从源码看下它们做了什么吧!

〓Eureka Client端启动

1)看下DiscoveryClient类图:  

Eureka源码剖析之一:初始化-启动

由此看出DiscoveryClient实现了EurekaClient、LookupService接口,并且定义了内部类: DiscoverClientOptionalArgs,可选参数类,源码里实现为空,是默认实现,具体的需要去查看AbstractDiscoveryClientOptionalArgs这个抽象类; EurekaTransport类,封装了Client请求的类; CacheFreshThread,刷新缓存线程,提供定时拉取服务列表等; HeartbeatThread,心跳线程,提供定时向服务端续约服务等。

// DiscoveryClient类是一个单例,实现了EurekaClient接口
@Singleton
public class DiscoveryClient implements EurekaClient {
    // DiscoveryClient类的构造函数
    @Inject // 构造器注入
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }

        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
        // 拉取服务计数器:单调地增加生成计数器,以确保陈旧的线程不会将注册表重置为旧版本。
        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

        // 过时注册统计
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        // 过时心跳统计
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        // 既不需要注册到Eureka也不拉取注册服务
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
            // 定义了2个线程大小的定时线程池:一个是刷新缓存CacheFreshThread,一个是心跳线程HeartbeatThread
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            // 心跳线程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            // 刷新缓存线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            // 实例化EurekaTransport
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        // 在启动时需要拉取注册服务列表,增量拉取之后如果失败就会从备份里面再次拉取
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        // 初始化定时任务
        initScheduledTasks();

        try {
            // 监控DiscoverClient
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        // 需要拉取注册服务,则定时拉取刷新缓存CacheRefreshThread,可以看#Eureka服务拉取 一篇
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            // 默认30秒,定时拉取服务
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        // 需要注册到Eureka,则定时心跳请求服务端保持客户端存活,即服务续约。可以看#Eureka服务续约 一篇
        if (clientConfig.shouldRegisterWithEureka()) {
            // 默认30秒,定时进行服务续约
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator 当前实例节点复制器实例化
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    // 实例信息复制间隔,默认30秒
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // 状态变化监听器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 实例节点复制器启动
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}

class InstanceInfoReplicator implements Runnable {

    // 启动,延迟40秒启动
    public void start(int initialDelayMs) {
        // CAS保证启动一次
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            // 定时任务定时调用实例信息复制器(线程),逻辑看run方法
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

     public void run() {
        try {
            // 刷新实例信息
            discoveryClient.refreshInstanceInfo();
            // 获取实例信息的脏时间戳,如果存在则进行服务注册,服务注册可以看 #Eureka服务注册 一篇
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                //Client进行注册操作
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // 定时每30秒进行刷新或注册请求
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

Eureka Client启动时会开启 三个定时任务

①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

〓Eureka Server端启动

EurekaBootStrap是server端启动入口,
PeerAwareInstanceRegistryImpl是真正的核心类,我们看下其类图:

Eureka源码剖析之一:初始化-启动

// 继承Servlet上下文监听器,说明Eureka Server是基于Servlet
public class EurekaBootStrap implements ServletContextListener {
    ...
    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            // 初始化环境
            initEurekaEnvironment();
            // 初始化Euraka Server Context
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }


    /**
     * init hook for server context. Override for custom logic.
     */
    protected void initEurekaServerContext() throws Exception {
        // Eureka Server读取配置
        EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

        logger.info("Initializing the eureka client...");
        logger.info(eurekaServerConfig.getJsonCodecName());
        // Server默认加载编码JSON、XML
        ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);

        ApplicationInfoManager applicationInfoManager = null;
        // EurekaClient初始化
        if (eurekaClient == null) {
            EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                    ? new CloudInstanceConfig()
                    : new MyDataCenterInstanceConfig();

            applicationInfoManager = new ApplicationInfoManager(
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = eurekaClient.getApplicationInfoManager();
        }
        // 如果是使用AWS平台,这里不涉及
        PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
            awsBinder.start();
        } else {
            // 初始化集群实例,父类AbstractInstanceRegistry 
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }

        // 初始化集群节点实例
        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                applicationInfoManager
        );

        // 初始化EurekaSeverContext实例
        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );

        // 使用非注入的方式Holder保持EurekaServerContext实例
        EurekaServerContextHolder.initialize(serverContext);

        // EurekaServerContext初始化
        serverContext.initialize();
        logger.info("Initialized server context");

        // 复制注册列表到集群上的其它节点
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

    // 初始化获得集群节点
    protected PeerEurekaNodes getPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager) {
        PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClientConfig,
                serverCodecs,
                applicationInfoManager
        );

        return peerEurekaNodes;
    }
    ...

}

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /** 
     * 从eureka节点peer封装注册信息,进行节点信息的注册同步,如果操作失败则会进入重试
     * Populates the registry information from a peer eureka node. This
     * operation fails over to other nodes until the list is exhausted if the
     * communication fails.
     */
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
        // 默认最大5次进行同步注册
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    // 重试等待30秒
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // 节点注册
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 每分钟服务续约的数量:每个节点服务续约每30秒一次,那么多个节点需要count*2次
        this.expectedNumberOfRenewsPerMin = count * 2;
        // 服务续约最小百分阈值默认为0.85。即最小阈值为最小预约数乘以0.85
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // 调用父类的定时启动剔除任务定时启动剔除任务,一旦达到剔除条件则会调用服务下线接口,可以看 #Eureka服务下线 一篇
        super.postInit();
    }
}

// PeerAwareInstanceRegistryImpl父类
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    ...
    /**
     * Create a new, empty instance registry.
     */
    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        // 近期下线队列recentCanceledQueue
        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        // 近期注册队列recentRegisteredQueue
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

        // 近期1分钟的续约计量统计任务
        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        // 定时任务:定期清除近期变更队列
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }

    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }
    ...

    // 定时启动剔除任务
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

     /**
     * Registers a new instance with a given duration.
     * 给定一个租期时间注册一个新的实例
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            // 通过appname获取实例注册数据
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // 注册计数器+1
            REGISTER.increment(isReplication);
            // 如果实例对应数据不存在,则进行初始化
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 通过实例id获得实例租约信息,如果租约信息存在,那么会比较LastDirtyTimestamp,如果租约信息大于传进来的实例的LastDirtyTimestamp,那么则直接将使用缓存中的注册实例
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // 租约信息不存在则作为一个新的注册
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 根据注册者和租约时间,实例化租约实例化信息
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            // 将注册者添加到最新注册队列
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            // 注册者重写状态不为UNKNOWN,并且重写状态Map不包含实例,则将其put到Map中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            // 如果租约被注册后为UP状态,那么标记服务启动时间戳,而且只是首次才会进行设置
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            // 将租约存放到近期变更队列
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            // 使对应实例缓存失效
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        } 
    }

}

@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    ...
    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }
    ...
}

// 集群节点:定时更新集群节点
@Singleton
public class PeerEurekaNodes {
    ...

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // 预先更新集群节点
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }

    /**
     * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
     * create new ones. 新增集群备份节点,移除不再可用的节点,并且创建新的node节点
     *
     * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
     */
    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        // 参数newPeerUrls为空则不处理
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }
        // 即将关闭的节点集合
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        // 因为新增的节点可能在即将关闭的节点集合内,所以先从中移除。
        toShutdown.removeAll(newPeerUrls);
        // 即将新增的节点集合
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        // 从新增节点集合中移除本地节点集合
        toAdd.removeAll(peerEurekaNodeUrls);
        // 如果即将关闭的节点集合为空并且即将新增的节点也为空,则说明没有变化,不需要处理,立即返回。
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }

        // 移除不再可能的节点,当前包含全部已有节点
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        // 即将关闭的节点集合不为空,则将当前节点集合中移除对应节点
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    // 移除节点,节点进行关闭shutdown操作
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

        // 新增节点
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        // 重新赋值
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

    // 实时获取排除自身节点所剩下集群的节点地址
    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }
    ...
}

1)Eureka Server端启动入口类是继承ServletContextListener的EurekaBoostrap,首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

2)调用EurekaServerContext上下文进行初始化: 启动默认每10分钟定时更新集群节点数据、响应缓存初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

3)集群节点的同步注册: 支持失败最大5次重试,进行集群节点间的相互注册。初始化服务续约最小期望数量和最小续约阈值。

4)服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

5)启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

总 结 

〓Eureka client

启动核心类DiscoveryClient,启动时会开启三个定时任务:

①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

〓Eureka Server

启动入口类是EurekaBootStrap,核心类是PeerAwareInstanceRegistryImpl,

初始化:首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

更新节点信息定时任务、定时更新续约数据: 调用EurekaServerContext上下文进行初始化: 启动默认每10分钟定时更新集群节点数据、响应缓存initializedResponseCache初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

集群同步 :集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。

初始化服务续约最小期望数量和最小续约阈值。

本地缓存数据 :服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

剔除任务 :启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

这里只是针对Eureka启动初始化做了简要剖析,更多详细的篇章请看后面的分析。

PS:若哪里写的有误或者不明白的,请多多指教!

原文  http://mp.weixin.qq.com/s?__biz=MzA5NTUzNTA2Mw==&mid=2454934152&idx=1&sn=5e9fbf75d832f21bd953027bb4b785da
正文到此结束
Loading...