Redis实现延迟任务的具体实现思路

问题思路

1.为什么采用Redis的ZSet实现延迟任务?

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

2.为什么任务需要存储在MySQL数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

3.在添加zset数据的时候,为什么只存储未来5分钟内的任务?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

准备实体类

Dto:Task对象用于存储每个任务的数据,可以在微服务之间传输任务数据时使用

package cn.lskiot.flow.compose.vo.android;

import lombok.Data;




@Data
public class Task{

    /**
     * 任务id
     */
    private Long taskId;
    
    /**
     * 类型
     */
    private Integer taskTopic;

    /**
     * 执行时间
     */
    private long executeTime;

    /**
     * task参数
     */
    private String parameters;
    
}

 定义任务状态常量类:

package cn.lskiot.flow.model.constance;

/**
 * task状态
 */
public class ScheduleConstants {

    //初始化状态
    public static final int SCHEDULED = 0;
    //已执行状态
    public static final int EXECUTED = 1;
    //已取消状态
    public static final int CANCELLED = 2;
    //上牌
    public static final int TASK_TOPIC_FLOW_UPPER = 3;
    //下牌
    public static final int TASK_TOPIC_FLOW_LOWER = 4;
    //翻牌
    public static final int TASK_TOPIC_FLOW_TURN = 5;
    //结束迎宾
    public static final int TASK_TOPIC_GREET_OVER = 6;
    //跨排位翻牌
    public static final int TASK_TOPIC_CROSS_TURN = 7;

}

定义Redis常量类



public abstract class RedisConstants {
    public static String TASK_TOPIC_PREFIX = "task_topic_";
}

定义任务详情类



import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private String parameters;

    /**
     * 任务主题
     */
    @TableField("task_topic")
    private Integer taskTopic;

}

定义任务执行记录类

package cn.lskiot.flow.model.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private String parameters;

    /**
     * 任务主题
     */
    @TableField("task_topic")
    private Integer taskTopic;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;

}

准备实体类对应的mapper和service

Taskinfo
package cn.lskiot.flow.application.mapper;

import cn.lskiot.flow.model.entity.Taskinfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}
package cn.lskiot.flow.proxy.service;

import cn.lskiot.flow.model.entity.Taskinfo;
import com.jbangit.application.base.BaseService;

import java.util.Date;
import java.util.List;

public interface TaskinfoService extends BaseService<Taskinfo> {
    List<Taskinfo> selectList(Date futureDate);
}
package cn.lskiot.flow.application.service.impl;

import cn.lskiot.flow.application.mapper.TaskinfoMapper;
import cn.lskiot.flow.model.entity.Taskinfo;
import cn.lskiot.flow.proxy.service.TaskinfoService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;

import java.util.Date;
import java.util.List;

@DubboService
public class TaskinfoServiceImpl extends ServiceImplPlus<TaskinfoMapper, Taskinfo> implements TaskinfoService {
    @Override
    public List<Taskinfo> selectList(Date futureDate) {
        return super.lambdaQuery().le(Taskinfo::getExecuteTime, futureDate).list();
    }
}
TaskinfoLogs
package cn.lskiot.flow.application.mapper;

import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
package cn.lskiot.flow.proxy.service;

import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.jbangit.application.base.BaseService;

public interface TaskinfoLogsService extends BaseService<TaskinfoLogs> {
}
package cn.lskiot.flow.application.service.impl;

import cn.lskiot.flow.application.mapper.TaskinfoLogsMapper;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import cn.lskiot.flow.proxy.service.TaskinfoLogsService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;

@DubboService
public class TaskinfoLogsServiceImpl extends ServiceImplPlus<TaskinfoLogsMapper, TaskinfoLogs> implements TaskinfoLogsService {
}

操作定时发布相关方法

/**
     * 添加延迟任务
     *
     * @param task
     * @return
     */
    public Long addTask(@RequestBody Task task) {
        //把任务添加到DB
        addTaskToDb(task);
        //把任务添加Redis
        addTaskToCache(task);
        return task.getTaskId();
    }


    /**
     * 更新延迟任务
     *
     * @param task
     * @return
     */
    public Long updateTask(@RequestBody Task task) {
        updateTaskToDb(task);
        updateTaskToCache(task);
        return task.getTaskId();
    }

    /**
     * 删除延时任务
     *
     * @param task
     * @return
     */
    private Long removeTask(Task task) {
        removeTaskToDb(task);
        removeTaskToCache(task);
        return task.getTaskId();
    }

添加任务

    /**
     * 把任务添加Redis
     *
     * @param task
     */
    private void addTaskToCache(Task task) {
        //获取当前时间的未来5分钟的时间
        long futureTime = DateTime.now().plusMinutes(5).getMillis();
        //判断任务的执行时间是否在未来5分钟以内
        if (task.getExecuteTime() <= futureTime) {
            String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
            redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
        }
    }


    /**
     * 把任务添加到DB
     *
     * @param task
     */
    private void addTaskToDb(Task task) {
        try {
            //添加任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfo = taskinfoService.saveAndReturnT(taskinfo);
            //把新产生的任务ID赋值给Task对象
            task.setTaskId(taskinfo.getTaskId());
            //添加任务日志表
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsService.save(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

更新任务

    /**
     * 更新Redis中的任务
     *
     * @param task
     */
    private void updateTaskToCache(Task task) {
        String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
        redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
    }


    /**
     * 更新DB中的任务
     *
     * @param task
     */
    private void updateTaskToDb(Task task) {
        try {
            //添加任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoService.updateById(taskinfo);
            //添加任务日志表
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogsService.updateById(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

删除任务

        /**
     * 从缓存删除延时任务
     *
     * @param task
     */
    private void removeTaskToCache(Task task) {
        String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
        redisTemplate.opsForZSet().remove(key, JsonUtil.toJson(task));
    }




/**
     * 从数据库删除延时任务
     *
     * @param task
     */
    private void removeTaskToDb(Task task) {
        try {
            //删除任务表
            taskinfoService.removeById(task.getTaskId());
            //更新任务日志表状态
            TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
            //标记为已执行
            taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
            taskinfoLogsService.updateById(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }

一分钟同步一次数据

    /**
     * 定时从MySQL导入任务到缓存
     */
    @Scheduled(cron = " 0/60 * * * * ? ")
    public void importTaskToCache() {
        log.debug("从MySQL导入任务到缓存...");
        //从任务表中查询未来5分钟将要执行任务
        Date futureDate = DateTime.now().plusMinutes(5).toDate();
        List<Taskinfo> taskinfoList = taskinfoService.selectList(futureDate);
        if (CollectionUtil.isEmpty(taskinfoList)) {
            return;
        }
        for (Taskinfo taskinfo : taskinfoList) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo, task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            //添加到redis中
            addTaskToCache(task);
        }
    }

从Redis拉取任务进行消费

    /**
     * 消费延迟任务
     */
    public List<Task> pollTask(Integer taskTopic) {
        //在Redis中查询符合执行条件的任务
        String key = FlowRedisConstance.TASK_TOPIC_PREFIX + taskTopic;
        Set<String> taskSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());
        List<Task> taskList = new ArrayList<>();
        if (!CollectionUtils.isEmpty(taskSet)) {
            for (String taskStr : taskSet) {
                Task task = JSONUtil.toBean(taskStr, Task.class);
                //清理Db的消费过的任务
                clearTaskDb(task);
                //清理缓存的消费过的任务
                clearTaskCache(key, taskStr);
                taskList.add(task);
            }
        }
        return taskList;
    }

清理缓存数据

    /**
     * 清理缓存中消费过的任务
     *
     * @param taskStr
     */
    private void clearTaskCache(String key, String taskStr) {
        redisTemplate.opsForZSet().remove(key, taskStr);
    }

清理数据库数据

    /**
     * 清理缓存中消费过的任务
     *
     * @param task
     */
    private void clearTaskDb(Task task) {
        try {
            //删除任务表
            taskinfoService.removeById(task.getTaskId());
            //更新任务日志表状态
            TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
            //标记为已执行
            taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);
            taskinfoLogsService.updateById(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

在业务代码中使用定时发布功能

例如在我的业务中指定员工休息, 超出时间会做矿工处理

 /**
     * 消费超时自动下牌任务
     */
    @Scheduled(fixedRate = 1000)// 每隔1秒1次
    public void publishAutoLowerTask() {
        //查询Redis符合条件的任务
        List<Task> taskList = pollTask(ScheduleConstants.TASK_TOPIC_FLOW_LOWER);

        if (CollectionUtil.isNotEmpty(taskList) && taskList.size() != 0) {
            for (Task task : taskList) {
                //取出任务参数
                String param = task.getParameters();
                FlowRankIntervalItem flowRankIntervalItem = JSONUtil.toBean(param, FlowRankIntervalItem.class);
                flowRankIntervalItem = flowRankIntervalItemService.getById(flowRankIntervalItem.getId());
                Long rankIntervalId = flowRankIntervalItem.getRankIntervalId();
                FlowRankInterval flowRankInterval = flowRankIntervalService.getById(rankIntervalId);
                Response<Long> response = rankController.employeeLower(flowRankInterval.getRotationRecordId(), EnumIsDefault.yes.getCode());
                ArrayList<Long> flowRankIds = new ArrayList<>();
                flowRankIds.add(response.getData());
                flowRankIds.add(flowRankService.getFlowRankByName(flowRankInterval.getStoreId(),EnumFlowRank.other.getMsg()).getId());
                WebSocketMsg webSocketMsg = new WebSocketMsg()
                        .setSignal(EnumWebsocket.websocket_flush)
                        .setData(JSONUtil.toJsonStr(flowRankIds))
                        .setToUser(flowRankIntervalItem.getUserid());
                flowProjectCommon.webSocketSend(webSocketMsg);
                log.info("倒计时结束自动超时下牌");
            }
        }
    }

这里出现了一个业务需求, 服务器要去自动发送刷新请求, 经过讨论使用的是websocket长连接

我参考了网上搭建websocket聊天室的教程, 搭建了一个websocket服务, 地址如下:

Java WebSocket实现网络聊天室(群聊+私聊)_几人憔悴几人泪的博客-CSDN博客_java websocket 聊天室

贴一下我自己编写的代码

package cn.lskiot.flow.compose.webSocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author hqr
 */
@Slf4j
@ServerEndpoint("/webSocket/{userId}")
@Component
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * webSocketMap,用来存放每个客户端对应的MyWebSocket对象。
     */
    public static final ConcurrentHashMap<String, WebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        session.setMaxIdleTimeout(60*1000);
        this.session = session;
        this.userId = userId;

        if (WEBSOCKET_MAP.containsKey(userId)) {
            WEBSOCKET_MAP.remove(userId);
            WEBSOCKET_MAP.put(userId, this);
        } else {
            WEBSOCKET_MAP.put(userId, this);
            addOnlineCount();
        }

        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());

        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            log.error("用户:" + userId + ",网络异常!!!!!!");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            if (WEBSOCKET_MAP.containsKey(userId)) {
                WEBSOCKET_MAP.remove(userId);
                subOnlineCount();
            }
            log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
        } catch (Exception e) {
            log.error("退出异常");
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
//        log.info("用户:" + userId + ",报文:" + message);
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

定义消息实体类

package cn.lskiot.flow.compose.vo.android;

import enums.EnumWebsocket;
import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class WebSocketMsg {
    private EnumWebsocket signal;
    private String toUser;
    private String data;
}

发送消息公共方法

 /**
     * 使用webSocket发送消息
     */
    public void webSocketSend(WebSocketMsg webSocketMsg) {
        try {
            String toUser = webSocketMsg.getToUser();
            String key;
            if (toUser.contains(":")) {
                key = toUser.split(":")[0];
            } else {
                key = toUser.split("%3A")[0];
            }
            ConcurrentHashMap<String, WebSocketServer> websocketMap = WebSocketServer.WEBSOCKET_MAP;
            websocketMap.forEach((k, webSocketServer) -> {
                if (k.contains(":")) {
                    k = toUser.split(":")[0];
                } else {
                    k = toUser.split("%3A")[0];
                }
                if (key.equals(k)) {
                    try {
                        webSocketServer.sendMessage(JSONUtils.toString(webSocketMsg));
                    } catch (IOException e) {
                        log.error("消息发送失败");
                    }
                }

            });
        } catch (Exception e) {
            e.printStackTrace();
            throw new ServiceException("websocket异常");
        }
    }

利用websocket实现了扫码登录

 @PostMapping("checkEmployeeId")
    @ApiOperation("扫码校验")
    public Response<String> checkEmployeeId(
            @ApiParam("员工id") @RequestParam("employeeId") Long employeeId,
            @ApiParam("水牌机用户id") @RequestParam("userId") String userId,
            @ApiParam("上下牌判断") @RequestParam("flag") boolean flag) {
        EnumWebsocket EnumWebsocket = flag ? enums.EnumWebsocket.websocket_upper : enums.EnumWebsocket.websocket_lower;
        long temp = sessionHelper.getOrganizationEmployeeId();
        if (Objects.equals(temp, employeeId)) {
        HashMap<String, String> data = new HashMap<>();
        data.put("employeeId", String.valueOf(employeeId));
        data.put("userId", userId);
        WebSocketMsg webSocketMsg = new WebSocketMsg()
                .setData(JSONUtil.toJsonStr(data))
                .setToUser(userId)
                .setSignal(EnumWebsocket);
        flowProjectCommon.webSocketSend(webSocketMsg);
        return onSuccess();
        } else {
            return onFailure(ErrorCode.normal, "请扫描自己的二维码");
        }
    }

以上就是我在项目实战中遇到的一些难点以及解决方案, 希望对你有帮助, 码字不易, 请点赞支持一下哦!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐