转载

ActiveMQ - 消息持久化

消息存储

ActiveMQ有点对点和发布订阅两种方式,这两种的消息存储还是有稍微一点区别。

点对点

队列的存储比较简单,就是先进先出(FIFO),只有当该消息已被消费和确认可以删除消息存储。如果没有被确认,其他消费者是不能获取消息的。

ActiveMQ - 消息持久化

看看下面的例子:

生产者发送了10条消息:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageProducer producer = null;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createQueue("test.persistence");
        // 创建一个生产者
        producer = session.createProducer(destination);
        // 创建消息
        for (int i = 0; i < 10; i++) {
            Message message = session.createTextMessage("this is test.persistence" + i);
            // 发送消息
            producer.send(message);
        }

    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者1消费5条,但是暂时没确认

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;

    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createQueue("test.persistence");
        // 创建一个消费者
        consumer = session.createConsumer(destination);
        // 接收消息
        for (int i = 0; i < 5; i++) {
            Message message = consumer.receive();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            if (i == 4) {
                TimeUnit.SECONDS.sleep(10);
                message.acknowledge();
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者2此时就阻塞在获取消息上面,直到消费者1确认后才接收到消息

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;

    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createQueue("test.persistence");
        // 创建一个消费者
        consumer = session.createConsumer(destination);
        // 接收消息
        for (int i = 0; i < 5; i++) {
            Message message = consumer.receive();
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            if (i == 4) {
                message.acknowledge();
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }  finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发布订阅

在发布订阅中,每个消费者都会获取到消息的拷贝,为了节约空间,broker只存储了一份消息,并存储了每个消费者所消费的信息,这样每个消费者虽然有不同的消费进度,最终还是能一次获取到消息。如果消息被所有订阅者消费完了,broker就可以删除这个消息。

ActiveMQ - 消息持久化

存储方式

ActiveMQ提供了多种存储方式,比如AMQ、KahaDB、JDBC、内存。

AMQ

参考官网

AMQ是早期版本的默认持久化存储方式,基于文件的事务存储,对于消息的存储进行了调优,速度还是非常快的。默认大小32M。当消息被成功使用时,就会被标记为清理或者存档,这个操作将在下个清理时发送。基本配置如下(其他参数详见官网):

<broker persistent="true" xmlns="http://activemq.apache.org/schema/core">
...
<persistenceAdapter>
<amqPersistenceAdapter/>
</persistenceAdapter>
...
</broker>

KahaDB

5.4以后默认的持久化存储方式,也是基于文件的,与AMQ不同的是,KahaDB采用了B-Tree存储的布局。拥有高性能和可扩展性等特点。基本配置如下:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
...
</broker>

JDBC

JDBC有三个表,两个表存储消息,还有一个表当做锁用,保证只能一个代理访问broker。配置如下:

先把默认的kahaDB注释掉,再用下面的替换,注意这个配置是在broker下面。

<persistenceAdapter>
    <jdbcPersistenceAdapter  dataSource="#mysql-ds"/>
</persistenceAdapter>

然后再增加数据库配置,注意,这个配置是broker外面,跟其他bea同级。

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

在启动activemq.bat,之前,先建数据库,并且编码设置为latin1,不然ACTIVEMQ_ACKS表会创建失败导致启动不了。然后根据报错信息一次在lib下加入commons-dbcp-1.4.jar、commons-pool-1.5.4.jar、mysql-connector-java-5.1.36.jar包。如下所示,帮我们建了三个表:

ActiveMQ - 消息持久化

activemq_acks用于保存持久化订阅信息。

ActiveMQ - 消息持久化

activemq_lock,用于broker集群时的Master选举。

ActiveMQ - 消息持久化

activemq_msgs,用于存储消息信息。

ActiveMQ - 消息持久化

点对点

代码参考之前的点对点

演示步骤如下:

1、 启动生产者,发送消息

2、 查看表数据如下:

ActiveMQ - 消息持久化

3、 启动消费者,消息消费后表数据被删除

发布订阅

代码参考之前的发布订阅

演示步骤如下:

1、 启动生产者,发送消息。

2、 查看表数据,并没有持久化,所以发布订阅默认不持久化的。

3、 启动消费者,没有消息被消费。

为了让消息可以被消费者消费,我们可以这样做:

消费者代码如下

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    // 这边的Topic
    Topic destination;
    TopicSubscriber consumer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = connectionFactory.createConnection();
        // 设置ClientId
        connection.setClientID("ZhangSan");
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createTopic("test.topic.mysql");
        // 创建一个消费者,调用createDurableSubscriber方法
        consumer = session.createDurableSubscriber(destination,"my");
        // 接收一个消息
        while (null != (message = consumer.receive())) {
            System.out.println("consumer receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {

    }
}

启动两个消费者者,activemq_acks表数据如下,已经有了两个数据

ActiveMQ - 消息持久化

activemq的subscribers界面如下,也有两个数据,如果我们消费者下线了,就会到Offline Durable Topic Subscribers列表

ActiveMQ - 消息持久化

修改消费者的代码后,生产者发送消息的时候,如果消费者在线,就直接消费,如果不在线,上线后还可以继续消费,下图是消费者消费了几次后,LAST_ACKED_ID变成了4。

ActiveMQ - 消息持久化

发布订阅默认不持久化,所以生产者代码可以这样修改,加下面一句话

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

生产者发送消息后,activemq_msgs表数据如下,与点对点不一样的是,这个消息被消费后,不会被删除。

ActiveMQ - 消息持久化

日志类型的JDBC

由于JDBC的性能相对比较差,所以activemq还提供了日志类型的jdbc,确保了JMS事务的一致性。因为它整合了非常快的信息写入与缓存技术,它可以显着提高性能。配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
        <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
        </persistenceAdapter>
    </broker>
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean>
</beans>

虽然性能比jdbc快,但是他不支持master/slave。

内存

把消息存储在内存中,所以没有持久化的功能,因此要保证内存足够大,来缓存消息。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61635"/>
        </transportConnectors>
    </broker>
</beans>
原文  https://segmentfault.com/a/1190000022481554
正文到此结束
Loading...