一、并发框架disruptor介绍

1、概念:同一个jvm进程中线程间异步通信的框架

2、环形数组RingBuffer:disruptor的核心存储容器

2.1、环形数组中的元素采用覆盖方式,避免了jvm的GC
2.2、数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,其实和hashmap的index运算一样,不一样的是hashmap会扩容,而这个RingBuffer不扩容而去覆盖原来的数据在这里插入图片描述

3、SequenceBarrier:

是起屏障作用的类,因为在往RingBuffer放的过程中,生产者和消费者的存取速度不一致会造成错误。这时用SequenceBarrier可以来限制过快的存或者取,来达到速度的一致,保证不出错。原理是每次消费者取的时候会把取到的数据的位置返给生产者,生产者通过这个位置来判断什么时候往RingBuffer中放数据

4、工作流程:

生产者往RingBuffer中放数据,disruptor把数据推给消费者

5、工作模式:

统一消费、分组消费、顺序消费、多支线顺序消费

详细介绍: https://blog.csdn.net/zhouzhenyong/article/details/81303011

二、springboot整合disruptor

1、消息体

package com.example.demo.disruptor;

import lombok.Data;

/**
 * 消息体
 */
@Data
public class MessageModel {
    private String message;
}

2、事件工厂

package com.example.demo.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * 事件工厂
 */
public class MessageEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

3、消费者

package com.example.demo.disruptor;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * 消费者
 */
@Slf4j
public class MsgConsumer implements EventHandler<MessageModel> , WorkHandler<MessageModel> {
    private String name;
    public MsgConsumer(String name){
        this.name = name;
    }

    @Override
    public void onEvent(MessageModel msgEvent, long l, boolean b) throws Exception {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            log.info("消费者"+name+"处理消息开始");
            if (msgEvent != null) {
                log.info("消费者"+name+"消费的信息是:{}",msgEvent.getMessage());
            }
        } catch (Exception e) {
            log.info("消费者"+name+"处理消息失败");
        }
        log.info("消费者"+name+"处理消息结束");
    }


    @Override
    public void onEvent(MessageModel messageModel) throws Exception {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            if (messageModel != null) {
                log.info("消费者"+name+"消费的信息是:{}",messageModel.getMessage());
            }
        } catch (Exception e) {
            log.info("消费者"+name+"处理消息失败");
        }
        log.info("消费者"+name+"处理消息结束");
    }
}

4、定义RingBuffer

package com.example.demo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

@Configuration
public class MQManager {

    @Bean()
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程池来触发consumer的事件处理。
//        ExecutorService executor = Executors.newFixedThreadPool(3);   //这么写就认定只有3个消费者
        ThreadFactory executor = Executors.defaultThreadFactory();

        //指定事件工厂
        MessageEventFactory factory = new MessageEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单生产者模式,当多个生产者时可以用ProducerType.MULTI
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());


        //定义消费者
        MsgConsumer msg1 = new MsgConsumer("1");
        MsgConsumer msg2 = new MsgConsumer("2");
        MsgConsumer msg3 = new MsgConsumer("3");
        MsgConsumer msg4 = new MsgConsumer("4");
        MsgConsumer msg5 = new MsgConsumer("5");


        //定义消费者执行模式(在这里一个消费者也就是一个线程,消费者执行模式也就是线程的执行模式)
//        disruptor.handleEventsWith(msg1, msg2, msg3, msg4);  //统一消费:一个消息会被所有消费者消费

//        disruptor.handleEventsWithWorkerPool(msg1, msg2);  //分组消费:一个消息只能被一个消费者消费,多消费者轮询处理

//        disruptor.handleEventsWith(msg1, msg3).then(msg2);   //顺序消费:1、3先并行处理,然后2再处理

        disruptor.handleEventsWith(msg1, msg3);  //多支线顺序消费:消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费
        disruptor.handleEventsWith(msg2, msg4);
        disruptor.after(msg3, msg4).handleEventsWith(msg5);


        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

}

5、生产者

package com.example.demo.disruptor;

import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 生产者
 */
@Slf4j
@Component
public class MsgProducer {
    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;


    public void send(String message) {
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活消费者去消费,将sequence传递给消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }

}

6、测试

package com.example.demo.disruptor;

import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("disruptor")
public class Test {

    @Autowired
    private MsgProducer msgProducer;

    /**
     * 测试
     */
    @PostMapping("/test")
    public String test() {
        msgProducer.send("test");
        return "ok";
    }
}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐