rocketMQ(4.6.1)系列教程–namesrv 篇

Namesrv

Namesrv 架构设计

namesrv 作用

namesrv 充当服务注册中心的作用,向 producerconsumer 提供 broker 的信息,并将不可用的 broker 及时剔除。有点类似 eureka-server 的作用。与 eureka-server 不大一样的是, namesrv 集群之间信息不共享 ,也无法同步。broker 注册的时候,是同时向 namesrv 集群进行注册。eureka-client 则是向集群中的一台 eureka-server 注册。eureka-server 再进行集群间的消息同步。

namesrv 架构设计图

rocketMQ(4.6.1)系列教程--namesrv 篇

namesrv 启动流程

初始化资源

namesrv 启动时,主要是加载配置文件,初始化 NettyServer,NamesrvController。并由 NamesrvController 创建 2 个线程池

NamesrvController#initialize()

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

   @Override
   public void run() {
     NamesrvController.this.routeInfoManager.scanNotActiveBroker();
   }
 }, 5, 10, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  @Override
  public void run() {
    NamesrvController.this.kvConfigManager.printAllPeriodically();
  }
}, 1, 10, TimeUnit.MINUTES);

10 秒 扫描不可用的 broker。以及每 10 分钟 打印 KV 配置。

注册 JVM 钩子,释放资源

在启动后,注册 JVM 钩子函数,在程序停止时,优雅的关闭资源

NamesrvStartup#start(final NamesrvController controller)

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  @Override
  public Void call() throws Exception {
    // controller 为 NamesrvController
    controller.shutdown();
    return null;
  }
}));

路由注册、故障剔除

路由注册、故障剔除,简称对 broker 信息在并发下的 CURD

路由元信息

RouteInfoManager

基本属性

// 读写
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 主题队列表,初始化时,topic 默认有 4 个队列
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker 信息。包含集群名称、broker 名称、主备 broker 地址
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// broker 集群信息。存储集群中所有的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// broker 状态信息,namesrv 每次收到心跳包,会替换该信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker FilterService 列表,用于类模式消息过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

QueueData

private String brokerName;
private int readQueueNums;
private int writeQueueNums;
// 读写权限,参考 permName
private int perm;
private int topicSynFlag;

BrokerData

private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

BrokerLiveInfo

// 最后一次更新时间, namesrv 收到 broker 心跳后,会更新该信息。
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;

路由信息 CURD(滑稽)

路由注册

broker 发送心跳包

Broker 在启动 10 秒 后,每隔 30 秒 会向注册过的所有 namesrv 发送心跳包

BrokerController#start()

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// brokerConfig.getRegisterNameServerPeriod()  = 30000

BrokerOuterAPI@registerBrokerAll()

// 使用 CountDownLatch 等到向所有 namesrv 注册完毕,才会放行
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
  brokerOuterExecutor.execute(new Runnable() {
    @Override
    public void run() {
      try {
        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
        if (result != null) {
          registerBrokerResultList.add(result);
        }

        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
      } catch (Exception e) {
        log.warn("registerBroker Exception, {}", namesrvAddr, e);
      } finally {
        countDownLatch.countDown();
      }
    }
  });
}

try {
  countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

namesrv 接收心跳包

请求被 DefaultRequestProcessor 接收,code = REGISTER_BROKER

DefaultRequestProcessor#processRequest()

switch (request.getCode()) {
     ...
  ...
    case RequestCode.REGISTER_BROKER:
     Version brokerVersion = MQVersion.value2Version(request.getVersion());
     if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
       return this.registerBrokerWithFilterServer(ctx, request);
     } else {
       return this.registerBroker(ctx, request);
     }
  ...
  ...
 }

接着由 RouteInfoManager 执行注册的操作,维护 broker 信息,主要维护以下几个Map 的信息

clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable

RouterInfoManager#registerBroker()

...
...
// 拿到 写锁
this.lock.writeLock().lockInterruptibly();

// 维护 clusterAddrTable 
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);

// 维护 brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}

// 维护 topicQueueTable
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
  if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
      || registerFirst) {
    ConcurrentMap<String, TopicConfig> tcTable =
      topicConfigWrapper.getTopicConfigTable();
    if (tcTable != null) {
      for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
        // 真正做维护操作的方法
        this.createAndUpdateQueueData(brokerName, entry.getValue());
      }
    }
  }
}

// 维护 brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
    System.currentTimeMillis(),
    topicConfigWrapper.getDataVersion(),
    channel,
    haServerAddr));

...
...
// 释放写锁
this.lock.writeLock().unlock();

路由剔除

Namesrv 路由剔除触发时机:

  • Namesrv 每隔 10 秒 扫描 brokerLiveInfo。检查 lastUpdateTimestamp 与当前系统时间差 120s,则剔除该broker
  • Broker 正常关闭,触发 JVM 钩子函数,执行 unregisterBroker 操作

路由剔除的操作, 同样无非是对 RouterInfoManager 中维护的 broker 信息进行 CURD。不做过多介绍

路由发现

RocketMQ 路由发现非实时,当 Topic 路由发生变化后,Namesrv 不主动推送给客户端,而是由客户端拉取主题最新的路由

根据主题名称拉取路由信息的 code = RequestCode.GET_ROUTEINTO_BY_TOPIC

RouteInfoManager#pickupTopicRouteData()

try {
  // 获取读锁
  this.lock.readLock().lockInterruptibly();
  
  // 从 topicQueueTable 拿到topic 对应的 队列数据
  List<QueueData> queueDataList = this.topicQueueTable.get(topic);
  if (queueDataList != null) {
    topicRouteData.setQueueDatas(queueDataList);
    foundQueueData = true;

    Iterator<QueueData> it = queueDataList.iterator();
    while (it.hasNext()) {
      QueueData qd = it.next();
      brokerNameSet.add(qd.getBrokerName());
    }
      
    // 从 brokerAddrTable  获取 broker 对应的地址
    for (String brokerName : brokerNameSet) {
      BrokerData brokerData = this.brokerAddrTable.get(brokerName);
      if (null != brokerData) {
        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
        brokerDataList.add(brokerDataClone);
        foundBrokerData = true;
        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
          List<String> filterServerList = this.filterServerTable.get(brokerAddr);
          filterServerMap.put(brokerAddr, filterServerList);
        }
      }
    }
  }
} finally {
  this.lock.readLock().unlock();
}

主要是操作 topicQueueTable, brokerAddrTable, filterServerMap,填充 topicRouteData。

总结

  • QueueData、BrokerData、BrokerLiveInfo;需要知道这些类的数据结构,有助于理解 路由注册、剔除、发现的基本原理
  • 只要知道 RouterInfoManager 中的几个 Map。基本能知道,路由注册、剔除、发现的基本原理,因为本质都是对这几个集合进行操作

知识扩展阅读

Q: namesrv 之间数据不共享,那么会造成消息的发送失败吗?

A: 该问题留到 消息发送时说。

Q: RouterInfoManager 中,操作 Map 为什么使用读写锁?面对可能并发操作集合,为什么不使用 ConcurrentHashMap

A:

  • 读写需要互斥。写操作涉及多个Map。因此即使集合使用 ConcurrentHashMap 也没有。因为需要保证多个Map 的写操作是原子性。
  • 读多写少。

原文 

https://segmentfault.com/a/1190000022103204

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » rocketMQ(4.6.1)系列教程–namesrv 篇

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址