转载

RabbitMQ 基础教程(2) - Work Queue

RabbitMQ 基础教程(2) - Work Queue

注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。

在上一篇文章 RabbitMQ 基础教程(1) - Hello World 中,我们已经简单的介绍了 RabbitMQ 以及如何发送和接收一个消息。接下来我们将继续深入 RabbitMQ ,研究一下 消息队列(Work Queue)

消息队列

消息的 发布者 发布一个消息到 消息队列 中,然后信息的 消费者 取出消息进行消费。

                                 queue  +-------------+      +--+--+--+--+--+--+     +-------------+  |   producer  |----->|m1|m2| ... |  |  |---->|   consumer  |  +-------------+      +--+--+--+--+--+--+     +-------------+ 

但是实际情况往往比这个要复杂,假如我们有多个信息的发布者和多个信息的消费者,那 RabbitMQ 又将会是怎么工作呢?

+--------------+                              +--------------+ |   producer1  +-                           / |  consumer1   | +--------------+ /-          queue         /- +--------------+ +--------------+   /- +---+---+---+----+ /-   +--------------+ |   producer2  +---->X|m1 |m2 |m3 |... |/---->|  consumer2   | +--------------+   /- +---+---+---+----+ /-   +--------------+ +--------------+ /-                        /- +--------------+ |      ...     |/                           / |      ...     | +--------------+                              +--------------+ 

Round-robin 分发算法

RabbitMQ 中,如果有多个消费者同时消费同一个消息队列,那么就通过 Round-robin 算法将消息队列中的消息均匀的分配给每一个消费者。

这个算法其实很简单,每收到一个新的消息,就将这个消息分发给上下一个消费者。比如上一个消费者是 consumer-n ,那么有新消息来的时候就将这个新的消息发布到 consumer-n+1 ,以此类推,如果到了最后一个消费者,那么就又从第一个开始。即: consumer-index = (consumer-index + 1) mod consumer-number

为了演示,首先来做几项准备工作。

定义任务 task.js

/**  * 创建一个任务   * @param taskName 任务名字  * @param costTime 任务话费的时间  * @param callback 任务结束以后的回调函数  * @constructor  */ function Task(taskName ,costTime , callback){       if(typeof(costTime) !== 'number')         costTime = 0; // no delay there      setTimeout(function () {         console.log(taskName+" finished");         if(callback && typeof (callback) === 'function')             callback();     } , 1000*costTime); }; 

串行化的消息任务结构

任务发布者负责将该结构发布到队列中,然后消费者取出消息,新建任务开始执行。

{     taskName : 'taskname',     costTime : 1 } 

创建任务消息 task-producer.js

var amqp = require('amqplib/callback_api');  // 连接上RabbitMQ服务器 amqp.connect('amqp://localhost', function(err, conn) {       conn.createChannel(function(err, ch) {         var q = 'tasks';          // 得到发送消息的数目,默认发送4个         var name;         var cost;          (function () {             if(process.argv.length < 4 )             {                 console.error('ERROR : usage - node rabbit-producer <taskname> <costtime>');                 process.exit(-1);             }              name = process.argv[2];             cost = +process.argv[3];         })();          // 新建队列,然后将队列中的消息持久化取消         ch.assertQueue(q, {durable: true});         // 将任务串行化存入Buffer中,并推入队列         ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});         console.log(" [x] Sent "+name);         setTimeout(function () {             process.exit(0);         },500);     }); }); 

消费任务消息 task-consumer.js

var amqp = require('amqplib/callback_api');   var Task = require('./task.js');  amqp.connect('amqp://localhost', function(err, conn) {       conn.createChannel(function(err, ch) {         var q = 'tasks';          ch.assertQueue(q, {durable: true});         console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);         // 监听队列上面的消息         ch.consume(q, function(msg) {             var obj = JSON.parse(msg.content.toString('utf8'));             console.log('Get the task '+obj.taskName);             // 定义新的任务             new Task(obj.taskName,obj.costTime);         }, {noAck: true});     }); }); 

现在开启两个消费者进程来等待消费 tasks 队列中的消息

# shell1 node task-consumer.js  # shell2 node task-consumer.js   

然后向队列中推入三个消息

# shell3 node task-producer.js task1 0   node task-producer.js task2 0   node task-producer.js task3 0   

运行结果

# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1   task1 finished   Get the task task3   task3 finished  # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2   task2 finished  # 已经通过Round-robin算法将消息队列中的消息分配到连接的消费者中了. 

消息,队列持久化

细心的读者可能已经发现了我们在 声明队列发送消息 的代码块中改动了一小部分的代码,那就是

// 声明队列 ch.assertQueue(q, {durable: true});  // 发送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});   

通过将队列的 durable 配置参数生命为 true 可以保证在 RabbitMQ 服务器退出或者异常终止的情况下不会丢失 消息队列 ,注意这里只是不会丢失消息队列,并不是消息队列中没有被消费的 消息 不会丢失。

为了保证消息队列中的 消息 不会丢失,就需要在发送消息时指定 persistent 选项,这里并不能百分之百的保证消息不会丢失,因为从队列中有新的消息,到将队列中消息持久化到磁盘这一段时间之内是无法保证的。

消息的应答

现在存在这样一种场景,消费者取到消息,然后创建任务开始执行。但是任务执行到一半就抛出异常,那么这个任务算是没有被成功执行的。

在我们之前的代码实现中,都是消息队列中有新的消息,马上就这个消息分配给消费者消费,不管消费者对消息处理结果如何,消息队列会马上将已经分配的消息从消息队列中删除。如果这个任务非常重要,或者一定要执行成功,那么一旦任务在执行过程中抛出异常,那么这个任务就再也找不回来了,这是非常可怕的事情。

还好在 RabbitMQ 中我们可以为已经分配的消息和消息队列之间创建一个应答关系:

  • 如果消息处理 成功 ,那么就发送一个答复给消息队列,告诉它:我已经成功处理消息,不再需要这条消息了,你可以删除了,于是消息队列就将已经应答的消息从消息队列中删除。
  • 如果处理 失败 ,也就是没有收到应答,那么就将这条消息重新发送给该队列的其他消费者。

要在消费者和消息队列之间建立这种应答关系我们只需要将 channelconsume 函数的 noAck 参数设成 false 就可以了。

 ch.consume(q, function(msg) {             var obj = JSON.parse(msg.content.toString('utf8'));             console.log('Get the task '+obj.taskName);             // 定义新的任务             new Task(obj.taskName,obj.costTime);         }, {noAck: false}); // 这里设置成false 

下面我们就模拟一下消息处理失败的场景:

var amqp = require('amqplib/callback_api');   var Task = require('./task.js');  amqp.connect('amqp://localhost', function(err, conn) {       conn.createChannel(function(err, ch) {         var q = 'tasks';          ch.assertQueue(q, {durable: true});         console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);         // 监听队列上面的消息         ch.consume(q, function(msg) {             var obj = JSON.parse(msg.content.toString('utf8'));             console.log('Get the task '+obj.taskName);             // 定义新的任务             new Task(obj.taskName,obj.costTime,function(){                 if(obj.taskName === 'task2')                     throw new Error("Test error");                 else                     ch.ack(msg);             }); // 如果是任务二,那么就抛出异常。         }, {noAck: false});     }); }); 

按照上面的脚本执行顺序,我们在执行一遍脚本: consumer2 得到执行 task2 消息,然后马上抛出异常退出进行,然后消息队列再将这个消息分配给 cosumer1 ,接着也执行失败了,退出进程,最终消息队列中将只会有一个 task2 的消息存在。

启动消费者等待消息

# shell1 开启消费者1  node rabbit-consumer.js  # shell2 开启消费者2 node rabbit-consumer.js   

创建消息

node rabbit-producer.js task1 0   node rabbit-producer.js task2 10   node rabbit-producer.js task3 0   

我们能来看一下结果:

# shell2 消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2   task2 finished # 消费者2执行任务2的时候抛出异常,task2将会重新发送给消费者1   ... throw  new Error('Error test');   # shell1 消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1   task1 finished   Get the task task3   task3 finished   Get the task task2 # 消费者1接收到任何2   task2 finished   ... throw  new Error('Error test'); # 也抛出异常了 

最终会在消息队列中剩下一条未消费的信息。

这里有一点需要注意,如果你将 noAck 选项设置成了 false ,那么如果消息处理成功,一定要进行应答,负责消息队列中的消息会越来越多,直到撑爆内存。

更加均衡的负载

在上文中我们听到过消息队列通过 Round-robin 算法来将消息分配给消费者,但是这个分配过程是盲目的。比如现在有两个消费者, consumer1consumer2 ,按照 Round-robin 算法就会将 奇数 编号的任务发配给 consumer1 ,将 偶数 编号的任务分配给 consumer2 ,但是这些任务恰好有一个特性, 奇数 编号的任务比较繁重,而 偶数 编号的任务就比较简单。

那么这就会造成一个问题,那就是 consumer1 会被累死,而 consumer2 会被闲死。造成了负载不均衡。要是每一个消息都被成功消费以后告诉消息队列,然后消息队列再将新的消息分配给空闲下来的消费者不就好了。

RabbitMQ 中的确有这样的一个配置选项。那就是 ch.prefetch(1);

我们现在就来模拟一下

var amqp = require('amqplib/callback_api');   var Task = require('./task.js');  amqp.connect('amqp://localhost', function(err, conn) {       conn.createChannel(function(err, ch) {         var q = 'tasks';          ch.assertQueue(q, {durable: true});         console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);         // 监听队列上面的消息         ch.prefetch(1); // 添加这一行         ch.consume(q, function(msg) {             var obj = JSON.parse(msg.content.toString('utf8'));             console.log('Get the task '+obj.taskName);             new Task(obj.taskName,obj.costTime ,function () {                 ch.ack(msg);             });         }, {noAck: false});     }); }); 

启动消费者等待消息

# shell1 开启消费者1  node rabbit-consumer.js  # shell2 开启消费者2 node rabbit-consumer.js   

创建消息

node rabbit-producer.js task1 0   node rabbit-producer.js task2 20   node rabbit-producer.js task3 0   node rabbit-producer.js task4 20   
# shell1 开启消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任务马上结束   task1 finished   Get the task task3 # 任务马上结束   task3 finished   Get the task task4 # 任务四被分配到consumer1中了   task4 finished  # shell2 开启消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2   task2 finished   
原文  http://blog-qeesung.rhcloud.com/rabbitmq-ji-chu-jiao-cheng-2-work-queue/
正文到此结束
Loading...