不为有趣之事,何遣有涯之生
不失其所者久,死而不亡者寿

AMQ简明教程(10) AMQ的静态网络链接与容错链接

AMQ的静态网络链接与容错链接

一台机器上启动多个broker步骤如下:

1:把整个conf文件夹复制一份,比如叫做conf2
2:修改里面的activemq.xml文件
(1)里面的brokerName 不能跟原来的重复
(2)数据存放的文件名称不能重复,比如:

<kahaDB directory="${activemq.data}/kahadb_2"/>

(3)所有涉及的transportConnectors 的端口,都要跟前面的不一样
3:修改jetty.xml,主要就是修改端口,比如:

<property name=“port” value=“8181”/> 端口必须和前面的不一样

4:到bin下面,复制一个activemq,比如叫做activemq2:
(1)修改程序的id,不能和前面的重复ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2-hostname.pid"
(2)修改配置文件路径ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"
(3)修改端口,里面有个tcp的61616的端口,要改成不一样的,最好跟activemq.xml里面的tcp的端口一致
(4)然后就可以执行了,如果执行没有权限的话,就授权:chmod 751 activemq2

ActiveMQ的networkConnector是什么

在某些场景下,需要多个ActiveMQ的Broker做集群,那么就涉及到Broker到Broker的通信,这个
被称为ActiveMQ的networkConnector。
ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一Broker在另一
端接收消息。这就是所谓的“桥接”。 ActiveMQ也支持双向链接,创建一个双向的通道对于两个
Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下:

“discovery“的概念
一般情况下,discovery是被用来发现远程的服务,客户端通常想去发现所有可利用的brokers;另一层意思,它是基于现有的网络Broker去发现其他可用的Brokers。

有两种配置Client到Broker的链接方式,一种方式:Client通过Statically配置的方式去连接Broker,一种方式:Client通过discovery agents来dynamically的发现Brokers

Static networks

Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker。这
种协议用于复合url,一个复合url包括多个url地址。格式如下:
static:(uri1,uri2,uri3,...)?key=value
1:配置示例如下:

<networkConnectors>
<networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/>
</networkConnectors>

Static networkConnector的基本原理示意图:

上图中,两个Brokers是通过一个static的协议来网络链接的。一个Consumer链接到brokerB的一个地址上 ,当Producer在brokerA上以相同的地址
发送消息时,此时它将被转移到brokerB上。也就是,BrokerA会转发消息到BrokerB上。

networkConnector配置的可用属性:
1:name:默认是bridge
2:dynamicOnly:默认是false,如果为true, 持久订时才创建对应的网路持久订阅。默认是启动时激活
3:decreaseNetworkConsumerPriority:默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0

一般的静态桥接的消费者消费的比本地消费者来的更快(桥接方的消费者被偏爱了),这一点要注意

4:networkTTL :默认是1 ,网络中用于消息和订阅消费的broker数量
5:messageTTL :默认是1 ,网络中用于消息的broker数量
6:consumerTTL:默认是1 ,网络中用于消费的broker数量
7:conduitSubscriptions :默认true,是否把同一个broker的多个consumer当做一个来处理

负载均衡的时候一般设置为false,否则负载均衡可能失效,因为所有的消费者被看成一个后,可能所有消息都给某一个消费者处理了

8:dynamicallyIncludedDestinations :默认为空,要包括的动态消息地址,类似于excludedDestinations,过滤一部分消息转发给桥接的broker
过滤要转移的消息如:

<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>

9:staticallyIncludedDestinations :默认为空,要包括的静态消息地址。类似于excludedDestinations,如:

<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
</staticallyIncludedDestinations>

10:excludedDestinations :默认为空,指定排除的地址,示例如下:

<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)"
name="bridge" dynamicOnly="false" conduitSubscriptions="true"
decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
<topic physicalName="always.include.topic"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>

11:duplex :默认false,设置是否能双向通信

两个broker消息互通

12:prefetchSize :默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息
13:suppressDuplicateQueueSubscriptions:默认false,如果为true, 重复的订阅关系一产生即被阻止
14:bridgeTempDestinations :默认true,是否广播advisory messages来创建临时destination
15:alwaysSyncSend :默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker。
16:staticBridge :默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。

多线程consumer访问集群

static静态桥接的consumer具有的优先更高, 例子:A--->B 消息发送到A上,A上的消费者与B上的消费者相比,B上的优先级更高

集群下的消息回流功能

“丢失”的消息
有这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,
消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转
发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2
上去了,但是有一部分他们还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好
像是消失了,除非有消费者重新连接到broker1上来消费。(即时配置成双向连接,也是取不到的)

怎么办呢?
从5.6版起,在destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1
上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为
false,为了防止消息回流后被当做重复消息而不被分发,示例如下:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

注意:需要回流的broker中双方都要配置

容错的连接Failover Protocol

前面讲述的都是Client配置链接到指定的broker上。但是,如果Broker的链接失败怎么办呢?此时,Client有两个选项:要么立刻死掉,要么去连接到其它的broker上。

Failover协议实现了自动重新链接的逻辑。
这里有两种方式提供了稳定的brokers列表对于Client链接。
第一种方式:提供一个static的可用的Brokers列表。第二种方式:提供一个dynamic 发现的可用Brokers。
Failover Protocol 的配置方式

failover:(uri1,...,uriN)?key=value 或者 failover:uri1,...,uriN

Failover Protocol 的默认配置
默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其他的Broker上。

默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。当然所有的重新链接参数都可以根据应用的需要而配置。

Failover Protocol 的使用示例,在客户端程序里面:

ConnectionFactory connectionFactory = newActiveMQConnectionFactory("failover:(tcp://192.168.1.106:61679,tcp://192.168.1.106:61819)?randomize=false");

如果randomize = true 比较平均的交给可以用的broker,但是在false的情况下,只要第一个broker有效,就会一直往第一个发

Failover Protocol 可用的配置参数

1:initialReconnectDelay:在第一次尝试重连之前等待的时间长度(毫秒),默认10
2:maxReconnectDelay:最长重连的时间间隔(毫秒),默认30000
3:useExponentialBackOff:重连时间间隔是否以指数形式增长,默认true
4:backOffMultiplier:递增倍数,默认2.0
5:maxReconnectAttempts: 默认-1|0,自版本5.6起:-1为默认值,代表不限重试次数;0代表从不重试
(只尝试连接一次,并不重连),5.6以前的版本:0为默认值,代表不限重试次数所有版本:如果设置为大于0的数,代表最大重试次数
6:startupMaxReconnectAttempts:初始化时的最大重连次数。一旦连接上,将使用maxReconnectAttempts的配置,默认0
7:randomize:使用随机链接,以达到负载均衡的目的,默认true
8:backup:提前初始化一个未使用连接,以便进行快速失败转移,默认false
9:timeout:设置发送操作的超时时间(毫秒),默认-1
10:trackMessages:设置是否缓存[故障发生时]尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺利传送,默认false
11:maxCacheSize:当trackMessages启用时,缓存的最大字节,默认为128*1024bytes
12:updateURIsSupported:设定是否可以动态修改broker uri(自版本5.4起),默认true

未经允许不得转载:菡萏如佳人 » AMQ简明教程(10)

欢迎加入极客江湖

进入江湖关于作者