RabbitMQ 简单使用
RabbitMQ是消息的代理,最主要的作用就是接收消息并转发,可以把它看成一个邮局,而你要发送的消息就像是信件,你只需要在信件上填上收件人地址,然后把信件扔进邮筒,邮递员就会为你邮递信件。
所以也可以这么说 RabbitMQ 就是一个邮箱、邮局、快递员的组合体。
现在我们来看看最简单的队列模式,包含
producer我们把产生消息的实体称为 producer ,比如说投递邮件的人,文中用 P 来表示。
queue存放消息的地方称为 queue ,例如邮筒或者邮箱。
consumer消息的消费者,例如邮递员,文中用 C 表示。
生产者将需要传递的消息存入队列,然后另一端的消费者使用该消息
Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.0</version> </dependency>
Gradle
dependencies {
compile 'com.rabbitmq:amqp-client:4.0.0'
}
我们创建一个 Send 类,该类连接到 RabbitMQ ,发送一条消息,然后退出
public class Send {
private static final Logger LOG = LoggerFactory.getLogger(Send.class);
private Channel channel;
private Connection connection;
private static final String QUEUE_NAME = "hello_world";
private static final String host = "localhost";
public Send() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
} catch (TimeoutException e) {
LOG.error(e.getMessage(), e);
}
}
public void producer(String message) {
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
public void close() {
try {
this.channel.close();
this.connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, TimeoutException {
Send send = new Send();
send.producer("Hello World");
send.close();
}
}
从队列中获取消息,会持有对队列消息的监听
public class Recv {
private final static String QUEUE_NAME = "hello_world";
private static final String host = "localhost";
public static void consume() throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
consume();
}
}
不出意外的话,会在控制台打印 [x] Received Hello World
先开启队列服务。
如果在队列服务器中不存在定义的队列名称,则会自动创建。
发送的消息如果没有被消费,会一直存在队列中,例如我们可以一直运行 Send ,会发现队列中的 Ready 消息会累加,当运行 Recv 后,消费掉消息才会进行清除。
Waring存在的问题:如果消息消费者在消费信息的过程中出现意外,( its channel is closed, connection is closed, or TCP connection is lost ),那么消息可能还没处理完,但是队列服务器中该消息已经不存在了,就相当于邮递员给你送信,但是送信的途中发生了意外,那么你的信就丢失了。
上面的队列存在问题,由于 ack == true ,当生产者发送的消息到达消费者端时,那么该条消息就立刻从内存中清除掉了,而不管你消费者是否已经成功处理,比如生产者发送了 100 条消息,当消费者连接上后,这 100 条消息就被从内存中清除,而消费者可能仅仅只成功处理了 10 条。
解决上述问题,我们需要在消息被成功处理后,手动告知队列,我们已经处理好了,现在可以清除了。如果处理的过程中失败了,那么这条消息会再次进行处理,直到成功处理为止。
更改上述代码
生产者将队列名称改成
private static final String QUEUE_NAME = "hello_world_ack_false";
其他保持不变。
消费者:
从队列中获取消息,会持有对队列消息的监听
public class Recv {
private final static String QUEUE_NAME = "hello_world_ack_false";
private static final String host = "localhost";
public static void consume() throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1); //设置每个消费者同时只处理一条消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false); // 在消息被正确处理后,手动 ack
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);// 自动 ack 设置为 false
}
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
consume();
}
}
然后再次进行运行,会发现,如果在消息处理阶段出现异常导致没处理成功时,在消息队列中,这条消息还将存在,标志位 Unacked,如下图。
以上我们就完成了 RabbitMQ 的最简单的使用,利用这个我们可以在 新用户注册时候发送邮件通知,或者秒杀活动修改订单数据等场景下使用。
当然 RabbitMQ 的更高级特性,例如 发布/订阅 ,Topic 我们以后有时间可以去了解,当然也不难的。
比如我们可以利用 Topic 搭建日志系统,将 error 输出在文件中,将 info 打印在控制台等等。
关于更高级点的功能,下次再一探究竟。
本文参照: RabbitMQ HelloWorld 详解