不为有趣之事,何遣有涯之生
不失其所者久,死而不亡者寿

AMQ简明教程(9) AMQ消息持久化存储

AMQ消息存储持久化

概述

ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复( recovery )方式。

PTP

Queue的存储是很简单的,就是一个FIFO的Queue

PUB/SUB

对于持久化订阅主题,每一个消费者将获得一个消息的复制。

ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种:
1:AMQ消息存储-基于文件的存储方式,是以前的默认消息存储
2:KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式
3:JDBC消息存储-消息基于JDBC存储的
4:Memory 消息存储-基于内存的消息存储

KahaDB Message Store概述

KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

KahaDB基本配置例子

<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

可用的属性有:
1:director:KahaDB存放的路径,默认值activemq-data
2:indexWriteBatchSize: 批量写入磁盘的索引page数量,默认值1000
3:indexCacheSize:内存中缓存索引page的数量,默认值10000
4:enableIndexWriteAsync:是否异步写出索引,默认false
5:journalMaxFileLength:设置每个消息data log的大小,默认是32MB
6:enableJournalDiskSyncs:设置是否保证每个没有事务的内容,被同步写入磁盘,JMS持久化的时候需要,默认为true
7:cleanupInterval:在检查到不再使用的消息后,在具体删除消息前的时间,默认30000
8:checkpointInterval:checkpoint的间隔时间,默认5000
9:ignoreMissingJournalfiles:是否忽略丢失的消息日志文件,默认false
10:checkForCorruptJournalFiles:在启动的时候,将会验证消息文件是否损坏,默认false
11:checksumJournalFiles:是否为每个消息日志文件提供checksum,默认false
12:archiveDataLogs: 是否移动文件到特定的路径,而不是删除它们,默认false
13:directoryArchive:定义消息已经被消费过后,移动data log到的路径,默认null
14:databaseLockedWaitDelay:获得数据库锁的等待时间(used by shared master/slave),默认10000
15:maxAsyncJobs:设置最大的可以存储的异步消息队列,默认值10000,可以和concurrentMessageProducers 设置成一样的值
16:concurrentStoreAndDispatchTransactions:是否分发消息到客户端,同时事务存储消息,默认true
17:concurrentStoreAndDispatchTopics:是否分发Topic消息到客户端,同时进行存储,默认true
18:concurrentStoreAndDispatchQueues:是否分发queue消息到客户端,同时进行存储,默认true

在Java中内嵌使用Broker,使用KahaDB的例子

public class EmbeddedBrokerUsingKahaDBStoreExample {
BrokerService createEmbeddedBroker() throws Exception {
BrokerService broker = new BrokerService();
File dataFileDir = new File("target/amq-in-action/kahadb");
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
// Using a bigger journal file
kaha.setJournalMaxFileLength(1024*100);
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
//create a transport connector
broker.addConnector("tcp://localhost:61616");
//start the broker
broker.start();
return broker;
}
}

AMQ Message Store概述

AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。

这种方式中,Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。

Date logs由一些单独的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加
data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。

AMQ Message Store配置示例:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.base}/data"
maxFileLength="32mb"/>
</persistenceAdapter>
</broker>

使用JDBC来持久化消息

ActiveMQ支持使用JDBC来持久化消息,预定义的表如下:
1:消息表,缺省表名为ACTIVEMQ_MSGS,queue和topic都存在里面,结构如下:

2:ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID,结构如下:

3:锁定表,缺省表名为ACTIVEMQ_LOCK,用来确保在某一时刻,只能有一个
ActiveMQ broker实例来访问数据库,结构如下:

使用JDBC来持久化消息的配置示例:

<beans>
<broker brokerName="test-broker" persistent=true
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource=“#mysql-ds"/>
</persistenceAdapter>
</broker>
<bean name="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName"><value>org.gjt.mm.mysql.Driver</value></property>
<property
name="url"><value>jdbc:mysql://192.168.1.100:3306/test?useUnicode=true&characterEncodi
ng=UTF-8</value></property>
<property name="username"> <value>root</value> </property>
<property name="password" value="cc"/>
</bean>

JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。配置示例如下:
<beans>
<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#derby-ds"
dataDirectory="activemq-data" />
</persistenceFactory>
</broker>
</beans>

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别
1:Jdbc with journal的性能优于jdbc
2:Jdbc用于master/slave模式的数据库分享
3:Jdbc with journal不能用于master/slave模式
4:一般情况下,推荐使用jdbc with journal

Memory Message Store

内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以
你必须注意设置你的broker所在的JVM和内存限制

Memory Message Store配置示例:

<beans>
<broker brokerName="test-broker" persistent="false"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
</broker>
</beans>

在Java中内嵌使用Broker,使用Memory的例子:

public void createEmbeddedBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
}
未经允许不得转载:菡萏如佳人 » AMQ简明教程(9)

欢迎加入极客江湖

进入江湖关于作者