文档库 最新最全的文档下载
当前位置:文档库 › RocketMQ集群部署

RocketMQ集群部署

RocketMQ集群部署
RocketMQ集群部署

RocketMQ集群部署

网络部署图

RocketMQ 网络部署特点

?Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

?Broker 部署相对复杂,Broker 分为Master 与Slave,一个Master 可以对应多个Slave,但是一个Slave 只能对应一个Master,Master 与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0 表示Master,非0 表示Slave。Master 也可以部署

多个。每个Broker 与Name

?Producer 与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic 服务的Master 建立长连接,且定时向Master 发送心跳。

Producer 完全无状态,可集群部署。

?Consumer 与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic 服务的Master、Slave 建立长连接,且定时向

Master、Slave 发送心跳。Consumer既可以从Master 订阅消息,也可以从Slave 订阅消息,订阅规则由Broker 配置决定。

Broker集群部署方式主要有以下几种:(Slave 不可写,但可读)

单个Master

这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

多Master模式

一个集群无Slave,全是Master,例如 2 个Master 或者 3 个Master。

优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由于RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

先启动NameServer,例如机器IP 为:192.168.1.101:9876

nohup sh mqnamesrv &

?在机器A,启动第一个Master

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/ conf/2m-noslave/broker-a.properties &

?在机器B,启动第二个Master

nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/ conf/2m-noslave/broker-b.properties &

多Master多Slave模式,异步复制

每个Master 配置一个Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从Slave 消费,此过程对应用透明。不需要人工干预。性能同多Master 模式几乎一样。

缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

先启动两台服务器的NameServer,例如机器IP 为:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log2>$ROCKETMQ _HOME/log/ng-error.log &

?在机器A,启动第一个Master

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/ mq.log&

?在机器B,启动第二个Master

nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/ mq.log&

?在机器C,启动第一个Slave

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/lo g/mq.log&

?在机器D,启动第二个Slave

nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/lo g/mq.log&

多Master多Slave模式,同步双写

每个Master 配置一个Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。

优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

先启动两台服务器的NameServer,例如机器IP 为:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log2>$ROCKETMQ _HOME/log/ng-error.log &

?在机器A,启动第一个Master

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/m q.log&

?在机器B,启动第二个Master

nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/m q.log&

?在机器C,启动第一个Slave

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log /mq.log&

?在机器D,启动第二个Slave

nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/ conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log /mq.log&

以上Broker 与Slave 配对是通过指定相同的brokerName 参数来配对,Master 的BrokerId 必须是0,Slave 的BrokerId 必须是大与0 的数。另外一个Master 下面可以挂载多个Slave,同一Master 下的多个

Slave 通过指定不同的BrokerId 来区分。

除此之外,nameserver也需要集群。

下面以配置一主一备(同步),2个nameserver为例测试。

1、环境两台机器:

?192.168.36.101 为主

?192.168.36.102 为备

同时在2台机器个启动一个nameserver。安装RocketMq请参考:

https://www.wendangku.net/doc/fa13989208.html,/zhu_tianwei/article/details/40948447

2、修改配置

(1)创建目录

mkdir /usr/local/alibaba-rocketmq/log#创建日志目录

mkdir -p /usr/local/alibaba-rocketmq/data/store/commitlog

#创建数据存储目录

更改日志目录

cd /usr/local/alibaba-rocketmq/conf

sed -i 's#${user.home}#${user.home}/alibaba-rocketmq#g'*. xml

(2)修改主配置

vi ./conf/2m-2s-sync/broker-a.properties

#Broker所属哪个集群,默认【DefaultCluster】

brokerClusterName=DefaultCluster

#本机主机名

brokerName=broker-a

#BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master与Slave通过BrokerName来配对,默认【0】

brokerId=0

#删除文件时间点,默认凌晨4点

deleteWhen=04

#文件保留时间,默认48小时

fileReservedTime=48

#Broker的角色 - ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE

brokerRole=SYNC_MASTER

#刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH

#Name Server地址

namesrvAddr=192.168.1.101:9876;192.168.1.102:9876

#Broker对外服务的监听端口,默认【10911】

listenPort=10911

defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭,默认【true】autoCreateTopicEnable=true

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭,默认【true】autoCreateSubscriptionGroup=true

mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000

diskMaxUsedSpaceRatio=88

storePathRootDir=/usr/local/alibaba-rocketmq/data/store

storePathCommitLog=/usr/local/alibaba-rocketmq/data/store /commitlog

maxMessageSize=65536

flushCommitLogLeastPages=4

flushConsumeQueueLeastPages=2

flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

(3)修改备配置

vi ./conf/2m-2s-sync/broker-a-s.properties

#Broker所属哪个集群,默认【DefaultCluster】

brokerClusterName=DefaultCluster

#本机主机名

brokerName=broker-a

#BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master与Slave通过BrokerName来配对,默认【0】

brokerId=1

#删除文件时间点,默认凌晨4点

deleteWhen=04

#文件保留时间,默认48小时

fileReservedTime=48

#Broker的角色 - ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE

brokerRole=SLAVE

#刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH

#Name Server地址

namesrvAddr=192.168.1.101:9876;192.168.1.102:9876

#Broker对外服务的监听端口,默认【10911】

listenPort=10911

defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭,默认【true】autoCreateTopicEnable=true

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭,默认【true】autoCreateSubscriptionGroup=true

mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000

diskMaxUsedSpaceRatio=88

storePathRootDir=/usr/local/alibaba-rocketmq/data/store

storePathCommitLog=/usr/local/alibaba-rocketmq/data/store /commitlog

maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

实例:

1.生产者Producer.java ,TransactionMQProducer使用

package com.somnus.rocketmq;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQClientExce ption;

import com.alibaba.rocketmq.client.producer.LocalTransact ionExecuter;

import com.alibaba.rocketmq.client.producer.LocalTransact ionState;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.client.producer.TransactionCh eckListener;

import com.alibaba.rocketmq.client.producer.TransactionMQ Producer;

import https://www.wendangku.net/doc/fa13989208.html,mon.message.Message;

import https://www.wendangku.net/doc/fa13989208.html,mon.message.MessageExt; public class Producer {

public static void main(String[] args) throws MQClientE xception, InterruptedException {

/**

* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例

* 注意:ProducerGroupName需要由应用来保证唯一,一类Produ cer集合的名称,这类Producer通常发送一类消息,

* 且发送逻辑一致

* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

* 因为服务器会回查这个Group下的任意一个Producer

*/

final TransactionMQProducer producer = new Transact ionMQProducer("ProducerGroupName");

// nameserver服务

producer.setNamesrvAddr("172.16.235.77:9876;172.16. 235.78:9876");

producer.setInstanceName("Producer");

/**

* Producer对象在使用之前必须要调用start初始化,初始化一次即可

* 注意:切记不可以在每次发送消息时,都调用start方法

*/

producer.start();

// 服务器回调Producer,检查本地事务分支成功还是失败

producer.setTransactionCheckListener(new Transactio nCheckListener() {

public LocalTransactionState checkLocalTransacti onState(

MessageExt msg) {

System.out.println("checkLocalTransactionSta te --" + new String(msg.getBody()));

return https://www.wendangku.net/doc/fa13989208.html,MIT_MESSAGE; }

});

/**

* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,

* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,

* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

*/

for (int i = 0; i < 10; i++) {

try {

{

Message msg = new Message("TopicTest1", / / topic

"TagA", // tag

"OrderID001", // ke y消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)

("Hello MetaQA").getBytes()); // body

SendResult sendResult = producer.sendMess ageInTransaction(

msg, new LocalTransactionExecuter() {

public LocalTransactionState ex ecuteLocalTransactionBranch(Message msg, Object arg) {

LocalTransactionBranch--msg=" + new String(msg.getBody ()));

System.out.println("execute LocalTransactionBranch--arg=" + arg);

return LocalTransactionStat https://www.wendangku.net/doc/fa13989208.html,MIT_MESSAGE;

}

},

"$$$");

System.out.println(sendResult);

}

{

Message msg = new Message("TopicTest2", / / topic

"TagB", // tag

"OrderID0034", // ke y 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)

("Hello MetaQB").getBytes()); // body

SendResult sendResult = producer.sendMess ageInTransaction(

msg, new LocalTransactionExecuter() {

public LocalTransactionState ex ecuteLocalTransactionBranch(Message msg, Object arg) {

LocalTransactionBranch--msg=" + new String(msg.getBody ()));

System.out.println("execute LocalTransactionBranch--arg=" + arg);

return LocalTransactionStat https://www.wendangku.net/doc/fa13989208.html,MIT_MESSAGE;

}

},

"$$$");

System.out.println(sendResult);

}

{

Message msg = new Message("TopicTest3", / / topic

"TagC", // tag

"OrderID061", // ke y

("Hello MetaQC").getBytes()); // body

SendResult sendResult = producer.sendMess ageInTransaction(

msg, new LocalTransactionExecuter() {

public LocalTransactionState ex ecuteLocalTransactionBranch(Message msg, Object arg) {

System.out.println("execute LocalTransactionBranch--msg=" + new String(msg.getBody ()));

LocalTransactionBranch--arg=" + arg);

return LocalTransactionStat https://www.wendangku.net/doc/fa13989208.html,MIT_MESSAGE;

}

},

"$$$");

System.out.println(sendResult);

}

} catch (Exception e) {

e.printStackTrace();

}

https://www.wendangku.net/doc/fa13989208.html,LISECONDS.sleep(1000);

}

/**

* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从M etaQ服务器上注销自己

* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

*/

// producer.shutdown();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

public void run() {

producer.shutdown(); }

}));

System.exit(0);

} // 执行本地事务,由客户端回调

}

相关文档