spring boot + websocket+redis 实现集群消息推送
spring boot 整合websocket+redis+nginx 实现集群消息推送
·
spring boot + websocket
遇到的问题
- 当使用redis做集群部署的时候,由于spring boot 版本与redis 版本不匹配,redis设置了username,而依赖没有相关的设置,所以只能升级依赖
- websocket做集群部署时,前端做消息推送,没办法正确推送(目标用户的连接可能在其他节点上),所以使用redis 的发布订阅功能来实现
- 前端连接websocket一直连接失败,通过nginx转发之后,后端找不到相关接口,需要在nginx中配置websocket的转发
添加后端依赖
Spring boot 版本:2.3.4
spring boot redis 版本:2.4.1 (因为redis版本较高,所以需要升级redis依赖到2.4.1,否则用户名验证失败)
如需配置集群则添加以下三个依赖
jedis 版本:2.9.0
lettuce-core 版本:6.1.0
reactor-core 版本:3.3.16
spring boot websocket 版本:2.3.4
集成websocket
- 添加websocket配置
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a {@link ServerEndpointExporter} bean.
 *
 * <p>Per Spring's documentation the exporter registers {@code @ServerEndpoint}-annotated
 * beans with the standard WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
- 添加websocket的连接入口
import org.slf4j.LoggerFactory;
import java.io.IOException;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import javax.websocket.server.PathParam;
import com.dgri.gmp.hr.config.SubscribeListener;
import com.dgri.gmp.hr.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import javax.websocket.Session;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
/**
 * WebSocket endpoint that pushes messages to a user identified by the {@code userId}
 * path segment.
 *
 * <p>Connections held by this JVM are tracked in a static map. Each connection also
 * registers a Redis listener on the channel {@code websocket_topic_<userId>}, so a
 * message addressed to a user that is not connected to this node can be handed over
 * through Redis publish/subscribe.
 *
 * <p>The WebSocket runtime creates its own endpoint instance per connection, and those
 * instances are not populated by Spring. The Spring collaborators are therefore kept
 * in static fields that are filled in through {@code @Autowired} setters on the single
 * Spring-managed instance of this class.
 */
@Component
@ServerEndpoint("/websocket/pushMessage/{userId}")
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /** Prefix of the Redis channel a user's connection subscribes to. */
    private static final String TOPIC_PREFIX = "websocket_topic_";

    /** Number of distinct users currently connected to this JVM. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /** Connections held by this JVM, keyed by user id. */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap =
            new ConcurrentHashMap<>();

    /** Shared across endpoint instances; see the class comment for why these are static. */
    private static RedisMessageListenerContainer redisMessageListenerContainer;
    private static RedisUtil redisUtil;

    private Session session;
    private String userId;
    private SubscribeListener subscribeListener;

    public WebSocketServer() {
        this.userId = "";
    }

    /**
     * Receives the listener container from Spring and stores it for all endpoint instances.
     *
     * @param container the Redis listener container bean
     */
    @Autowired
    public void setRedisMessageListenerContainer(final RedisMessageListenerContainer container) {
        WebSocketServer.redisMessageListenerContainer = container;
    }

    /**
     * Receives the Redis publishing helper from Spring and stores it for all endpoint instances.
     *
     * @param util the Redis publishing helper bean
     */
    @Autowired
    public void setRedisUtil(final RedisUtil util) {
        WebSocketServer.redisUtil = util;
    }

    /**
     * Registers the new connection and subscribes it to the user's Redis channel.
     *
     * @param session the WebSocket session that was just opened
     * @param userId  the user id taken from the endpoint path
     */
    @OnOpen
    public void onOpen(final Session session, @PathParam("userId") final String userId) {
        this.session = session;
        this.userId = userId;
        // put() returns the replaced connection, so "is this a new user?" is decided atomically.
        final WebSocketServer previous = webSocketMap.put(userId, this);
        if (previous == null) {
            addOnlineCount();
        }
        log.info("用户连接:{},当前在线人数为:{}", userId, getOnlineCount());
        subscribeListener = new SubscribeListener();
        subscribeListener.setSession(session);
        redisMessageListenerContainer.addMessageListener(
                subscribeListener, new ChannelTopic(TOPIC_PREFIX + userId));
    }

    /**
     * Unregisters the connection and removes its Redis listener.
     */
    @OnClose
    public void onClose() {
        // Remove only if the map still points at this instance, so that closing a stale
        // connection does not evict a newer connection opened by the same user.
        if (webSocketMap.remove(userId, this)) {
            subOnlineCount();
        }
        // The listener is absent when onOpen failed before creating it.
        if (subscribeListener != null) {
            redisMessageListenerContainer.removeMessageListener(subscribeListener);
            subscribeListener = null;
        }
        log.info("用户退出:{},当前在线人数为:{}", userId, getOnlineCount());
    }

    /**
     * Routes a client message to the user named in its {@code toUserId} field.
     *
     * <p>The sender's id is added as {@code fromUserId} and the enriched JSON is what gets
     * delivered. If the target user is connected to this node the message is sent directly;
     * otherwise it is published to the target's Redis channel. Messages without a
     * {@code toUserId} are logged and dropped.
     *
     * @param message the raw text received from the client, expected to be a JSON object
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(final String message, final Session session) {
        log.info("用户消息:{},报文:{}", userId, message);
        if (StringUtils.isBlank(message)) {
            return;
        }
        try {
            final JSONObject jsonObject = JSON.parseObject(message);
            jsonObject.put("fromUserId", userId);
            final String toUserId = jsonObject.getString("toUserId");
            if (StringUtils.isBlank(toUserId)) {
                log.warn("消息缺少toUserId,已忽略,来自用户:{}", userId);
                return;
            }
            // Forward the enriched JSON, not the raw text, so fromUserId reaches the receiver.
            final String payload = jsonObject.toJSONString();
            final WebSocketServer target = webSocketMap.get(toUserId);
            if (target != null) {
                target.sendMessage(payload);
            } else {
                redisUtil.publish(TOPIC_PREFIX + toUserId, payload);
                log.info("请求的userId:{}不在该服务器上", toUserId);
            }
        } catch (Exception e) {
            // Boundary catch: malformed JSON or a delivery failure must not kill the endpoint.
            log.error("处理用户消息失败,userId:{}", userId, e);
        }
    }

    /**
     * Logs an error reported by the WebSocket runtime for this connection.
     *
     * @param session the affected session
     * @param error   the reported error
     */
    @OnError
    public void onError(final Session session, final Throwable error) {
        log.error("用户错误:{},原因:{}", userId, error.getMessage(), error);
    }

    /**
     * Sends a text message to this connection's client. I/O failures are logged, not thrown.
     *
     * @param message the text to send
     */
    public void sendMessage(final String message) {
        final Session target = this.session;
        try {
            // The Redis listener created in onOpen writes to the same session, so writes
            // are serialised on the session object.
            synchronized (target) {
                target.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            log.error("发送消息失败,userId:{}", userId, e);
        }
    }

    /**
     * Sends a message to a user connected to this JVM; logs an error if the user is not.
     *
     * @param message the text to send
     * @param userId  the id of the receiving user
     */
    public static void sendInfo(final String message, final String userId) {
        log.info("发送消息到:{},报文:{}", userId, message);
        final WebSocketServer target =
                StringUtils.isNotBlank(userId) ? webSocketMap.get(userId) : null;
        if (target != null) {
            target.sendMessage(message);
        } else {
            log.error("用户{},不在线!", userId);
        }
    }

    /** @return the number of users currently connected to this JVM */
    public int getOnlineCount() {
        return onlineCount.get();
    }

    /** Increments the online-user counter. */
    public void addOnlineCount() {
        onlineCount.getAndIncrement();
    }

    /** Decrements the online-user counter. */
    public void subOnlineCount() {
        onlineCount.getAndDecrement();
    }
}
集成redis集群
1.配置application.yml
# ========================== Redis configuration ==========================
spring:
  redis:
    username: redis_user # username
    password: 11111111 # password
    cluster:
      timeout: 60000
      # NOTE(review): the sample repeats one address; presumably each cluster node's host:port belongs here - confirm
      nodes: 127.0.0.1:5001,127.0.0.1:5001,127.0.0.1:5001,127.0.0.1:5001
      max-redirects: 3 # maximum number of redirects to follow when a lookup fails
    lettuce:
      pool:
        max-active: 50 # maximum number of connections in the pool
        max-idle: 20 # maximum number of idle connections in the pool
        min-idle: 5 # minimum number of idle connections in the pool
- 添加redis集群配置文件
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.support.collections.RedisProperties;
import com.dgri.gmp.hr.websocket.TestRedisMsgReceiver;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @description: redis监听配置类
*/
@Configuration
public class RedisConfig {
private final Environment environment;
public RedisConfig(Environment environment) {
this.environment = environment;
}
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public RedisConnectionFactory RedisConnectionFactory() {
Map<String, Object> source = new HashMap<String, Object>();
source.put("spring.redis.cluster.nodes", environment.getProperty("spring.redis.cluster.nodes"));
source.put("spring.redis.cluster.timeout", environment.getProperty("spring.redis.cluster.timeout"));
source.put("spring.redis.cluster.max-redirects", environment.getProperty("spring.redis.cluster.max-redirects"));
MapPropertySource mapPropertySource = new MapPropertySource("RedisClusterConfiguration", source);
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(mapPropertySource);
//获取application.yml 中的密码(密文)
String password = environment.getProperty("spring.redis.password");
String username = environment.getProperty("spring.redis.username");
//解密密码并停驾到配置中
redisClusterConfiguration.setUsername(username);
redisClusterConfiguration.setPassword(RedisPassword.of(password));
return new LettuceConnectionFactory(redisClusterConfiguration);
}
@Bean("StringRedisTemplate")
public StringRedisTemplate createStringRedisTemplate( RedisConnectionFactory factory){
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(factory);
stringRedisTemplate.setKeySerializer(new StringRedisSerializer());
stringRedisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return stringRedisTemplate;
}
@Bean
public RedisTemplate<String, Object> redisTemplate( RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//设置value hashValue值的序列化
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(
Object.class);
ObjectMapper om = new ObjectMapper();
// om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(om);
redisTemplate.setValueSerializer(serializer);
redisTemplate.setHashValueSerializer(serializer);
//key hasKey的序列化
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* description: 手动注册Redis监听到IOC
*
* @param redisConnectionFactory
* @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer( RedisConnectionFactory redisConnectionFactory,
@Qualifier("listenerAdapter") MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
//----test
container.addMessageListener(listenerAdapter, new PatternTopic("TOPIC_TEST"));
return container;
}
@Bean(name = "listenerAdapter")
MessageListenerAdapter listenerAdapter(TestRedisMsgReceiver receiver) {
System.out.println("消息适配器---test");
return new MessageListenerAdapter(receiver, "onMessage");
}
- 添加redis 监听文件
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.websocket.Session;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import com.dgri.gmp.hr.websocket.WebSocketServer;
import cn.hutool.core.util.ObjectUtil;
/**
 * Redis message listener that forwards every received message body to one
 * WebSocket session.
 */
public class SubscribeListener implements MessageListener {

    /** The WebSocket session that received messages are forwarded to. */
    private Session session;

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * Forwards the message body, decoded as UTF-8 and trimmed, to the session.
     * Nothing is sent when no session is set or the session is closed.
     *
     * @param message the Redis message
     * @param bytes   the second callback argument of {@link MessageListener}; unused here
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("=====================监听到redis消息====================" + message);
        String msg = new String(message.getBody(), StandardCharsets.UTF_8).trim();
        final Session target = session;
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            // Serialise writes on the session: other code may send on the same session
            // while this listener thread is delivering a Redis message.
            synchronized (target) {
                target.getBasicRemote().sendText(msg);
            }
        } catch (IOException e) {
            // printf needs %s placeholders; "{}" would be printed literally and drop both values.
            System.out.printf("发送消息异常,msg = %s , e = %s%n", msg, e);
        }
    }
}
- 添加redis 发布
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Small helper around {@link StringRedisTemplate} for publishing messages.
 */
@Component
public class RedisUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publishes a message through the template's {@code convertAndSend}.
     *
     * @param key   passed as the first argument of {@code convertAndSend} (the channel to publish to)
     * @param value the message to publish
     */
    public void publish(final String key, final String value) {
        final Object payload = value;
        stringRedisTemplate.convertAndSend(key, payload);
    }
}
- 如果使用nginx 进行转发的话,需要添加以下配置,才能正常连接上websocket
# Proxies WebSocket connections to the backend endpoint.
location /你的代理地址 {
    proxy_pass http://你的IP地址:端口号/websocket/pushMessage;
    # The WebSocket handshake is an HTTP/1.1 Upgrade request, so the proxied
    # connection must use HTTP/1.1.
    proxy_http_version 1.1;
    proxy_connect_timeout 4s;
    # Long read timeout so that idle WebSocket connections are not closed early.
    proxy_read_timeout 7200s;
    proxy_send_timeout 12s;
    # Upgrade and Connection are hop-by-hop headers and must be passed on explicitly.
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
前端websocket
1.websocket公共js
不想写了
更多推荐
已为社区贡献1条内容
所有评论(0)