文档库 最新最全的文档下载
当前位置:文档库 › activMQ相关知识

activMQ相关知识

一、基础流程
1、使用activemq需要启动服务的主要过程:按照JMS的规范,我们首先需要获得一个JMS connection factory.,
通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
2. 利用factory构造JMS connection
3. 启动connection
4. 通过connection创建JMS session.
5. 指定JMS destination.
6. 创建JMS producer或者创建JMS message并提供destination.
7. 创建JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

二、ActiveMQ关于文件传输需要注意哪些方面?
1.在消息中间件的消息的路由的算法怎么实现的。
如A、B、C、D、E 五个节点上都是提供消息服务的 AMQ ,五个节点连通成一个网络,连接到 A 节点的客户端可以发消息到五个节点中的任意一个节点,
AMQ 会自动寻找一条最佳路径传递消息到目标节点。如从A 发消息到D,有两条连通路径:A->B->C->D 和A->E->D,其中 A->E->D 经过的节点最少,
该路径为最佳路径, AMQ 将根据此路径对消息进行传递。
2.在消息中间件中消息的存储最好采用哪种,各自的优缺点是什么,商用的消息中间件采用推荐采用哪种消息存储?
目前消息的存储有基于文件系统,基于内存,基于数据库的。基于文件系统:以文件方式保存消息,配置非常简单并具有较高的性能,但是可靠性比较差。
在可靠性要求不高但对性能要求很高的环境下可以使用这种消息存储方式。
基于数据库:使用关系数据库保存消息。对可靠性要求很高的应用环境下建议使用这种消息存储方式。jdbc消息存储方式的缺点是配置较复杂。
基于内存:将消息保存在内存中,可靠性比较差,性能比较高,当消息生产者快速生产和消息消息者消费慢时候,消息堆积造成对内存占用严重,造成宕机等,当重新启动时候,内存的尚未消息的消息消失,不可靠。
3. 超大文件的文件传输断点续传
1.基于单队列实现,消息生产者产生消息发送给客户端然后处于消息等待状态,客户段接受消息成功之后,发送消息给消息生产者,消息生产者再开始发送消息给消息的消费者直到整个文件发送成功,然后消息客户端将消息合并成一个文件。
2.基于多线程的实现,在发送消息时候,消息生产者创建多个线程,分别发送消息客户端,客户端接受消息并签收,如果客户端接受失败(备注这里失败,仅指消息没有签收),发送消息给消息生产者一个特定队列发送消息

请求重新继续发送。
4.提供可扩展的消息中间实现便于消息中间件的替换。
如目前采用ActiveMQ实现,以后可以替换为其他的消息中间件,如Hornetq等。
5.针对各种消息服务和消息相关的监控(Brige,Connection,Queue)的状态和数量的实时监控等。
主要采用JMX实现底层的监控功能。针对ActiveMQ提供JMX的MBean可以直接实现。
6.针对目前消息中间件在某些情况下,比较消耗内存,必须监控实时监控流量。
具体实现思路没有想出来,针对,ActiveMQ可以使用SysUsage,StoreUage等,但是其他的MQ,但是针对其他的MQ怎么将空。

三、ActiveMQ
1、ActiveMQ支持消息过滤设置规则和用法
selector支持下列几种方式:
(1) String literals: "color ='blue'"
(2) Byte strings: “myBytes <> "0X0AFC23"”
(3) Numeric values: "NoltemsInStock > 20"
(4) Boolean literals TRUE or FALSE: "AcctDetails=TRUE"
(5) Composite expressions: "Type='car' AND (color = 'blue' OR color ='green') AND weight> 2500"
使用方式:
(1)创建生产者在send函数之前添加过滤属性:
message->setStringProperty("Color",pColor);
producer->send(message);
(2)在创建消费者createConsumer时进行如下设置:
std::string MessageFilter="color ='blue'"
m_pConsumer = m_pSession->createConsumer(m_pDestination, m_MessageFilter);
m_pConsumer->setMessageListener(this);
2、错误:Channel was inactive for too long (服务器消息较长时间没有消息发送时,客户端会报这个错误)
解决方法:在建立连接的Uri中加入: wireFormat.maxInactivityDuration=0
3、采用failover方式连接导致卡死
解决方法:不采用failover连接
4、ActiveMQ发送模式
(1)ActiveMQ异步发送,只需新增参数https://www.wendangku.net/doc/fc19109019.html,eAsyncSend=true,具体如下:
BrokerUri = "tcp://127.0.0.1:61616?https://www.wendangku.net/doc/fc19109019.html,eAsyncSend=true"
(2)ActiveMQ同步发送,只需新增参数https://www.wendangku.net/doc/fc19109019.html,eAsyncSend=false,具体如下:
BrokerUri = "tcp://127.0.0.1:61616?https://www.wendangku.net/doc/fc19109019.html,eAsyncSend=false"
其实activeMQ在默认情况下就是同步发送,所以在同步发送时可以简写为:
BrokerUri = "tcp://127.0.0.1:61616"
5、错误:The Session is closed(网络异常时客户端会报出这个错误)
解决办法:在建立连接的Uri中加入: maxReconnectDelay=10000
maxReconnectDelay 最大重连间隔
6、ActiveMQ负载均衡
对broker采取了负载均衡和基于共享文件系统的主备配置,这个时候,客户端生产者和消费者的URI中用
failover:(tcp://192.168.1.117:61616,tcp://192.168.1.118:61616,tcp://broker3:61616)
6、ActiveMQ的ACK设置
根据不同的需要可以将ACK设置为Session::CLIENT_ACKNOWLEDGE 或 Session::AUTO_ACKNOWLEDGE 默认为Session::AUTO_ACKNOWLEDGE 如:
if( clientAck )
{
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE);
}
else
{
session = connectio

n->createSession( Session::AUTO_ACKNOWLEDGE);
}
7、ActiveMQ的Topic设置
根据不同的需要可以将Topic设置为Topic或 Queue默认为Queue如:
if(useTopic)
{
destination = session->createTopic(destURI);
}
else
{
destination = session->createQueue(destURI);
}
8、ActiveMQ的DeliveryMode设置(生产者时设置)
根据不同的需要可以将DeliveryMode设置为DeliveryMode::NON_PERSISTENT或 DeliveryMode::PERSISTENT默认为DeliveryMode::NON_PERSISTENT如:
if(usePersistent)
{
producer->setDeliveryMode( DeliveryMode::PERSISTENT);
}
else
{
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT);
}

相关文档