概述:如果涉及到性能方面,关联很多因素,例如网络,系统,java虚拟机,硬件等等

这里只是从ActiveMQ本身设置来提高性能

13.1.一般的技术

有两种一般方式:

第一种:使用not-persistent(非持久)消息模式

第二种:使用事务批量处理

13.1.1.持久VS非持久消息

非持久消息快于持久消息两个原因

  • 消息发送是异步的,也就是不用等到回复 (缺点,可能会丢失消息)
  • 持久消息需要进行磁盘数据存储

如下是持久消息请求过程


设置传输模式

MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

13.1.2.事务

对于有消息保证不丢失下,可以通过事务批量处理

将许多消息进行打包之后commit()【提交】
事务与非事务例子

public void sendTransacted() throws JMSException
{
/*
 * create a default connection - we'll assume a broker is running
 * with its default configuration
 */
	ActiveMQConnectionFactory	cf		= new ActiveMQConnectionFactory();
	Connection			connection	= cf.createConnection();
	connection.start();
/* create a transacted session */
	Session		session		= connection.createSession( true, Session.SESSION_TRANSACTED );
	Topic		topic		= session.createTopic( "Test.Transactions" );
	MessageProducer producer	= session.createProducer( topic );
	int		count		= 0;
	for ( int i = 0; i < 1000; i++ )
	{
		Message message = session.createTextMessage( "message " + i );
		producer.send( message );
/* commit every 10 messages */
		if ( i != 0 && i % 10 == 0 )
		{
			session.commit();
		}
	}
}


public void sendNonTransacted() throws JMSException
{
/*
 * create a default connection - we'll assume a broker is running
 * with its default configuration
 */
	ActiveMQConnectionFactory	cf		= new ActiveMQConnectionFactory();
	Connection			connection	= cf.createConnection();
	connection.start();
/* create a default session (no transactions) */
	Session		session		= connection.createSession( false, Session.AUTO_ACKNOWELDGE );
	Topic		topic		= session.createTopic( "Test.Transactions" );
	MessageProducer producer	= session.createProducer( topic );
	int		count		= 0;
	for ( int i = 0; i < 1000; i++ )
	{
		Message message = session.createTextMessage( "message " + i );
		producer.send( message );
	}
}


13.1.3、内嵌代理器

内嵌代理器不用通过网络,所以不需要序列化对象

引用《ActiveMQ in Action》


创建一个队列服务

/*
 * By default a broker always listens on vm://<broker name>
 * so we don't need to set up an explicit connector for
 * vm:// connections - just the tcp connector
 */
BrokerService broker = new BrokerService();
broker.setBrokerName( "service" );
broker.setPersistent( false );
broker.addConnector( "tcp://localhost:61616" );
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( "vm://service" );
cf.setCopyMessageOnSend( false );
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
/*
 * we will need to respond to multiple destinations - so use null
 * as the destination this producer is bound to
 */
final MessageProducer producer = session.createProducer( null );
/* create a Consumer to listen for requests to service */
Queue		queue		= session.createQueue( "service.queue" );
MessageConsumer consumer	= session.createConsumer( queue );
consumer.setMessageListener( new MessageListener()
			     {
				     public void onMessage( Message msg )
				     {
					     try {
						     TextMessage textMsg = (TextMessage) msg;
						     String payload = "REPLY: " + textMsg.getText();
						     Destination replyTo;
						     replyTo = msg.getJMSReplyTo();
						     textMsg.clearBody();
						     textMsg.setText( payload );
						     producer.send( replyTo, textMsg );
					     } catch ( JMSException e ) {
						     e.printStackTrace();
					     }
				     }
			     } );

连接上QueueRequestor

ActiveMQConnectionFactory	cf		= new ActiveMQConnectionFactory( "tcp://localhost:61616" );
QueueConnection			connection	= cf.createQueueConnection();
connection.start();
QueueSession	session		= connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
Queue		queue		= session.createQueue( "service.queue" );
QueueRequestor	requestor	= new QueueRequestor( session, queue );
for ( int i = 0; i < 10; i++ )
{
	TextMessage	msg	= session.createTextMessage( "test msg: " + i );
	TextMessage	result	= (TextMessage) requestor.request( msg );
	System.err.println( "Result = " + result.getText() );
}
减少复制消息

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setCopyMessageOnSend(false);
13.1.4、微调OpenWire 协议

tcpNoDelayEnabled :默认值false,设置true ,用在网络慢发送大量很小消息能提高性能

cacheEnabled : 默认值true 缓存打开

cacheSize : 默认值1024 缓存大小, 最大不应该超过Short.MAX_VALUE/2

tightEncodingEnabled:默认值true 压缩消息

设置OpenWire 选项

String uri = "failover://(tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);


13.1.5、微调TCP传输

socketBufferSize : socket缓存大小, 默认值65536

tcpNoDelay: 默认值为false,延迟

String url = "failover://(tcp://localhost:61616?tcpNoDelay=true)";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
cf.setAlwaysSyncSend(true);

13.2、最优消息生产者

13.2.1、异步发送

设置发送模式

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setUseAsyncSend(true);

13.2.2、生产者流控制

其实就是当生产者发送消息过多,代理器会通知生产者等一下,仓库满了,消费者也会跟代理器说消费情况

引用《ActiveMQ in Action》


设置生产者发送消息大小

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setProducerWindowSize(1024000);
如何流程控制不可用

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="10mb">
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
                <subscriptionRecoveryPolicy>
                    <lastImageSubscriptionRecoveryPolicy />
                </subscriptionRecoveryPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

13.3、最优消息消费者

13.3.1、限制发送消息大小


队列消费默认值 prefetch 大小=1000

队列浏览消费 默认值prefetch大小=500

持久话题消费者默认 prefetch 大小=100

非持久话题消费者默认prefetch 大小=32766

设置prefetch 规则

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");
props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");
props.setProperty("prefetchPolicy.topicPrefetch", "32766");
cf.setProperties(props);
在目的地设置prefetch规则

Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);

13.3.3异步分发

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setAlwaysSessionAsync(false);

13.4、将这些集合在一起




创建一个内嵌代理器

//By default a broker always listens on vm://<broker name>
BrokerService broker = new BrokerService();
broker.setBrokerName("fast");
broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);
//Set the Destination policies
PolicyEntry policy = new PolicyEntry();
//set a memory limit of 4mb for each destination
policy.setMemoryLimit(4 * 1024 *1024);
//disable flow control
policy.setProducerFlowControl(false);
PolicyMap pMap = new PolicyMap();
//configure the policy
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.addConnector("tcp://localhost:61616");
broker.start();

创建一个生产者

//tell the connection factory to connect to an embedded broker named fast.
//if the embedded broker isn't already created, the connection factory will
//create a default embedded broker named "fast"
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast");
//disable message copying
cf.setCopyMessageOnSend(false);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
final MessageProducer producer = session.createProducer(topic);
//send non-persistent messages
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i =0; i < 1000000;i++) {
TextMessage message = session.createTextMessage("Test:"+i);
producer.send(message);
}

创建一个消费者

//set up the connection factory to connect the the producer's embedded broker
//using tcp://
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)");
//configure the factory to create connections
//with straight through processing of messages
//and optimized acknowledgement
cf.setAlwaysSessionAsync(false);
cf.setOptimizeAcknowledge(true);
Connection connection = cf.createConnection();
connection.start();
//use the default session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set the prefetch size for topics - by parsing a configuration parameter in
// the name of the topic
Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766");
MessageConsumer consumer = session.createConsumer(topic);
//setup a counter - so we don't print every message
final AtomicInteger count = new AtomicInteger();
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
//only print every 10,000th message
if (count.incrementAndGet()%10000==0)
System.err.println("Got = " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

以上图截至《ActiveMQ in Action》


Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐