第十三章,微调ActiveMQ来提高性能【笔记】
概述:如果涉及到性能方面,关联很多因素,例如网络,系统,java虚拟机,硬件等等这里只是从ActiveMQ本身设置来提高性能13.1.一般的技术有两种一般方式:第一种:使用not-persistent(非持久)消息模式第二种:使用事务批量处理13.1.1.持久VS非持久消息非持久消息快于持久消息两个原因消息发送是异步的,也就是不用等到回复 (缺点,可能会丢失
概述:如果涉及到性能方面,关联很多因素,例如网络,系统,java虚拟机,硬件等等
这里只是从ActiveMQ本身设置来提高性能
13.1.一般的技术
有两种一般方式:
第一种:使用not-persistent(非持久)消息模式
第二种:使用事务批量处理
13.1.1.持久VS非持久消息
非持久消息快于持久消息两个原因
- 消息发送是异步的,也就是不用等到回复 (缺点,可能会丢失消息)
- 持久消息需要进行磁盘数据存储
如下是持久消息请求过程
设置传输模式
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
13.1.2.事务
对于有消息保证不丢失下,可以通过事务批量处理
将许多消息进行打包之后commit()【提交】
事务与非事务例子
public void sendTransacted() throws JMSException
{
/*
* create a default connection - we'll assume a broker is running
* with its default configuration
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
/* create a transacted session */
Session session = connection.createSession( true, Session.SESSION_TRANSACTED );
Topic topic = session.createTopic( "Test.Transactions" );
MessageProducer producer = session.createProducer( topic );
int count = 0;
for ( int i = 0; i < 1000; i++ )
{
Message message = session.createTextMessage( "message " + i );
producer.send( message );
/* commit every 10 messages */
if ( i != 0 && i % 10 == 0 )
{
session.commit();
}
}
}
public void sendNonTransacted() throws JMSException
{
/*
* create a default connection - we'll assume a broker is running
* with its default configuration
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
/* create a default session (no transactions) */
Session session = connection.createSession( false, Session.AUTO_ACKNOWELDGE );
Topic topic = session.createTopic( "Test.Transactions" );
MessageProducer producer = session.createProducer( topic );
int count = 0;
for ( int i = 0; i < 1000; i++ )
{
Message message = session.createTextMessage( "message " + i );
producer.send( message );
}
}
13.1.3、内嵌代理器
内嵌代理器不用通过网络,所以不需要序列化对象
引用《ActiveMQ in Action》
创建一个队列服务
/*
* By default a broker always listens on vm://<broker name>
* so we don't need to set up an explicit connector for
* vm:// connections - just the tcp connector
*/
BrokerService broker = new BrokerService();
broker.setBrokerName( "service" );
broker.setPersistent( false );
broker.addConnector( "tcp://localhost:61616" );
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( "vm://service" );
cf.setCopyMessageOnSend( false );
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
/*
* we will need to respond to multiple destinations - so use null
* as the destination this producer is bound to
*/
final MessageProducer producer = session.createProducer( null );
/* create a Consumer to listen for requests to service */
Queue queue = session.createQueue( "service.queue" );
MessageConsumer consumer = session.createConsumer( queue );
consumer.setMessageListener( new MessageListener()
{
public void onMessage( Message msg )
{
try {
TextMessage textMsg = (TextMessage) msg;
String payload = "REPLY: " + textMsg.getText();
Destination replyTo;
replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText( payload );
producer.send( replyTo, textMsg );
} catch ( JMSException e ) {
e.printStackTrace();
}
}
} );
连接上QueueRequestor
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( "tcp://localhost:61616" );
QueueConnection connection = cf.createQueueConnection();
connection.start();
QueueSession session = connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
Queue queue = session.createQueue( "service.queue" );
QueueRequestor requestor = new QueueRequestor( session, queue );
for ( int i = 0; i < 10; i++ )
{
TextMessage msg = session.createTextMessage( "test msg: " + i );
TextMessage result = (TextMessage) requestor.request( msg );
System.err.println( "Result = " + result.getText() );
}
减少复制消息
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setCopyMessageOnSend(false);
13.1.4、微调OpenWire 协议
tcpNoDelayEnabled :默认值false,设置true ,用在网络慢发送大量很小消息能提高性能
cacheEnabled : 默认值true 缓存打开
cacheSize : 默认值1024 缓存大小, 最大不应该超过Short.MAX_VALUE/2
tightEncodingEnabled:默认值true 压缩消息
设置OpenWire 选项
String uri = "failover://(tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);
13.1.5、微调TCP传输
socketBufferSize : socket缓存大小, 默认值65536
tcpNoDelay: 默认值为false,延迟
String url = "failover://(tcp://localhost:61616?tcpNoDelay=true)";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);
13.2、最优消息生产者
13.2.1、异步发送
设置发送模式
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setUseAsyncSend(true);
13.2.2、生产者流控制
其实就是当生产者发送消息过多,代理器会通知生产者等一下,仓库满了,消费者也会跟代理器说消费情况
引用《ActiveMQ in Action》
设置生产者发送消息大小
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setProducerWindowSize(1024000);
如何流程控制不可用
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="10mb">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
13.3、最优消息消费者
13.3.1、限制发送消息大小
队列消费默认值 prefetch 大小=1000
队列浏览消费 默认值prefetch大小=500
持久话题消费者默认 prefetch 大小=100
非持久话题消费者默认prefetch 大小=32766
设置prefetch 规则
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");
props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");
props.setProperty("prefetchPolicy.topicPrefetch", "32766");
cf.setProperties(props);
在目的地设置prefetch规则
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);
13.3.3异步分发
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setAlwaysSessionAsync(false);
13.4、将这些集合在一起
创建一个内嵌代理器
//By default a broker always listens on vm://<broker name>
BrokerService broker = new BrokerService();
broker.setBrokerName("fast");
broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);
//Set the Destination policies
PolicyEntry policy = new PolicyEntry();
//set a memory limit of 4mb for each destination
policy.setMemoryLimit(4 * 1024 *1024);
//disable flow control
policy.setProducerFlowControl(false);
PolicyMap pMap = new PolicyMap();
//configure the policy
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.addConnector("tcp://localhost:61616");
broker.start();
创建一个生产者
//tell the connection factory to connect to an embedded broker named fast.
//if the embedded broker isn't already created, the connection factory will
//create a default embedded broker named "fast"
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast");
//disable message copying
cf.setCopyMessageOnSend(false);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
final MessageProducer producer = session.createProducer(topic);
//send non-persistent messages
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i =0; i < 1000000;i++) {
TextMessage message = session.createTextMessage("Test:"+i);
producer.send(message);
}
创建一个消费者
//set up the connection factory to connect the the producer's embedded broker
//using tcp://
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)");
//configure the factory to create connections
//with straight through processing of messages
//and optimized acknowledgement
cf.setAlwaysSessionAsync(false);
cf.setOptimizeAcknowledge(true);
Connection connection = cf.createConnection();
connection.start();
//use the default session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set the prefetch size for topics - by parsing a configuration parameter in
// the name of the topic
Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766");
MessageConsumer consumer = session.createConsumer(topic);
//setup a counter - so we don't print every message
final AtomicInteger count = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
//only print every 10,000th message
if (count.incrementAndGet()%10000==0)
System.err.println("Got = " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
以上图截至《ActiveMQ in Action》
更多推荐
所有评论(0)