使用Redis的zset实现延时发布任务, 并结合websocket发送刷新数据消息
使用redis实现定时发布任务, 可以指定任意执行时间, 而且相比于消息中间件的延迟消息, 优势是可以做任务的管理, 改该项目还结合websocket实现了服务器主动发送请求刷新功能
Redis实现延迟任务的具体实现思路
问题思路
1.为什么采用Redis的ZSet实现延迟任务?
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
2.为什么任务需要存储在MySQL数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
3.在添加zset数据的时候,为什么只存储未来5分钟内的任务?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
准备实体类
Dto:Task对象用于存储每个任务的数据,可以在微服务之间传输任务数据时使用
package cn.lskiot.flow.compose.vo.android;
import lombok.Data;
@Data
public class Task{
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskTopic;
/**
* 执行时间
*/
private long executeTime;
/**
* task参数
*/
private String parameters;
}
定义任务状态常量类:
package cn.lskiot.flow.model.constance;
/**
* task状态
*/
public class ScheduleConstants {
//初始化状态
public static final int SCHEDULED = 0;
//已执行状态
public static final int EXECUTED = 1;
//已取消状态
public static final int CANCELLED = 2;
//上牌
public static final int TASK_TOPIC_FLOW_UPPER = 3;
//下牌
public static final int TASK_TOPIC_FLOW_LOWER = 4;
//翻牌
public static final int TASK_TOPIC_FLOW_TURN = 5;
//结束迎宾
public static final int TASK_TOPIC_GREET_OVER = 6;
//跨排位翻牌
public static final int TASK_TOPIC_CROSS_TURN = 7;
}
定义Redis常量类
public abstract class RedisConstants {
public static String TASK_TOPIC_PREFIX = "task_topic_";
}
定义任务详情类
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
}
定义任务执行记录类
package cn.lskiot.flow.model.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
准备实体类对应的mapper和service
Taskinfo
package cn.lskiot.flow.application.mapper;
import cn.lskiot.flow.model.entity.Taskinfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}
package cn.lskiot.flow.proxy.service;
import cn.lskiot.flow.model.entity.Taskinfo;
import com.jbangit.application.base.BaseService;
import java.util.Date;
import java.util.List;
public interface TaskinfoService extends BaseService<Taskinfo> {
List<Taskinfo> selectList(Date futureDate);
}
package cn.lskiot.flow.application.service.impl;
import cn.lskiot.flow.application.mapper.TaskinfoMapper;
import cn.lskiot.flow.model.entity.Taskinfo;
import cn.lskiot.flow.proxy.service.TaskinfoService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;
import java.util.Date;
import java.util.List;
@DubboService
public class TaskinfoServiceImpl extends ServiceImplPlus<TaskinfoMapper, Taskinfo> implements TaskinfoService {
@Override
public List<Taskinfo> selectList(Date futureDate) {
return super.lambdaQuery().le(Taskinfo::getExecuteTime, futureDate).list();
}
}
TaskinfoLogs
package cn.lskiot.flow.application.mapper;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
package cn.lskiot.flow.proxy.service;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.jbangit.application.base.BaseService;
public interface TaskinfoLogsService extends BaseService<TaskinfoLogs> {
}
package cn.lskiot.flow.application.service.impl;
import cn.lskiot.flow.application.mapper.TaskinfoLogsMapper;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import cn.lskiot.flow.proxy.service.TaskinfoLogsService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;
@DubboService
public class TaskinfoLogsServiceImpl extends ServiceImplPlus<TaskinfoLogsMapper, TaskinfoLogs> implements TaskinfoLogsService {
}
操作定时发布相关方法
/**
* 添加延迟任务
*
* @param task
* @return
*/
public Long addTask(@RequestBody Task task) {
//把任务添加到DB
addTaskToDb(task);
//把任务添加Redis
addTaskToCache(task);
return task.getTaskId();
}
/**
* 更新延迟任务
*
* @param task
* @return
*/
public Long updateTask(@RequestBody Task task) {
updateTaskToDb(task);
updateTaskToCache(task);
return task.getTaskId();
}
/**
* 删除延时任务
*
* @param task
* @return
*/
private Long removeTask(Task task) {
removeTaskToDb(task);
removeTaskToCache(task);
return task.getTaskId();
}
添加任务
/**
* 把任务添加Redis
*
* @param task
*/
private void addTaskToCache(Task task) {
//获取当前时间的未来5分钟的时间
long futureTime = DateTime.now().plusMinutes(5).getMillis();
//判断任务的执行时间是否在未来5分钟以内
if (task.getExecuteTime() <= futureTime) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
}
}
/**
* 把任务添加到DB
*
* @param task
*/
private void addTaskToDb(Task task) {
try {
//添加任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfo = taskinfoService.saveAndReturnT(taskinfo);
//把新产生的任务ID赋值给Task对象
task.setTaskId(taskinfo.getTaskId());
//添加任务日志表
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsService.save(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
更新任务
/**
* 更新Redis中的任务
*
* @param task
*/
private void updateTaskToCache(Task task) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
}
/**
* 更新DB中的任务
*
* @param task
*/
private void updateTaskToDb(Task task) {
try {
//添加任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoService.updateById(taskinfo);
//添加任务日志表
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
删除任务
/**
* 从缓存删除延时任务
*
* @param task
*/
private void removeTaskToCache(Task task) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().remove(key, JsonUtil.toJson(task));
}
/**
* 从数据库删除延时任务
*
* @param task
*/
private void removeTaskToDb(Task task) {
try {
//删除任务表
taskinfoService.removeById(task.getTaskId());
//更新任务日志表状态
TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
//标记为已执行
taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException();
}
}
一分钟同步一次数据
/**
* 定时从MySQL导入任务到缓存
*/
@Scheduled(cron = " 0/60 * * * * ? ")
public void importTaskToCache() {
log.debug("从MySQL导入任务到缓存...");
//从任务表中查询未来5分钟将要执行任务
Date futureDate = DateTime.now().plusMinutes(5).toDate();
List<Taskinfo> taskinfoList = taskinfoService.selectList(futureDate);
if (CollectionUtil.isEmpty(taskinfoList)) {
return;
}
for (Taskinfo taskinfo : taskinfoList) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
//添加到redis中
addTaskToCache(task);
}
}
从Redis拉取任务进行消费
/**
* 消费延迟任务
*/
public List<Task> pollTask(Integer taskTopic) {
//在Redis中查询符合执行条件的任务
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + taskTopic;
Set<String> taskSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());
List<Task> taskList = new ArrayList<>();
if (!CollectionUtils.isEmpty(taskSet)) {
for (String taskStr : taskSet) {
Task task = JSONUtil.toBean(taskStr, Task.class);
//清理Db的消费过的任务
clearTaskDb(task);
//清理缓存的消费过的任务
clearTaskCache(key, taskStr);
taskList.add(task);
}
}
return taskList;
}
清理缓存数据
/**
* 清理缓存中消费过的任务
*
* @param taskStr
*/
private void clearTaskCache(String key, String taskStr) {
redisTemplate.opsForZSet().remove(key, taskStr);
}
清理数据库数据
/**
* 清理缓存中消费过的任务
*
* @param task
*/
private void clearTaskDb(Task task) {
try {
//删除任务表
taskinfoService.removeById(task.getTaskId());
//更新任务日志表状态
TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
//标记为已执行
taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
在业务代码中使用定时发布功能
例如在我的业务中指定员工休息, 超出时间会做矿工处理
/**
* 消费超时自动下牌任务
*/
@Scheduled(fixedRate = 1000)// 每隔1秒1次
public void publishAutoLowerTask() {
//查询Redis符合条件的任务
List<Task> taskList = pollTask(ScheduleConstants.TASK_TOPIC_FLOW_LOWER);
if (CollectionUtil.isNotEmpty(taskList) && taskList.size() != 0) {
for (Task task : taskList) {
//取出任务参数
String param = task.getParameters();
FlowRankIntervalItem flowRankIntervalItem = JSONUtil.toBean(param, FlowRankIntervalItem.class);
flowRankIntervalItem = flowRankIntervalItemService.getById(flowRankIntervalItem.getId());
Long rankIntervalId = flowRankIntervalItem.getRankIntervalId();
FlowRankInterval flowRankInterval = flowRankIntervalService.getById(rankIntervalId);
Response<Long> response = rankController.employeeLower(flowRankInterval.getRotationRecordId(), EnumIsDefault.yes.getCode());
ArrayList<Long> flowRankIds = new ArrayList<>();
flowRankIds.add(response.getData());
flowRankIds.add(flowRankService.getFlowRankByName(flowRankInterval.getStoreId(),EnumFlowRank.other.getMsg()).getId());
WebSocketMsg webSocketMsg = new WebSocketMsg()
.setSignal(EnumWebsocket.websocket_flush)
.setData(JSONUtil.toJsonStr(flowRankIds))
.setToUser(flowRankIntervalItem.getUserid());
flowProjectCommon.webSocketSend(webSocketMsg);
log.info("倒计时结束自动超时下牌");
}
}
}
这里出现了一个业务需求, 服务器要去自动发送刷新请求, 经过讨论使用的是websocket长连接
我参考了网上搭建websocket聊天室的教程, 搭建了一个websocket服务, 地址如下:
Java WebSocket实现网络聊天室(群聊+私聊)_几人憔悴几人泪的博客-CSDN博客_java websocket 聊天室
贴一下我自己编写的代码
package cn.lskiot.flow.compose.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author hqr
*/
@Slf4j
@ServerEndpoint("/webSocket/{userId}")
@Component
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* webSocketMap,用来存放每个客户端对应的MyWebSocket对象。
*/
public static final ConcurrentHashMap<String, WebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
session.setMaxIdleTimeout(60*1000);
this.session = session;
this.userId = userId;
if (WEBSOCKET_MAP.containsKey(userId)) {
WEBSOCKET_MAP.remove(userId);
WEBSOCKET_MAP.put(userId, this);
} else {
WEBSOCKET_MAP.put(userId, this);
addOnlineCount();
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + userId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
if (WEBSOCKET_MAP.containsKey(userId)) {
WEBSOCKET_MAP.remove(userId);
subOnlineCount();
}
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
} catch (Exception e) {
log.error("退出异常");
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
// log.info("用户:" + userId + ",报文:" + message);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
定义消息实体类
package cn.lskiot.flow.compose.vo.android;
import enums.EnumWebsocket;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class WebSocketMsg {
private EnumWebsocket signal;
private String toUser;
private String data;
}
发送消息公共方法
/**
* 使用webSocket发送消息
*/
public void webSocketSend(WebSocketMsg webSocketMsg) {
try {
String toUser = webSocketMsg.getToUser();
String key;
if (toUser.contains(":")) {
key = toUser.split(":")[0];
} else {
key = toUser.split("%3A")[0];
}
ConcurrentHashMap<String, WebSocketServer> websocketMap = WebSocketServer.WEBSOCKET_MAP;
websocketMap.forEach((k, webSocketServer) -> {
if (k.contains(":")) {
k = toUser.split(":")[0];
} else {
k = toUser.split("%3A")[0];
}
if (key.equals(k)) {
try {
webSocketServer.sendMessage(JSONUtils.toString(webSocketMsg));
} catch (IOException e) {
log.error("消息发送失败");
}
}
});
} catch (Exception e) {
e.printStackTrace();
throw new ServiceException("websocket异常");
}
}
利用websocket实现了扫码登录
@PostMapping("checkEmployeeId")
@ApiOperation("扫码校验")
public Response<String> checkEmployeeId(
@ApiParam("员工id") @RequestParam("employeeId") Long employeeId,
@ApiParam("水牌机用户id") @RequestParam("userId") String userId,
@ApiParam("上下牌判断") @RequestParam("flag") boolean flag) {
EnumWebsocket EnumWebsocket = flag ? enums.EnumWebsocket.websocket_upper : enums.EnumWebsocket.websocket_lower;
long temp = sessionHelper.getOrganizationEmployeeId();
if (Objects.equals(temp, employeeId)) {
HashMap<String, String> data = new HashMap<>();
data.put("employeeId", String.valueOf(employeeId));
data.put("userId", userId);
WebSocketMsg webSocketMsg = new WebSocketMsg()
.setData(JSONUtil.toJsonStr(data))
.setToUser(userId)
.setSignal(EnumWebsocket);
flowProjectCommon.webSocketSend(webSocketMsg);
return onSuccess();
} else {
return onFailure(ErrorCode.normal, "请扫描自己的二维码");
}
}
以上就是我在项目实战中遇到的一些难点以及解决方案, 希望对你有帮助, 码字不易, 请点赞支持一下哦!
更多推荐
所有评论(0)