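For work I needed to integrate IBM MQ into a SpringBoot project. After going through some references I got it working, so here is a write-up; knowledge like this is worth organizing systematically.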

1. Add the dependencies

<!-- ibm mq -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.2.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
</dependency>

2. Add the IBM MQ configuration to the yml file

If you use a .properties file, adapt the keys accordingly.

ibmmq:
  host: 127.0.0.1           # host address
  port: 3001                # port
  username: admin           # user name
  password: 123456          # password
  channel: CHANNEL1         # channel name
  manager: TEST_MANAGER     # queue manager
  ccsid: 1208               # coded character set ID (CCSID)
  receive-timeout: 1000     # receive timeout (ms)
  concurrency: 10-100       # concurrency, lower-upper
  queue-test: test_queue    # queue name
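
For a project that uses application.properties instead of YAML, the same settings flatten into dotted keys. The sketch below is only an illustration: it embeds the flattened keys in a string and reads them with java.util.Properties to show that they map one-to-one onto the YAML above. The class name IbmMqPropertiesSketch is made up for this example; in a real Spring Boot project the quoted lines would simply go into application.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original project.
class IbmMqPropertiesSketch {

    // The YAML settings, flattened into .properties syntax (one key per line).
    static final String FLATTENED =
            "ibmmq.host=127.0.0.1\n"
          + "ibmmq.port=3001\n"
          + "ibmmq.username=admin\n"
          + "ibmmq.password=123456\n"
          + "ibmmq.channel=CHANNEL1\n"
          + "ibmmq.manager=TEST_MANAGER\n"
          + "ibmmq.ccsid=1208\n"
          + "ibmmq.receive-timeout=1000\n"
          + "ibmmq.concurrency=10-100\n"
          + "ibmmq.queue-test=test_queue\n";

    // Parses the flattened text the same way a .properties file would be read.
    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(FLATTENED));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load();
        System.out.println(props.getProperty("ibmmq.host") + ":" + props.getProperty("ibmmq.port"));
    }
}
```

The @Value("${ibmmq.host}") style lookups in the configuration class work the same way for either file format, since Spring exposes both as the same dotted property names.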

3. Create the JMS configuration class

import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * IBM MQ configuration
 */
@Configuration
public class JmsConfig {

    @Value("${ibmmq.host}")
    private String host;

    @Value("${ibmmq.port}")
    private Integer port;

    @Value("${ibmmq.username}")
    private String username;

    @Value("${ibmmq.password}")
    private String password;

    @Value("${ibmmq.channel}")
    private String channel;

    @Value("${ibmmq.manager}")
    private String manager;

    @Value("${ibmmq.ccsid}")
    private Integer ccsid;

    @Value("${ibmmq.receive-timeout}")
    private Long receiveTimeout;

    @Value("${ibmmq.concurrency}")
    private String concurrency;

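    /**
     * Connection factory:
     * the CCSID must match that of the queue manager you connect to. On a Simplified
     * Chinese Windows install the default is typically 1381, on Linux typically 1208.
     * 1208 means UTF-8, so setting the queue manager's CCSID to 1208 is recommended.
     *
     * @return the IBM MQ connection factory
     */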
    @Bean("mqQueueConnectionFactory")
    public MQQueueConnectionFactory mqQueueConnectionFactory() {
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(host);
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(ccsid);
            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setPort(port);
            mqQueueConnectionFactory.setQueueManager(manager);
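        } catch (Exception e) {
            // Fail fast: a half-configured factory would otherwise only fail later, at connect time
            throw new IllegalStateException("Failed to configure the IBM MQ connection factory", e);
        }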
        return mqQueueConnectionFactory;
    }

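    /**
     * Connection credentials:
     * skip this step if the connection needs no user name and password, and inject
     * mqQueueConnectionFactory directly into the caching connection factory in the next step.
     * (The @Bean annotation below is commented out, so the adapter is not active as written.)
     *
     * @param mqQueueConnectionFactory the IBM MQ connection factory
     * @return the adapter that supplies the credentials
     */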
//    @Bean("userCredentialsConnectionFactoryAdapter")
    UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory) {
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(username);
        userCredentialsConnectionFactoryAdapter.setPassword(password);
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }

    /**
     * Caching connection factory:
     * without it, every interaction with MQ creates a new connection, which is much slower.
     */
    @Bean("cachingConnectionFactory")
    public CachingConnectionFactory cachingConnectionFactory(MQQueueConnectionFactory mqQueueConnectionFactory) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(mqQueueConnectionFactory);
        cachingConnectionFactory.setSessionCacheSize(500);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }

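    /**
     * Transaction manager:
     * skip this step if you do not use transactions.
     * To use transactions, add @EnableTransactionManagement to the application entry class;
     * see Spring Transaction for details on usage.
     * Note: the transaction manager configured here interferes with the database
     * transactions in the @Service layer, so it is disabled for now.
     *
     * @param cachingConnectionFactory the caching connection factory
     * @return the JMS transaction manager
     */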
//    @Bean("jmsTransactionManager")
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }

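    /**
     * JMS template:
     * Important: if setReceiveTimeout is not called, a receive on an empty queue
     * blocks until a message arrives.
     *
     * @param cachingConnectionFactory the caching connection factory
     * @return the JMS template
     */
    @Bean("jmsOperations")
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(cachingConnectionFactory);
        // Bound blocking receives with ibmmq.receive-timeout (milliseconds)
        jmsTemplate.setReceiveTimeout(receiveTimeout);
        return jmsTemplate;
    }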

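    /**
     * DefaultJmsListenerContainerFactory:
     * when queues are consumed with @JmsListener, especially with several listeners,
     * this factory instance controls how messages are dispatched.
     *
     * @param cachingConnectionFactory the caching connection factory
     * @return the listener container factory
     */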
    @Bean(name = "jmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
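        // Number of concurrent consumers; use "1-1" if messages must be consumed in order
        factory.setConcurrency(concurrency);
        // Recovery (reconnect) interval in ms; note that this reuses the ibmmq.receive-timeout value
        factory.setRecoveryInterval(receiveTimeout);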
        return factory;
    }

}
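
The JmsTemplate comment above warns that a receive without a timeout blocks on an empty queue. The sketch below is only a plain-Java analogy built on java.util.concurrent.BlockingQueue, not JMS code: poll with a timeout plays the role of a receive with a receive timeout set, returning null when nothing arrives in time. The class and method names are made up for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; a BlockingQueue stands in for the MQ queue.
class TimedReceiveSketch {

    // Waits at most timeoutMillis for a message; returns null if none arrives.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Empty queue: the call gives up after 100 ms instead of hanging
        System.out.println(receive(queue, 100));
        queue.offer("hello");
        // A message is available, so it is returned without waiting
        System.out.println(receive(queue, 100));
    }
}
```

Callers of a timed receive have to handle the null case explicitly, which is the price for not hanging a thread on an empty queue.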

Note: if messages must be consumed in order, set the concurrency of the listener factory to "1-1", i.e. factory.setConcurrency("1-1");
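
To make the "lower-upper" format concrete, here is a small plain-Java sketch of how such a string can be split into its two limits. It mirrors the documented behavior of Spring's setConcurrency (a bare number is treated as the upper limit, with a lower limit of 1), but it is a simplified illustration rather than Spring's own code; the ConcurrencyRange class and its validation rules are assumptions made for this example.

```java
// Hypothetical value class illustrating the "lower-upper" concurrency format.
final class ConcurrencyRange {
    final int lower;
    final int upper;

    private ConcurrencyRange(int lower, int upper) {
        // Validation added for this sketch only
        if (lower < 1 || upper < lower) {
            throw new IllegalArgumentException("Invalid concurrency range: " + lower + "-" + upper);
        }
        this.lower = lower;
        this.upper = upper;
    }

    // Accepts "lower-upper" (e.g. "10-100") or a bare upper limit (e.g. "5").
    static ConcurrencyRange parse(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        if (dash < 0) {
            return new ConcurrencyRange(1, Integer.parseInt(trimmed));
        }
        int lower = Integer.parseInt(trimmed.substring(0, dash).trim());
        int upper = Integer.parseInt(trimmed.substring(dash + 1).trim());
        return new ConcurrencyRange(lower, upper);
    }

    // With at most one consumer, messages cannot be processed in parallel.
    boolean isSingleConsumer() {
        return upper == 1;
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("10-100");
        System.out.println(range.lower + " to " + range.upper + " consumers");
        System.out.println("single consumer for 1-1: " + parse("1-1").isSingleConsumer());
    }
}
```

With "10-100", up to 100 consumers may pull from the queue at once, so processing order is not guaranteed; "1-1" pins the listener to a single consumer.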

4. Producer

Service layer

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class IbmMQSend {

    @Autowired
    private JmsOperations jmsOperations;

    @Value("${ibmmq.queue-test}")
    private String testQueue;

    public void sendTest(Object object) {
        jmsOperations.convertAndSend(testQueue, object);
        log.info("Message sent to queue {}", testQueue);
    }

}

Controller layer

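import com.dao.AlarmMapper;
import com.ibmmq.IbmMQSend;
// Swagger annotations used below; they need a Swagger annotations dependency on the classpath
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;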

@RestController
@RequestMapping("/mq")
@Api(tags = {"IBM MQ operations"})
public class IbmmqSenderController {

    @Autowired
    private IbmMQSend ibmMQSend;

    @Autowired
    AlarmMapper alarmMapper;

    @PostMapping("/send/message")
    @ApiOperation(value = "Send a message")
    public String cacheSend(@ApiParam(value = "Message content") @RequestParam("message") String message) {
        ibmMQSend.sendTest(message);
        return "Message sent";
    }

}

5. Consumer

This uses the @JmsListener annotation.

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.TextMessage;

/**
 * IBM MQ consumer
 */
@Component
@Slf4j
public class ReceiveMessage extends MessageListenerAdapter {

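    /**
     * destination: name of the queue to listen on; containerFactory: name of the
     * listener container factory bean defined in the configuration class
     *
     * @param message the received JMS message
     */
    @Override
    @JmsListener(destination = "test_queue", containerFactory = "jmsListenerContainerFactory")
    public void onMessage(Message message) {
        // Only text messages are handled; anything else is logged and skipped
        if (!(message instanceof TextMessage)) {
            log.warn("{} received a non-text message, ignored", "test_queue");
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            log.info("{} received message: {}", "test_queue", text);
        } catch (Exception e) {
            log.error("Failed to read message from {}", "test_queue", e);
        }
    }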

}

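Awkwardly, the destination attribute of @JmsListener is hard-coded here. Spring does resolve ${...} placeholders in this attribute, so destination = "${ibmmq.queue-test}" is worth trying first.

The integration is now complete. For more on making @JmsListener read its destination from configuration, see:

Making the destination attribute of @JmsListener configurable in SpringBoot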
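
To show what "reading the destination from configuration" amounts to, here is a plain-Java sketch of ${...} placeholder substitution against a set of properties. It is a simplified illustration of the idea, not Spring's resolver: the PlaceholderSketch class is hypothetical, and features such as default values or nested placeholders are deliberately left out.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${key} substitution; not Spring's implementation.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in text with the matching property value.
    static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("ibmmq.queue-test", "test_queue");
        System.out.println(resolve("${ibmmq.queue-test}", props));
    }
}
```

Text without placeholders passes through unchanged, so a literal queue name such as test_queue keeps working alongside configured ones.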

