springboot整合disruptor(多种消费者执行模式)
/定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程池来触发consumer的事件处理。//这么写就认定只有3个消费者 ThreadFactory executor = Executors . defaultThreadFactory();//指定事件工厂 MessageEventFactory factory =
·
一、并发框架disruptor介绍
1、概念:同一个jvm进程中线程间异步通信的框架
2、环形数组RingBuffer:disruptor的核心存储容器
2.1、环形数组中的元素采用覆盖方式,避免了jvm的GC
2.2、数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,其实和hashmap的index运算一样,不一样的是hashmap会扩容,而这个RingBuffer不扩容而去覆盖原来的数据
3、SequenceBarrier:
是起屏障作用的类,因为在往RingBuffer放的过程中,生产者和消费者的存取速度不一致会造成错误。这时用SequenceBarrier可以来限制过快的存或者取,来达到速度的一致,保证不出错。原理是每次消费者取的时候会把取到的数据的位置返给生产者,生产者通过这个位置来判断什么时候往RingBuffer中放数据
4、工作流程:
生产者往RingBuffer中放数据,disruptor把数据推给消费者
5、工作模式:
统一消费、分组消费、顺序消费、多支线顺序消费
详细介绍: https://blog.csdn.net/zhouzhenyong/article/details/81303011
二、springboot整合disruptor
1、消息体
package com.example.demo.disruptor;
import lombok.Data;
/**
* 消息体
*/
@Data
public class MessageModel {
private String message;
}
2、事件工厂
package com.example.demo.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* 事件工厂
*/
public class MessageEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
3、消费者
package com.example.demo.disruptor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
/**
* 消费者
*/
@Slf4j
public class MsgConsumer implements EventHandler<MessageModel> , WorkHandler<MessageModel> {
private String name;
public MsgConsumer(String name){
this.name = name;
}
@Override
public void onEvent(MessageModel msgEvent, long l, boolean b) throws Exception {
try {
//这里停止1000ms是为了确定消费消息是异步的
log.info("消费者"+name+"处理消息开始");
if (msgEvent != null) {
log.info("消费者"+name+"消费的信息是:{}",msgEvent.getMessage());
}
} catch (Exception e) {
log.info("消费者"+name+"处理消息失败");
}
log.info("消费者"+name+"处理消息结束");
}
@Override
public void onEvent(MessageModel messageModel) throws Exception {
try {
//这里停止1000ms是为了确定消费消息是异步的
Thread.sleep(1000);
if (messageModel != null) {
log.info("消费者"+name+"消费的信息是:{}",messageModel.getMessage());
}
} catch (Exception e) {
log.info("消费者"+name+"处理消息失败");
}
log.info("消费者"+name+"处理消息结束");
}
}
4、定义RingBuffer
package com.example.demo.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@Configuration
public class MQManager {
@Bean()
public RingBuffer<MessageModel> messageModelRingBuffer() {
//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程池来触发consumer的事件处理。
// ExecutorService executor = Executors.newFixedThreadPool(3); //这么写就认定只有3个消费者
ThreadFactory executor = Executors.defaultThreadFactory();
//指定事件工厂
MessageEventFactory factory = new MessageEventFactory();
//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
//单生产者模式,当多个生产者时可以用ProducerType.MULTI
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
//定义消费者
MsgConsumer msg1 = new MsgConsumer("1");
MsgConsumer msg2 = new MsgConsumer("2");
MsgConsumer msg3 = new MsgConsumer("3");
MsgConsumer msg4 = new MsgConsumer("4");
MsgConsumer msg5 = new MsgConsumer("5");
//定义消费者执行模式(在这里一个消费者也就是一个线程,消费者执行模式也就是线程的执行模式)
// disruptor.handleEventsWith(msg1, msg2, msg3, msg4); //统一消费:一个消息会被所有消费者消费
// disruptor.handleEventsWithWorkerPool(msg1, msg2); //分组消费:一个消息只能被一个消费者消费,多消费者轮询处理
// disruptor.handleEventsWith(msg1, msg3).then(msg2); //顺序消费:1、3先并行处理,然后2再处理
disruptor.handleEventsWith(msg1, msg3); //多支线顺序消费:消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费
disruptor.handleEventsWith(msg2, msg4);
disruptor.after(msg3, msg4).handleEventsWith(msg5);
// 启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
}
5、生产者
package com.example.demo.disruptor;
import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者
*/
@Slf4j
@Component
public class MsgProducer {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
public void send(String message) {
//获取下一个Event槽的下标
long sequence = messageModelRingBuffer.next();
try {
//给Event填充数据
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//发布Event,激活消费者去消费,将sequence传递给消费者
//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}
6、测试
package com.example.demo.disruptor;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("disruptor")
public class Test {
@Autowired
private MsgProducer msgProducer;
/**
* 测试
*/
@PostMapping("/test")
public String test() {
msgProducer.send("test");
return "ok";
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)