在阅读异步执行器设计部分时,很清楚,架构受消息队列的启发。异步执行程序的设计方式是,可以轻松使用消息队列来接管线程池的作业和异步作业的处理。

基准测试表明,对于线程池支持的异步执行程序,使用消息队列是优越的吞吐量。然而,它确实带有一个额外的架构组件,这当然会使设置,维护和监视更加复杂。对于许多用户来说,线程池支持的异步执行器的性能绰绰有余。然而,知道如果所需的性能增长还有其他选择是很好的。

目前,开箱即用支持的唯一选项是Spring的JMS。之所以支持Spring是因为Spring有一些非常好的功能,可以减轻线程和处理多个消息使用者的痛苦。但是,集成非常简单,可以轻松移植到任何消息队列实现或协议(Stomp,AMPQ等)。对于下一个应用程序的反馈意见是值得赞赏的。

当引擎创建一个新的异步作业时,消息被放在一个包含作业标识符的消息队列中(在事务提交事务监听器中,所以我们确信作业条目在数据库中)。消息使用者然后使用该作业标识符来获取作业,并执行作业。异步执行程序不会再创建线程池。它将从一个单独的线程中插入和查询定时器。当一个定时器触发时,它被移动到异步作业表,现在意味着消息也被发送到消息队列。该复位过期的线程也将解开的工作像往常一样,消息队列,可能失败了。现在不要解锁工作,而是要重新发送一条消息。异步执行程序不会再轮询异步作业。

该实现由两个类组成:

  • org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,它将消息放入消息队列中,而不是将消息传递给线程池。
  • javax.jms.MessageListener实现,它使用消息队列中的消息,使用消息中的作业标识来获取并执行作业。

首先,将flowable-jms-spring-executor依赖项添加到您的项目中:

<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>${flowable.version}</version>
</dependency>

要启用基于消息队列的异步执行程序,在进程引擎配置中,需要执行以下操作:

  • 像往常一样,asyncExecutorActivate必须设置为true
  • asyncExecutorMessageQueueMode需要设置为true
  • 该org.flowable.spring.executor.jms.MessageBasedJobManager必须注射的JobManager

以下是使用ActiveMQ作为消息队列代理的基于Java的配置的完整示例。

有些事情要注意:

  • 该MessageBasedJobManager需要一个JmsTemplate的被注入配置有一个正确的connectionFactory。
  • 我们使用Spring 的MessageListenerContainer概念,因为这简化了线程和多个消费者。
@Configuration
public class SpringJmsConfig {
@Bean
public DataSource dataSource() {
// Omitted
}
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager() {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource());
return transactionManager;
}
@Bean
public SpringProcessEngineConfiguration processEngineConfiguration() {
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource());
configuration.setTransactionManager(transactionManager());
configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
configuration.setAsyncExecutorMessageQueueMode(true);
configuration.setAsyncExecutorActivate(true);
configuration.setJobManager(jobManager());
return configuration;
}
@Bean
public ProcessEngine processEngine() {
return processEngineConfiguration().buildProcessEngine();
}
@Bean
public MessageBasedJobManager jobManager() {
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate());
return jobManager;
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
return new CachingConnectionFactory(activeMQConnectionFactory);
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory());
return jmsTemplate;
}
@Bean
public MessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-jobs");
messageListenerContainer.setMessageListener(jobMessageListener());
messageListenerContainer.setConcurrentConsumers(2);
messageListenerContainer.start();
return messageListenerContainer;
}
@Bean
public JobMessageListener jobMessageListener() {
JobMessageListener jobMessageListener = new JobMessageListener();
jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
return jobMessageListener;
}
}

在上面的代码中,JobMessageListener和MessageBasedJobManager是flowable -jms-spring-executor模块中唯一的类。所有其他的代码都是来自Spring。因此,当想要将其移植到其他队列/协议时,必须移植这些类。

上面文章来自盘古BPM研究院:http://vue.pangubpm.com/
文章翻译提交:https://github.com/qiudaoke/flowable-userguide
了解更多文章可以关注微信公众号:在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐