转载

RabbitMQ Simplest Queue

RabbitMQ 简单使用

RabbitMQ是消息的代理,最主要的作用就是接收消息并转发,可以把它看成一个邮局,而你要发送的消息就像是信件,你只需要在信件上填上收件人地址,然后把信件扔进邮筒,邮递员就会为你邮递信件。

所以也可以这么说 RabbitMQ 就是一个邮箱、邮局、快递员的组合体。

现在我们来看看最简单的队列模式,包含

producer

producer我们把产生消息的实体称为 producer ,比如说投递邮件的人,文中用 P 来表示。

RabbitMQ Simplest Queue

queue

queue存放消息的地方称为 queue ,例如邮筒或者邮箱。

RabbitMQ Simplest Queue

consumer

consumer消息的消费者,例如邮递员,文中用 C 表示。

RabbitMQ Simplest Queue

最简单的队列模式

生产者将需要传递的消息存入队列,然后另一端的消费者使用该消息

RabbitMQ Simplest Queue

代码演示HelloWorld

下载 RabbitMQ 客户端

  1. 点击 amqp-client-4.0.0.jar 下载 jar 包
  2. Maven

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.0.0</version>
    </dependency>
    
  3. Gradle

    dependencies {
      compile 'com.rabbitmq:amqp-client:4.0.0'
    }
    

Sending

我们创建一个 Send 类,该类连接到 RabbitMQ ,发送一条消息,然后退出

public class Send {

private static final Logger LOG = LoggerFactory.getLogger(Send.class);

private Channel channel;
private Connection connection;
private static final String QUEUE_NAME = "hello_world";
private static final String host = "localhost";

public Send() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    try {
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();
    } catch (IOException e) {
        LOG.error(e.getMessage(), e);
    } catch (TimeoutException e) {
        LOG.error(e.getMessage(), e);
    }
}

public void producer(String message) {
    try {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    } catch (IOException e) {
        LOG.error(e.getMessage(), e);
    }
}

public void close() {
    try {
        this.channel.close();
        this.connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws IOException, TimeoutException {

    Send send = new Send();
    send.producer("Hello World");
    send.close();
}

}

Receiving

从队列中获取消息,会持有对队列消息的监听

public class Recv {

private final static String QUEUE_NAME = "hello_world";

private static final String host = "localhost";

public static void consume() throws java.io.IOException,
        java.lang.InterruptedException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
}

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
    consume();
}

}

运行 Send、运行 Recv

不出意外的话,会在控制台打印 [x] Received Hello World

Note

先开启队列服务。

如果在队列服务器中不存在定义的队列名称,则会自动创建。

发送的消息如果没有被消费,会一直存在队列中,例如我们可以一直运行 Send ,会发现队列中的 Ready 消息会累加,当运行 Recv 后,消费掉消息才会进行清除。

Waring存在的问题:如果消息消费者在消费信息的过程中出现意外,( its channel is closed, connection is closed, or TCP connection is lost ),那么消息可能还没处理完,但是队列服务器中该消息已经不存在了,就相当于邮递员给你送信,但是送信的途中发生了意外,那么你的信就丢失了。

处理消息丢失问题

上面的队列存在问题,由于 ack == true ,当生产者发送的消息到达消费者端时,那么该条消息就立刻从内存中清除掉了,而不管你消费者是否已经成功处理,比如生产者发送了 100 条消息,当消费者连接上后,这 100 条消息就被从内存中清除,而消费者可能仅仅只成功处理了 10 条。

解决上述问题,我们需要在消息被成功处理后,手动告知队列,我们已经处理好了,现在可以清除了。如果处理的过程中失败了,那么这条消息会再次进行处理,直到成功处理为止。

更改上述代码

生产者将队列名称改成

private static final String QUEUE_NAME = &quot;hello_world_ack_false&quot;;

其他保持不变。

消费者:

Receiving

从队列中获取消息,会持有对队列消息的监听

public class Recv {

private final static String QUEUE_NAME = "hello_world_ack_false";

private static final String host = "localhost";

public static void consume() throws java.io.IOException,
        java.lang.InterruptedException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    channel.basicQos(1); //设置每个消费者同时只处理一条消息
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(envelope.getDeliveryTag(), false); // 在消息被正确处理后,手动 ack 
        }
    };
    channel.basicConsume(QUEUE_NAME, false, consumer);// 自动 ack 设置为 false
}

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
    consume();
}

}

然后再次进行运行,会发现,如果在消息处理阶段出现异常导致没处理成功时,在消息队列中,这条消息还将存在,标志位 Unacked,如下图。

RabbitMQ Simplest Queue

使用场景

以上我们就完成了 RabbitMQ 的最简单的使用,利用这个我们可以在 新用户注册时候发送邮件通知,或者秒杀活动修改订单数据等场景下使用。

当然 RabbitMQ 的更高级特性,例如 发布/订阅 ,Topic 我们以后有时间可以去了解,当然也不难的。

比如我们可以利用 Topic 搭建日志系统,将 error 输出在文件中,将 info 打印在控制台等等。

关于更高级点的功能,下次再一探究竟。

本文参照: RabbitMQ HelloWorld 详解

原文  http://kuka.im/2016/12/17/RabbitMQ_Simplest_Queue/
正文到此结束
Loading...