这里的场景为1对1,就是A发送B接收。

而【不是1对多】,A发送、B、C接收

首先在springBoot程序下pom导入相关依赖,activemq的、以及提升效率的连接池的。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.12.1</version>
</dependency>

然后在application.yml中配置成自己的activeMQ的服务地址

server:
  port: 8080 #springBoot项目访问端口

spring:
  activemq:
    broker-url: tcp://192.168.31.43:61616  #你activeMQ的ip和端口号
    user: admin   #activeMq账号
    password: admin #activeMq密码
    queue-name: active.queue #队列名
    pool:
      enabled: true       #连接池启动
      max-connections: 10 #最大连接数

在SpringBoot的启动类,类上添加注解@EnableJms

@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
//	@EnableAsync

@EnableJms
public class BaseApplication {

	public static void main(String[] args) {
		SpringApplication.run(BaseApplication.class, args);
	}

}

创建配置类ActiveMQConfig,读取yml中的内容,并且创建对象


import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;





/**
 * 配置类
 */
@Configuration

public class ActiveMQConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.queue-name}")
    private String queueName;

    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(userName, password, brokerUrl);
    }

    // 在Queue模式中,对消息的监听需要对containerFactory进行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }



}

创建队列的生产者

package ljqc.Controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Destination;
import javax.jms.Queue;


@RestController
public class ProducerController {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @PostMapping("/queue/test")
    public String sendQueue(@RequestBody String str) {
        this.sendMessage(this.queue, str);
        return "success";
    }

    // 发送消息,destination是发送到的队列,message是待发送的消息
    private void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }

}

创建队列的消费者(消费者会自动监听着,一旦生产者发送数据来了,他就会进行消费)

package ljqc.Controller;
import org.springframework.jms.annotation.JmsListener;

@RestController
public class QueueConsumerListener {
    //queue模式的消费者
    @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
    public void readActiveQueue(String message) {
        System.out.println("queue接受到:" + message);
    }
}

如果想要【多个消费者】进行消费则注解为:

@JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener",concurrency="3")

concurrency="3",代表消费者个数

测试,使用post发送请求

 

 消费完毕!

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐