Goals of this article:
1. Explain how synchronous and asynchronous sending work
2. A brief look at RocketMQ's architecture design
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
High availability, high performance
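import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Start the producer; this only needs to happen once, before the first send
producer.start();
// Build the message
Message msg = new Message("SYNC_MSG_TOPIC", "TagA", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// Send synchronously: send() blocks until the broker responds or the send fails
SendResult sendResult = producer.send(msg);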
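producer.start() first checks that the producerGroup property is valid.
It then creates an MQClientInstance. MQClientInstance wraps RocketMQ's network-processing API and is the network channel through which producers and consumers talk to the NameServer and Brokers.
Next, it registers the current producer with the MQClientInstance, so the instance can later send network requests, heartbeats and so on on its behalf, and then starts the MQClientInstance itself, whose start() method is shown below: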
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
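This method does quite a lot. First, this.mQClientAPIImpl.start() calls org.apache.rocketmq.remoting.netty.NettyRemotingClient#start, which uses the remoting module (RocketMQ's Netty-based client) to set up the client side of the network layer. For our purposes, once this call completes the client is able to communicate with the servers.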
Next, let's look at startScheduledTask, the beating heart of the client:
private void startScheduledTask() {
// No NameServer address configured: fetch it periodically so later requests always have an address to use
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// Periodically refresh topic route info from the NameServer into this client instance
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// Periodically clean up offline brokers and send heartbeats to keep the connections alive
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// Periodically persist all consumer offsets
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// Periodically adjust the thread pool
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
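Every task body above is wrapped in try/catch, and that is more than tidiness: the JDK documents that if any execution of a task scheduled with scheduleAtFixedRate throws, subsequent executions are suppressed. The minimal sketch below (class and method names are made up for illustration, not RocketMQ code) schedules two always-failing tasks, one guarded and one not, and counts how often each actually runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledTaskDemo {
    // Runs two periodic tasks that fail on every execution for windowMillis,
    // then returns {guardedRuns, unguardedRuns}.
    static int[] run(long windowMillis) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
        AtomicInteger guarded = new AtomicInteger();
        AtomicInteger unguarded = new AtomicInteger();

        // Same shape as the tasks in startScheduledTask: catch, log, carry on.
        ses.scheduleAtFixedRate(() -> {
            try {
                guarded.incrementAndGet();
                throw new IllegalStateException("simulated NameServer error");
            } catch (Exception e) {
                // a real client would log the error here
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        // No guard: the first exception escapes and the executor silently
        // suppresses every later execution of this task.
        ses.scheduleAtFixedRate(() -> {
            unguarded.incrementAndGet();
            throw new IllegalStateException("simulated NameServer error");
        }, 0, 10, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return new int[] {guarded.get(), unguarded.get()};
    }

    public static void main(String[] args) {
        int[] runs = run(200);
        System.out.println("guarded=" + runs[0] + " unguarded=" + runs[1]);
    }
}
```

The guarded counter keeps climbing for the whole window, while the unguarded one stops at 1 without any error being printed, which is exactly the kind of silent failure a long-running client cannot afford.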
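Next, the pull message service is started.
It keeps pending pull requests in its pullRequestQueue and works through them one by one.
Then this.rebalanceService.start() launches a service that periodically runs load balancing (rebalance) in the background.
Heartbeats are sent to all brokers, while holding a lock.
The related shutdown hooks are registered.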
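The pull service follows the classic blocking-queue worker pattern: one thread blocks on take() and handles requests in arrival order. A minimal sketch of that pattern, assuming a hypothetical PullRequest that only names a topic and queue id and a sentinel object used to stop the loop (the real PullMessageService carries much more state and actually talks to the broker):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class PullServiceSketch extends Thread {
    // Hypothetical stand-in for a pull request: which topic/queue to pull next.
    static final class PullRequest {
        final String topic;
        final int queueId;
        PullRequest(String topic, int queueId) { this.topic = topic; this.queueId = queueId; }
    }

    // Sentinel used only by this sketch to stop the worker loop.
    private static final PullRequest STOP = new PullRequest("", -1);

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> pulled = new ArrayList<>(); // written only by the worker thread

    void submit(PullRequest request) { pullRequestQueue.add(request); }

    // Enqueue the sentinel and wait; join() makes the worker's writes to pulled visible.
    void shutdownAndWait() {
        pullRequestQueue.add(STOP);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                PullRequest req = pullRequestQueue.take(); // blocks until a request arrives
                if (req == STOP) break;
                // A real client would now pull from the broker and, once the result
                // comes back, enqueue the next request for the same queue.
                pulled.add(req.topic + "#" + req.queueId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```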
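Main steps of the start() flow
In the diagram, the green blocks are the core steps. I'll focus on those and won't paste all of the code here, to keep the article from getting too long.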
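Before a message can be sent, the producer first needs the topic's route information; only with it does the producer know which Broker node the message should go to.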
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Try the local cache first
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// Fetch this topic's route info from the NameServer
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// Still no route: fall back to the route of the default topic
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
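The steps: look in the local topicPublishInfoTable cache first; if the entry is missing or unusable, fetch it from the NameServer; if the topic still has no route, fall back to the default topic's route.
updateTopicRouteInfoFromNameServer is how the producer updates and maintains its route cache: internally it compares the fetched route information with the locally cached copy to decide whether the cache needs updating.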
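The lookup order can be sketched as a small cache in front of a NameServer query. This is a simplified illustration, not the real TopicPublishInfo logic: routes are plain strings, the NameServer is a hypothetical Function<String, Optional<String>>, and the fallback key "TBW102" is the name of RocketMQ's default auto-create topic.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class RouteCacheSketch {
    // Default topic used as a last resort (RocketMQ's auto-create topic key).
    static final String DEFAULT_TOPIC = "TBW102";

    private final Map<String, String> routeCache = new ConcurrentHashMap<>();
    // Hypothetical NameServer lookup: topic -> route, empty if the topic is unknown.
    private final Function<String, Optional<String>> nameServer;

    RouteCacheSketch(Function<String, Optional<String>> nameServer) { this.nameServer = nameServer; }

    String findRoute(String topic) {
        // 1. Local cache first.
        String cached = routeCache.get(topic);
        if (cached != null) {
            return cached;
        }
        // 2. Ask the NameServer for this topic.
        Optional<String> fetched = nameServer.apply(topic);
        // 3. Unknown topic: borrow the default topic's route.
        if (!fetched.isPresent()) {
            fetched = nameServer.apply(DEFAULT_TOPIC);
        }
        String route = fetched.orElseThrow(() -> new IllegalStateException("no route for topic " + topic));
        routeCache.put(topic, route); // later sends for this topic hit the cache
        return route;
    }
}
```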
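One line that runs before the route lookup is also worth a look. For synchronous sends the number of attempts is configurable: timesTotal is 1 + retryTimesWhenSendFailed (default 2, so 3 attempts in total), while asynchronous and oneway sends get a single attempt in this loop. The producer then retries as needed: when an attempt fails, it continues to the next iteration of the for loop.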
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
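A sketch of how timesTotal bounds the send loop, assuming a hypothetical trySend callback that reports success or failure for each attempt (the real sendDefaultImpl also reselects a queue, records fault latency and only retries certain errors):

```java
import java.util.function.IntPredicate;

class RetryLoopSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Same rule as the timesTotal line: only SYNC gets 1 + retryTimesWhenSendFailed attempts here.
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // trySend receives the 0-based attempt number and returns true on success.
    // Returns how many attempts were made.
    static int send(Mode mode, int retryTimesWhenSendFailed, IntPredicate trySend) {
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        int times = 0;
        for (; times < total; times++) {
            if (trySend.test(times)) {
                return times + 1; // success: stop retrying
            }
            // failure: continue with the next iteration
        }
        return times;
    }
}
```

With the default of 2 retries, a synchronous send that keeps failing makes 3 attempts, while an asynchronous one makes a single attempt in this loop.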
Now let's look at how the message queue is actually selected:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// Is the fault-latency (broker avoidance) mechanism enabled?
if (this.sendLatencyFaultEnable) {
try {
// sendWhichQueue is a ThreadLocal-backed counter holding this thread's rolling queue index
int index = tpInfo.getSendWhichQueue().getAndIncrement();
// Walk all queues of the topic, looking for one whose Broker is currently available
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// Check whether the Broker hosting this queue is currently available
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
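A single send may call this queue-selection method several times; lastBrokerName is the Broker chosen the previous time, whose send failed.
On the first selection lastBrokerName is null, so the method simply takes the current value of sendWhichQueue (incrementing it), takes it modulo the number of message queues in the route table, and returns the MessageQueue at that position (the selectOneMessageQueue() method). If that send fails, the next selection avoids the Broker that hosted the previous MessageQueue; otherwise the send would quite likely fail again.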
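The selection rule for the retry case can be sketched as follows. This is a simplified model, not TopicPublishInfo itself: MQ is a hypothetical stand-in for MessageQueue, and a shared AtomicInteger replaces the per-thread index RocketMQ uses.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class QueueSelectorSketch {
    // Hypothetical, trimmed-down message queue: just a broker name and a queue id.
    static final class MQ {
        final String brokerName;
        final int queueId;
        MQ(String brokerName, int queueId) { this.brokerName = brokerName; this.queueId = queueId; }
        @Override public String toString() { return brokerName + ":" + queueId; }
    }

    private final List<MQ> queues; // ordered by broker, like the route table
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    QueueSelectorSketch(List<MQ> queues) { this.queues = queues; }

    // No previous failure: plain round robin.
    MQ selectOne() {
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // After a failure on lastBrokerName: keep rotating, but skip that broker's queues.
    MQ selectOne(String lastBrokerName) {
        if (lastBrokerName == null) return selectOne();
        for (int i = 0; i < queues.size(); i++) {
            MQ mq = selectOne();
            if (!mq.brokerName.equals(lastBrokerName)) return mq;
        }
        return selectOne(); // every queue is on the failed broker: plain round robin
    }
}
```

Math.floorMod keeps the position non-negative even after the counter overflows, the same case the original code guards against with Math.abs plus its pos < 0 check.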
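Within a single send, this algorithm does steer around a failed Broker. But when a Broker goes down there is still a problem: the message queues in the route table are ordered by Broker, so if the last selection picked the first queue of the dead Broker, the next selection picks its second queue. That send is very likely to fail as well, triggering yet another retry and wasting resources. So is there a way to keep a Broker out of queue selection for a while after one failed send? Some readers may also wonder why the route information still contains a Broker that is no longer available. That is not hard to explain. First, the NameServer detects an unavailable Broker with a delay of at least one heartbeat-check interval (10s);
second, the NameServer does not push anything to producers as soon as it notices a Broker is down; producers refresh their route information every 30s, so it takes at least 30s for a producer to see a Broker's latest routing. What is needed is a mechanism that, while a Broker is down, temporarily excludes it from message queue selection after a single failed send. That is what the fault-latency mechanism (sendLatencyFaultEnable) provides: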
/**
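* Update the fault-latency entry for a broker
*
* @param brokerName     broker that served (or failed) the send
* @param currentLatency how long the send took, in milliseconds
* @param isolation      if true, treat the broker as if the send had taken 30 s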
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
/**
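* Compute how long a broker should be kept out of queue selection
*
* @param currentLatency send latency in milliseconds
* @return avoidance duration in milliseconds (0 means no avoidance)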
*/
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
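If isolation is true, 30s (30000 ms) is passed to computeNotAvailableDuration.
If isolation is false, the latency of this send is passed instead. computeNotAvailableDuration computes how long the Broker should be avoided because of this send, i.e. for how long it will be left out of send-queue selection.
The algorithm: walk the latencyMax array from the end, find the first index whose value is not greater than currentLatency, and read the avoidance time from the notAvailableDuration array at that index. The method finally calls LatencyFaultTolerance's updateFaultItem.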
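The table lookup is easy to check by hand. The sketch below reproduces computeNotAvailableDuration with the latencyMax and notAvailableDuration values that, as far as I know, MQFaultStrategy ships with in 4.x; treat these arrays as an assumption and check them against your version.

```java
class FaultDurationSketch {
    // Latency thresholds (ms) and the matching avoidance windows (ms).
    // Assumed to match the 4.x defaults; verify against your RocketMQ version.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first one the latency reaches picks the window.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) return NOT_AVAILABLE_DURATION[i];
        }
        return 0;
    }

    // isolation == true behaves as if the send had taken 30 s, as in updateFaultItem.
    static long avoidFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

With these values, a 600 ms send keeps the broker out of selection for 30 s, anything under 550 ms causes no avoidance, and isolation = true excludes the broker for 600000 ms, i.e. 10 minutes.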
DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Start the producer; this only needs to happen once, before sending
producer.start();
for (int i = 0; i < 1; i++) {
try {
// Build the message
Message msg = new Message("SYNC_MSG_TOPIC", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// Send synchronously
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
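Before sending, the producer reads the route from its local route table first; if it is not there, it fetches the route from the NameServer and updates the local table. Independently of sending, the producer syncs route information from the NameServer every 30s.
Send-side high availability rests on two mechanisms: retries and fault avoidance.
Retries: when a send fails, the producer tries again, up to 1 + retryTimesWhenSendFailed attempts for a synchronous send, to give the message the best possible chance of getting through.
Fault avoidance (when sendLatencyFaultEnable is turned on): when an error occurs during a send, the producer gives that broker an avoidance period and does not pick it for sending during that time, which raises the send success rate.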
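Fault avoidance boils down to a map from broker name to the moment it becomes usable again. A minimal sketch with hypothetical names (RocketMQ's own LatencyFaultTolerance implementation keeps richer per-broker fault items), using an injectable clock so the behaviour can be tested:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class FaultToleranceSketch {
    private final Map<String, Long> availableFrom = new ConcurrentHashMap<>();
    private final LongSupplier clock; // current time in ms; injectable for tests

    FaultToleranceSketch(LongSupplier clock) { this.clock = clock; }

    // Record a fault: the broker is avoided for notAvailableDurationMs starting now.
    void updateFaultItem(String brokerName, long notAvailableDurationMs) {
        availableFrom.put(brokerName, clock.getAsLong() + notAvailableDurationMs);
    }

    // A broker with no entry, or whose avoidance window has passed, is available again.
    boolean isAvailable(String brokerName) {
        Long from = availableFrom.get(brokerName);
        return from == null || clock.getAsLong() >= from;
    }
}
```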
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
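// Prefer the configured shared callback executor; if there is none, or it rejects the task (e.g. it is busy), run the callback on the current thread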
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
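The essence of executeInvokeCallback is "hand the callback to a pool if possible, otherwise run it inline so it is never lost." A stripped-down sketch of that fallback with hypothetical names (the real method also releases the ResponseFuture in a finally block and logs failures):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

class CallbackDispatchSketch {
    // Runs the callback on the executor when possible, otherwise on the calling thread.
    // Returns true if the callback ran inline on the caller's thread.
    static boolean dispatch(ExecutorService executor, Runnable callback) {
        boolean runInThisThread = false;
        if (executor != null) {
            try {
                executor.submit(callback);
            } catch (RejectedExecutionException e) {
                // executor shut down or saturated: do not drop the callback
                runInThisThread = true;
            }
        } else {
            runInThisThread = true;
        }
        if (runInThisThread) {
            callback.run();
        }
        return runInThisThread;
    }
}
```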
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
retryBrokerName = mqChosen.getBrokerName();
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
retryBrokerName);
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingConnectException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
}
} else {
if (context != null) {
context.setException(e);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onException(e);
} catch (Exception ignored) {
}
}
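This part is straightforward: if remotingClient.invokeAsync throws, the client retries recursively and marks the failing broker for avoidance (updateFaultItem with isolation = true).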
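The recursive retry can be sketched with CompletableFuture. Everything here is hypothetical scaffolding: transport stands in for the async network call, and curTimes counts failures starting from 0, so the same tmp-versus-limit check as in the excerpt allows up to 1 + retryTimes attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class AsyncRetrySketch {
    // transport: hypothetical async send that completes with a result or exceptionally.
    static void sendAsync(Supplier<CompletableFuture<String>> transport, int retryTimes,
                          AtomicInteger curTimes, Consumer<String> onSuccess,
                          Consumer<Throwable> onException) {
        transport.get().whenComplete((result, error) -> {
            if (error == null) {
                onSuccess.accept(result);
                return;
            }
            int tmp = curTimes.incrementAndGet();
            if (tmp <= retryTimes) {
                // Retry by calling ourselves again; a real producer would first pick
                // another broker and mark the failed one for avoidance.
                sendAsync(transport, retryTimes, curTimes, onSuccess, onException);
            } else {
                onException.accept(error); // out of retries: report the last error to the user callback
            }
        });
    }
}
```

Recursion fits here because each retry starts from a completion callback rather than from a loop on the caller's thread, so the caller is never blocked while retries happen.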
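The timeout semantics of asynchronous sends differ slightly from those of synchronous sends.
Where does an asynchronous send return to the caller?
In the current design it returns as soon as the send task has been submitted to the thread pool, which is the more logical behavior. In older versions such as 4.2 it returned from NettyRemotingClient.invokeAsync, so strictly speaking the old design could no longer be called truly asynchronous: the work before that point still ran on the caller's thread.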
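// Declared here so the snippet stands on its own; a real demo would loop over many messages
final int index = 0;
final CountDownLatch countDownLatch = new CountDownLatch(1);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// send() returns immediately; wait for the callback before shutting the producer down
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();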
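Delivery semantics: at most once, at least once, exactly once. Because of the retry mechanism, a producer may send the same message more than once, so RocketMQ's delivery guarantee is at least once.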
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
}
The NameServer turns the data that brokers send it into route tables that it maintains itself.
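Roughly, the NameServer keeps a handful of maps keyed by cluster, broker name and broker address. The sketch below is a heavily simplified model of that idea (RocketMQ's RouteInfoManager has similar tables such as brokerAddrTable, clusterAddrTable and brokerLiveTable, but with richer value types and read/write locking). Registration doubles as a heartbeat, and a periodic scan drops brokers whose heartbeat is too old, which is why producers can briefly see routes to dead brokers. The expiry window is a parameter here, not a claim about RocketMQ's value.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class RouteTableSketch {
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();       // cluster -> broker names
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();  // broker name -> (brokerId -> addr)
    final Map<String, Long> brokerLiveTable = new HashMap<>();               // addr -> last heartbeat (ms)

    synchronized void registerBroker(String cluster, String brokerName, long brokerId, String addr, long nowMs) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        brokerLiveTable.put(addr, nowMs); // every registration also counts as a heartbeat
    }

    // Drop brokers whose last heartbeat is older than expiryMs, then tidy the other tables.
    synchronized void scanNotActiveBroker(long nowMs, long expiryMs) {
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > expiryMs) {
                String deadAddr = entry.getKey();
                it.remove();
                for (Map<Long, String> ids : brokerAddrTable.values()) {
                    ids.values().removeIf(deadAddr::equals);
                }
            }
        }
        brokerAddrTable.values().removeIf(Map::isEmpty);
        for (Set<String> names : clusterAddrTable.values()) {
            names.retainAll(brokerAddrTable.keySet());
        }
    }
}
```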
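RocketMQ achieves message redundancy with a master/slave structure: the master receives the messages sent by producers and replicates them to the slave. Depending on the master's role, replication happens at one of two points in time:
SYNC_MASTER and ASYNC_MASTER transfer data to the slave in the same way; only the timing differs. When a SYNC_MASTER receives a message from a producer, it waits synchronously until the message has also reached the slave before replying. An ASYNC_MASTER replies once the message is stored locally and lets the slave catch up asynchronously.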
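The difference between the two roles is only when the master answers the producer. A minimal sketch, assuming a hypothetical replicate hook that completes when the slave acknowledges, and made-up status codes (RocketMQ's own status names differ):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class MasterReplicationSketch {
    enum Role { SYNC_MASTER, ASYNC_MASTER }
    // Hypothetical result codes for this sketch only.
    enum Status { OK, SLAVE_TIMEOUT }

    // replicate: hypothetical hook that ships msg to the slave; completes when the slave acks.
    static Status put(Role role, String msg, Function<String, CompletableFuture<Void>> replicate, long timeoutMs) {
        // (The local write to the master's commit log is elided.)
        CompletableFuture<Void> ack = replicate.apply(msg); // same transfer path for both roles
        if (role == Role.ASYNC_MASTER) {
            return Status.OK; // answer the producer now; the slave catches up in the background
        }
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS); // SYNC_MASTER waits for the slave first
            return Status.OK;
        } catch (TimeoutException | ExecutionException e) {
            return Status.SLAVE_TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.SLAVE_TIMEOUT;
        }
    }
}
```

The trade-off follows directly: SYNC_MASTER pays one replication round trip per message in exchange for not losing acknowledged messages if the master dies, while ASYNC_MASTER answers faster but may lose the most recent messages on failover.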
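Intuitively, disk I/O is slow.
I compared writes with and without the OS cache on a bare-bones, bargain-tier ECS Linux instance:
Writing 819 MB took only about 7 s through the cache, versus roughly 80 s without it. And this is a weak machine; on better hardware the gap might well reach 100x.
So with the first (uncached) write model, if one write takes 1 ms we get 1,000 writes/s; using the OS cache, if one write takes 0.01 ms we get 100,000 writes/s. With that, supporting hundreds of thousands of writes per second is within reach.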
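Traditional read path: disk -> OS cache (page cache) -> copy -> process buffer -> copy -> socket
That already performs reasonably well, but it includes two unnecessary copies. Would getting rid of them make it unbeatable? Pretty much, yes: that is the idea behind zero copy.
It is easy to picture: only descriptors of the data are copied into the socket buffer, and the data itself goes straight from the OS cache to the NIC. This greatly speeds up reading file data when messages are consumed.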
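In Java, this sendfile-style path is exposed through FileChannel.transferTo, which on Linux may be implemented with sendfile(2) when the target is a socket. A small sketch with illustrative names; note that RocketMQ's store is built on memory-mapped files (MappedByteBuffer), so treat this as an illustration of the idea rather than of RocketMQ's exact code path.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ZeroCopySketch {
    // Sends the whole file to target. When target is a socket channel, the JDK can hand
    // the transfer to the kernel so the bytes skip the user-space buffer.
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            // Assumes a blocking target; a non-blocking socket could return 0 and spin here.
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }
}
```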
Until next time, if fate allows.