说明

前面的消息互发和广播功能在单机上即可完成测试, 但实际场景中客户端连接数量很大, 需要多个服务端共同支撑, 因此这里准备多台虚拟机进行集群测试。

1. 虚拟机准备

1.1 准备1个1核1G的虚拟机(IP末位为160, 即后文配置中的192.168.109.160), 配置java环境, 并安装redis和minio

1.2 准备6个1核1G的空虚拟机(IP末位为161到166), 只需要配置java环境即可

2. 服务端改造

2.1 修改 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hahashou.netty</groupId>
    <artifactId>server</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>server</name>
    <description>Netty Server Project For Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.100.Final</version>
        </dependency>
        <dependency>
            <!-- The handlers parse JSON received from the network with fastjson; 1.2.58 has
                 published deserialization vulnerabilities, so use the patched 1.2.83 release. -->
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-crypto</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.2 修改 application.yml (每个服务端的id是不一样的)

# HTTP port of the embedded web server
server:
  port: 32000

# Redis connection used by every server node
spring:
  redis:
    host: 192.168.109.160
    port: 6379
    password: root

logging:
  level:
    com.hahashou.netty: info

netty:
  server:
    # Unique identifier of this node (must match the corresponding entry in the hosts file)
    id : netty-server-1
    # Port that clients connect to
    port: 35000

2.3 config包下增加 NettyStatic类

package com.hahashou.netty.server.config;

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide registries shared by the Netty server-side and client-side handlers.
 * <p>
 * The maps themselves are never replaced, only mutated, so the references are {@code final};
 * this prevents one component from accidentally swapping a registry out from under another.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
public class NettyStatic {

    /**
     * key: user code (or, for server-to-server links, the peer's server id / host name);
     * value: channelId
     */
    public static final Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);

    /** key: channelId; value: Channel */
    public static final Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);

    /** key: "host@port" of the peer server; value: the handler of the outgoing connection to it */
    public static final Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);

    /** key: handler of an outgoing connection; value: the NettyClient that created that connection */
    public static final Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}

2.4 config包下增加 RedisConfig类

package com.hahashou.netty.server.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis configuration: the shared {@link RedisTemplate}, a value-operations shortcut bean and
 * the Redis key names used by the Netty server nodes.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Configuration
public class RedisConfig {

    /** Key of the lock taken while a node starts up. */
    public static final String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";

    /** Key holding the JSON list of registered nodes ("serverId@port"). */
    public static final String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";

    /** Key prefix for offline messages; the receiver's user code is appended. */
    public static final String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";

    /**
     * Builds the template used for all Redis access.
     *
     * @param redisConnectionFactory connection factory provided by Spring Boot
     * @return a template with String keys and JSON-serialized values
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // Keys and hash keys are stored as plain strings.
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        // Values and hash values are stored as JSON via GenericJackson2JsonRedisSerializer.
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(keySerializer);
        redisTemplate.setHashKeySerializer(keySerializer);
        redisTemplate.setValueSerializer(valueSerializer);
        redisTemplate.setHashValueSerializer(valueSerializer);
        // Transaction support (redisTemplate.setEnableTransactionSupport(true)) is intentionally
        // left off: it is only needed to execute a group of commands together.
        return redisTemplate;
    }

    /**
     * Exposes the template's value operations as a bean for direct injection.
     *
     * @param redisTemplate the template defined above
     * @return {@code redisTemplate.opsForValue()}
     */
    @Bean
    public ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }
}

2.5 修改 EventLoopGroupConfig类

package com.hahashou.netty.server.config;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Declares the Netty thread groups used by the server side and by the outgoing
 * (server-to-server) client side.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Configuration
public class EventLoopGroupConfig {

    /** Threads of the accepting (boss) group. */
    private static final int BOSS_THREADS = 1;

    /** Threads of each I/O worker group. */
    private static final int WORKER_THREADS = 4;

    /** Threads of each business executor group. */
    private static final int BUSINESS_THREADS = 1;

    /** Maximum number of queued tasks per business executor before new ones are rejected. */
    private static final int MAX_PENDING_TASKS = 100000;

    /* ------------------------------ server side ------------------------------ */

    @Bean("bossGroup")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(BOSS_THREADS);
    }

    @Bean("workerGroup")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(WORKER_THREADS);
    }

    @Bean("businessGroup")
    public EventExecutorGroup businessGroup() {
        ThreadFactory factory = new BusinessThreadFactory();
        return new DefaultEventExecutorGroup(BUSINESS_THREADS, factory,
                MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
    }

    /* ------------------------------ client side ------------------------------ */

    @Bean("clientWorkerGroup")
    public NioEventLoopGroup clientWorkerGroup() {
        return new NioEventLoopGroup(WORKER_THREADS);
    }

    @Bean("clientBusinessGroup")
    public EventExecutorGroup clientBusinessGroup() {
        ThreadFactory factory = new BusinessThreadFactory();
        return new DefaultEventExecutorGroup(BUSINESS_THREADS, factory,
                MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
    }

    /**
     * Creates non-daemon, normal-priority threads named {@code netty-server-<n>}.
     */
    static class BusinessThreadFactory implements ThreadFactory {

        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        BusinessThreadFactory() {
            // Use the security manager's thread group when one is installed,
            // otherwise the group of the creating thread.
            SecurityManager manager = System.getSecurityManager();
            if (manager == null) {
                group = Thread.currentThread().getThreadGroup();
            } else {
                group = manager.getThreadGroup();
            }
            namePrefix = "netty-server-";
        }

        @Override
        public Thread newThread(Runnable runnable) {
            String name = namePrefix + threadNumber.getAndIncrement();
            Thread worker = new Thread(group, runnable, name, 0);
            // Normalize whatever the creating thread would otherwise pass on.
            if (worker.isDaemon()) {
                worker.setDaemon(false);
            }
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }
}

2.6 config包下增加 SpringBean类

package com.hahashou.netty.server.config;

import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

/**
 * Miscellaneous Spring beans shared across the application.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Configuration
public class SpringBean {

    /**
     * @return the BCrypt-based encoder used to verify secrets
     */
    @Bean
    public PasswordEncoder passwordEncoder() {
        PasswordEncoder encoder = new BCryptPasswordEncoder();
        return encoder;
    }

    /**
     * Single shared timer. The original author notes that at most 64 instances should be
     * created (HashedWheelTimer's {@code INSTANCE_COUNT_LIMIT = 64}), hence one bean for
     * the whole application.
     *
     * @return a timer with the default settings (per the original note: 100 ms tick, 512 slots)
     */
    @Bean
    public HashedWheelTimer hashedWheelTimer() {
        HashedWheelTimer timer = new HashedWheelTimer();
        return timer;
    }
}

2.7 server包下增加 ApplicationInitial类

package com.hahashou.netty.server;

import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * Application start-up hook: once Spring Boot has started, schedules the Netty server
 * task on the shared timer.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Slf4j
@Component
public class ApplicationInitial implements ApplicationRunner {

    @Resource
    private NettyServer nettyServer;

    @Resource
    private HashedWheelTimer hashedWheelTimer;

    /**
     * Submits {@link NettyServer} (a timer task) to run one second from now.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        final long startDelay = 1;
        hashedWheelTimer.newTimeout(nettyServer, startDelay, TimeUnit.SECONDS);
    }
}

2.8 修改 Message类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Message exchanged between clients and servers, serialized as JSON on the wire.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Data
public class Message {

    /** Characters {@link #randomString(int)} draws from: 0-9, A-Z, a-z, plus 'α' and 'ω' (64 in total). */
    private static final List<String> RANDOM_ALPHABET = buildRandomAlphabet();

    /** Secret key required for broadcasts */
    private String secretKey;

    /** User code of the sender */
    private String userCode;

    /** Id of the server the message is relayed through */
    private String serverId;

    /** User code of the receiver */
    private String friendUserCode;

    /** Only used while establishing a connection */
    private String channelId;

    /** Message type */
    private Integer type;

    /** Text content, or the full path name of a file */
    private String text;

    /**
     * Supported message types with their storage bucket and accepted file extensions.
     * All fields are immutable; the extension lists are read-only views.
     */
    public enum TypeEnum {

        TEXT(0, "文字", "", Collections.<String>emptyList()),
        IMAGE(1, "图片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),
        VOICE(2, "语音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),
        VIDEO(3, "视频", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),

        ;

        @Getter
        private final Integer key;

        @Getter
        private final String describe;

        @Getter
        private final String bucketName;

        @Getter
        private final List<String> formatList;

        TypeEnum(int key, String describe, String bucketName, List<String> formatList) {
            this.key = key;
            this.describe = describe;
            this.bucketName = bucketName;
            // Enum constants are global; do not let callers edit the accepted extensions.
            this.formatList = Collections.unmodifiableList(formatList);
        }

        /**
         * Finds the type that accepts the given file extension.
         *
         * @param format file extension without the dot, e.g. {@code "png"} (matched exactly)
         * @return the first matching type, or {@code null} when no type lists the extension
         */
        public static TypeEnum select(String format) {
            for (TypeEnum typeEnum : TypeEnum.values()) {
                if (typeEnum.getFormatList().contains(format)) {
                    return typeEnum;
                }
            }
            return null;
        }
    }

    /**
     * Serializes a message to JSON and wraps it in a UTF-8 encoded buffer.
     *
     * @param message the message to encode
     * @return a new unpooled buffer holding the JSON text
     */
    public static ByteBuf transfer(Message message) {
        return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);
    }

    /**
     * Generates a random string by shuffling the 64-character alphabet and taking a prefix,
     * so no character occurs twice. Uses {@link Collections#shuffle(List)}, i.e. a
     * non-cryptographic random source.
     *
     * @param length requested length; values above 64 are capped at 64
     * @return a random string of {@code min(length, 64)} characters
     * @throws IllegalArgumentException if {@code length} is negative
     */
    public static String randomString (int length) {
        if (length < 0) {
            throw new IllegalArgumentException("length must not be negative: " + length);
        }
        List<String> shuffled = new ArrayList<>(RANDOM_ALPHABET);
        Collections.shuffle(shuffled);
        int size = Math.min(length, shuffled.size());
        return String.join("", shuffled.subList(0, size));
    }

    /** Builds the fixed alphabet once instead of on every call. */
    private static List<String> buildRandomAlphabet() {
        List<String> alphabet = new ArrayList<>(64);
        for (char c = '0'; c <= '9'; c++) {
            alphabet.add(String.valueOf(c));
        }
        for (char c = 'A'; c <= 'Z'; c++) {
            alphabet.add(String.valueOf(c));
        }
        for (char c = 'a'; c <= 'z'; c++) {
            alphabet.add(String.valueOf(c));
        }
        alphabet.add("α");
        alphabet.add("ω");
        return Collections.unmodifiableList(alphabet);
    }
}

2.9 config包下增加 NettyClientHandler类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * Handler for an outgoing connection from this server to another server node.
 * <p>
 * The peer assigns a channel id on connect; this handler registers the connection under
 * that id, echoes the handshake back with this node's id as the user code, and afterwards
 * delivers relayed messages to locally connected users.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Identity announced to the peer during the handshake. */
    @Getter
    @Setter
    private String userCode;

    /** Host name of the peer server. */
    @Getter
    @Setter
    private String hostName;

    /** Port of the peer server. */
    @Getter
    @Setter
    private int port;

    /**
     * Redis access for offline messages. In the visible code this handler is created with
     * {@code new} rather than by Spring, so {@code @Resource} is not processed and the field
     * stays {@code null} unless a caller supplies it through the setter.
     */
    @Setter
    @Resource
    private ValueOperations<String, Object> redisOperation;

    /** Channel id assigned by the peer, remembered so that disconnect can undo the registration. */
    private String registeredChannelId;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("{}, 作为客户端, 与其他服务端连接", LocalDateTime.now());
    }

    /**
     * Removes every registry entry that belongs to this connection.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        NettyStatic.CHANNEL.remove(channel.id().asLongText());
        // The channel was registered under the id assigned by the peer, which differs from the
        // local channel id, so it has to be removed under that key as well.
        String channelId = registeredChannelId;
        if (channelId != null) {
            NettyStatic.CHANNEL.remove(channelId, channel);
            if (hostName != null) {
                NettyStatic.USER_CHANNEL.remove(hostName, channelId);
            }
            registeredChannelId = null;
        }
        NettyClientHandler registered = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);
        // ConcurrentHashMap rejects null keys, so only clean up when a handler was registered.
        if (registered != null) {
            NettyStatic.NETTY_CLIENT.remove(registered);
        }
    }

    /**
     * Handles the peer's handshake (message carrying a channel id) and relayed text messages
     * (message carrying a server id).
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg == null) {
            return;
        }
        Message message = JSON.parseObject(msg.toString(), Message.class);
        if (message == null) {
            return;
        }
        String channelId = message.getChannelId();
        if (StringUtils.hasText(channelId)) {
            register(ctx.channel(), channelId, message);
            return;
        }
        if (StringUtils.hasText(message.getText()) && StringUtils.hasText(message.getServerId())) {
            deliver(message);
        }
    }

    /** Registers the connection under the peer-assigned id and echoes the handshake back. */
    private void register(Channel channel, String channelId, Message message) {
        message.setUserCode(userCode);
        NettyStatic.USER_CHANNEL.put(hostName, channelId);
        NettyStatic.CHANNEL.put(channelId, channel);
        registeredChannelId = channelId;
        channel.writeAndFlush(Message.transfer(message));
    }

    /** Delivers a relayed message to the local receiver, or stores it when the receiver is offline. */
    private void deliver(Message message) {
        String friendUserCode = message.getFriendUserCode();
        if (!StringUtils.hasText(friendUserCode)) {
            log.warn("Dropping relayed message without a receiver, sender: {}", message.getUserCode());
            return;
        }
        String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
        Channel channel = StringUtils.hasText(queryChannelId) ? NettyStatic.CHANNEL.get(queryChannelId) : null;
        if (channel == null) {
            offlineMessage(friendUserCode, message);
            return;
        }
        // The server id is no longer needed once the message reaches the receiver's node.
        message.setServerId(null);
        channel.writeAndFlush(Message.transfer(message));
    }

    /**
     * Appends a message to the receiver's offline list in Redis.
     * <p>
     * NOTE(review): the read-modify-write below is not atomic across nodes.
     *
     * @param friendUserCode user code of the receiver
     * @param message message to store
     */
    public void offlineMessage(String friendUserCode, Message message) {
        if (redisOperation == null) {
            log.error("Redis is not available to this handler; offline message for {} is dropped", friendUserCode);
            return;
        }
        List<Message> messageList = new ArrayList<>();
        Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
        if (offlineMessage != null) {
            List<Message> stored = JSON.parseArray(offlineMessage.toString(), Message.class);
            if (stored != null) {
                messageList = stored;
            }
        }
        messageList.add(message);
        redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
    }

    /**
     * Logs the failure instead of discarding it silently; the channel is left open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Exception on connection to {}@{}", hostName, port, cause);
    }
}

2.10 config包下增加 NettyClient类

package com.hahashou.netty.server.config;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;

/**
 * @description: Netty-客户端TCP服务
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Slf4j
public class NettyClient {

	@Getter
	@Setter
    private NioEventLoopGroup clientWorkerGroup;

	@Getter
	@Setter
	private EventExecutorGroup clientBusinessGroup;

    public void createClient(NettyClientHandler nettyClientHandler) {
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(clientWorkerGroup)
				.channel(NioSocketChannel.class)
				.handler(new ChannelInitializer<NioSocketChannel>() {
					@Override
					protected void initChannel(NioSocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
						pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
						pipeline.addLast(clientBusinessGroup, nettyClientHandler);
					}});
		try {
			InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());
			SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());
			bootstrap.connect(socketAddress).sync().channel();
		} catch (UnknownHostException exception) {
			log.error("请检查hosts文件是否配置正确 : {}", exception.getMessage());
		} catch (InterruptedException exception) {
			log.error("客户端中断异常 : {}", exception.getMessage());
		}
	}

    @PreDestroy
    public void destroy() {
		clientWorkerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("客户端关闭成功");
    }
}

2.11 修改 NettyServer类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @description: Netty-服务端TCP服务
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Component
@Slf4j
public class NettyServer implements TimerTask {

	@Value("${netty.server.id}")
	private String serverId;

	@Value("${netty.server.port}")
	private int port;

    @Resource
    private NioEventLoopGroup bossGroup;

    @Resource
    private NioEventLoopGroup workerGroup;

    @Resource
    private EventExecutorGroup businessGroup;

	@Resource
	private NettyServerHandler nettyServerHandler;

	@Resource
	private NioEventLoopGroup clientWorkerGroup;

	@Resource
	private EventExecutorGroup clientBusinessGroup;

	@Resource
	private RedisTemplate<String, Object> redisTemplate;
	@Resource
	private ValueOperations<String, Object> redisOperation;
	@Resource
	private HashedWheelTimer hashedWheelTimer;

	@Override
	public void run(Timeout timeout) {
		Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);
		if (nettyServerLock != null) {
			hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);
			return;
		}
		try {
			redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);
			//String hostAddress = InetAddress.getLocalHost().getHostAddress();
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
							pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
							pipeline.addLast(businessGroup, nettyServerHandler);
						}
					})
					// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
					.option(ChannelOption.SO_BACKLOG, 1024)
					.childOption(ChannelOption.TCP_NODELAY, true)
					.childOption(ChannelOption.SO_KEEPALIVE, true)
					// 此处有个大坑, 详见文章脱坑指南
					.bind(port)
					.sync();
			if (channelFuture.isSuccess()) {
				log.info("{} 启动成功", serverId);
				redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
			}
			thisNodeHandle(port);
			channelFuture.channel().closeFuture().sync();
		} catch (InterruptedException exception) {
			log.error("{} 启动失败: {}", serverId, exception.getMessage());
		} finally {
			redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
		}
	}

	private void thisNodeHandle(int port) {
		Set<String> nodeList = new HashSet<>();
		Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
		if (nettyServerList != null) {
			nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));
			for (String hostAndPort : nodeList) {
				String[] split = hostAndPort.split("@");
				String connectHost = split[0];
				int connectPort = Integer.parseInt(split[1]);
				NettyClient nettyClient = new NettyClient();
				nettyClient.setClientWorkerGroup(clientWorkerGroup);
				nettyClient.setClientBusinessGroup(clientBusinessGroup);
				NettyClientHandler nettyClientHandler = new NettyClientHandler();
				nettyClientHandler.setUserCode(serverId);
				nettyClientHandler.setHostName(connectHost);
				nettyClientHandler.setPort(connectPort);
				nettyClient.createClient(nettyClientHandler);
				NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);
				NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);
			}
		}
		nodeList.add(serverId + "@" + port);
		redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));
	}

	public void stop() {
		bossGroup.shutdownGracefully().syncUninterruptibly();
		workerGroup.shutdownGracefully().syncUninterruptibly();
		log.info("TCP服务关闭成功");
		Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
		List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);
		hostList.remove(serverId + "@" + port);
		if (CollectionUtils.isEmpty(hostList)) {
			redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);
		} else {
			redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));
		}
	}

    @PreDestroy
    public void destroy() {
    	stop();
    }
}

2.12 修改 NettyServerHandler类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @description:
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Value("${netty.server.id}")
    private String serverId;

    public static String SERVER_PREFIX = "netty-server-";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String channelId = channel.id().asLongText();
        log.info("有客户端连接, channelId : {}", channelId);
        NettyStatic.CHANNEL.put(channelId, channel);
        Message message = new Message();
        message.setChannelId(channelId);
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        log.info("有客户端断开连接, channelId : {}", channelId);
        NettyStatic.CHANNEL.remove(channelId);
        for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
            if (entry.getValue().equals(channelId)) {
                redisTemplate.delete(entry.getKey());
                break;
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg != null) {
            Message message = JSON.parseObject(msg.toString(), Message.class);
            String userCode = message.getUserCode(),
                    channelId = message.getChannelId(),
                    friendUserCode = message.getFriendUserCode();
            if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
                connect(userCode, channelId);
            } else if (StringUtils.hasText(message.getText())) {
                Object code = redisOperation.get(friendUserCode);
                if (code != null) {
                    String queryServerId = code.toString();
                    message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
                    if (StringUtils.hasText(friendUserCode)) {
                        sendOtherClient(message);
                    } else {
                        sendAdmin(ctx.channel(), message);
                    }
                } else {
                    offlineMessage(friendUserCode, message);
                }
            }
        }
    }

    /**
     * 建立连接
     * @param userCode
     * @param channelId
     */
    private void connect(String userCode, String channelId) {
        log.info("{} 连接", userCode);
        NettyStatic.USER_CHANNEL.put(userCode, channelId);
        if (!userCode.startsWith(SERVER_PREFIX)) {
            redisOperation.set(userCode, serverId);
        }
    }

    /**
     * 发送给其他客户端
     * @param message
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode(),
                serverId = message.getServerId();
        String queryChannelId;
        if (StringUtils.hasText(serverId)) {
            log.info("向" + serverId + " 进行转发");
            queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);
        } else {
            queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
        }
        if (StringUtils.hasText(queryChannelId)) {
            Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
            if (channel == null) {
                offlineMessage(friendUserCode, message);
                return;
            }
            channel.writeAndFlush(Message.transfer(message));
        } else {
            offlineMessage(friendUserCode, message);
        }
    }

    /**
     * 离线消息存储Redis
     * @param friendUserCode
     * @param message
     */
    public void offlineMessage(String friendUserCode, Message message) {
        // 1条message在redis中大概是100B, 1万条算1M, redis.conf的maxmemory设置的是256M
        List<Message> messageList = new ArrayList<>();
        Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
        if (offlineMessage != null) {
            messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
        }
        messageList.add(message);
        redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
    }

    /**
     * 发送给服务端
     * @param channel
     * @param message
     */
    private void sendAdmin(Channel channel, Message message) {
        message.setUserCode("ADMIN");
        message.setText(LocalDateTime.now().toString());
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());
    }
}

2.13 新建service包, 并新增 ServerService接口

package com.hahashou.netty.server.service;

import com.hahashou.netty.server.config.Message;

/**
 * Service-layer operations of the Netty server: sending a message and stopping the server.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
public interface ServerService {

    /**
     * Sends a message.
     * @param dto the message to send
     */
    void send(Message dto);

    /**
     * Stops the service (groundwork for the reconnect-after-disconnect feature added later).
     */
    void stop();
}

2.14 service包下新建impl包, 并新增 ServerServiceImpl类

package com.hahashou.netty.server.service.impl;

import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Default {@link ServerService} implementation.
 *
 * <p>A message with a target user is delivered directly, forwarded toward the server recorded
 * for that user in Redis, or stored in Redis as an offline message. A message without a
 * target user is treated as a broadcast and requires a valid secret key.
 *
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {

    /** BCrypt hash the broadcast secret key is checked against; see {@link #main(String[])} for how it is generated. */
    private static final String BROADCAST_SECRET_HASH = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";

    @Value("${netty.server.id}")
    private String serverId;

    @Resource
    private PasswordEncoder passwordEncoder;

    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Resource
    private NettyServer nettyServer;

    /**
     * Sends a message to a single user, or broadcasts it when no target user is given.
     *
     * <p>For a targeted message the Redis value stored under the target user code is read and
     * compared with this server's id: when they match the message's server id is cleared,
     * otherwise it is set to the stored value. When Redis has no entry for the user the
     * message is stored as an offline message.
     *
     * @param dto the message to send
     */
    @Override
    public void send(Message dto) {
        String friendUserCode = dto.getFriendUserCode();
        if (!StringUtils.hasText(friendUserCode)) {
            broadcast(dto);
            return;
        }
        Object code = redisOperation.get(friendUserCode);
        if (code == null) {
            offlineMessage(friendUserCode, dto);
            return;
        }
        String queryServerId = code.toString();
        dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
        sendOtherClient(dto);
    }

    /**
     * Broadcasts a message to every client registered on this server.
     *
     * <p>The secret key carried by the message must match {@link #BROADCAST_SECRET_HASH}.
     * The key should really be a dynamic value obtained for each broadcast (for example via
     * phone number + verification code); that part is left for the reader to implement.
     *
     * @param dto the message to broadcast
     */
    private void broadcast(Message dto) {
        String inputSecretKey = dto.getSecretKey();
        if (!StringUtils.hasText(inputSecretKey)
                || !passwordEncoder.matches(inputSecretKey, BROADCAST_SECRET_HASH)) {
            return;
        }
        // Never relay the secret key to the receivers.
        dto.setSecretKey(null);
        for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
            String userCode = entry.getKey();
            if (userCode.startsWith(NettyServerHandler.SERVER_PREFIX)) {
                // Other servers could be called over HTTP here; left for the reader to add
                // (all the required information is in Redis).
                continue;
            }
            // Only clients connected to this server are handled.
            Channel channel = NettyStatic.CHANNEL.get(entry.getValue());
            if (channel == null) {
                // Store the message under this user's own code (the broadcast has no target
                // user) and keep going so the remaining users still receive it.
                offlineMessage(userCode, dto);
                continue;
            }
            channel.writeAndFlush(Message.transfer(dto));
        }
    }

    public static void main(String[] args) {
        String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";
        PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
        String encode = passwordEncoder.encode(text);
        log.info(encode);
        if (passwordEncoder.matches(text, encode)) {
            log.info("秘钥正确");
        }
    }

    /**
     * Sends the message to another client, either directly or through another server.
     *
     * <p>When the message carries a server id, the channel registered under that server id is
     * used; otherwise the channel registered under the target user code is used. If no channel
     * can be resolved the message is stored as an offline message.
     *
     * @param message the message to deliver
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode();
        String targetServerId = message.getServerId();
        String lookupKey;
        if (StringUtils.hasText(targetServerId)) {
            log.info("向{} 进行转发", targetServerId);
            lookupKey = targetServerId;
        } else {
            lookupKey = friendUserCode;
        }
        String queryChannelId = NettyStatic.USER_CHANNEL.get(lookupKey);
        Channel channel = StringUtils.hasText(queryChannelId)
                ? NettyStatic.CHANNEL.get(queryChannelId)
                : null;
        if (channel == null) {
            offlineMessage(friendUserCode, message);
            return;
        }
        channel.writeAndFlush(Message.transfer(message));
    }

    /**
     * Appends the message to the offline-message list stored in Redis for the given user.
     *
     * <p>The list is read, extended and written back as a JSON array; the read-modify-write
     * sequence is not atomic.
     *
     * @param friendUserCode user code the offline message is stored for
     * @param message        the message to store
     */
    public void offlineMessage(String friendUserCode, Message message) {
        String redisKey = RedisConfig.OFFLINE_MESSAGE + friendUserCode;
        List<Message> messageList = new ArrayList<>();
        Object stored = redisOperation.get(redisKey);
        if (stored != null) {
            List<Message> parsed = JSON.parseArray(stored.toString(), Message.class);
            // parseArray may yield null (e.g. for a JSON "null" payload); keep the empty list then.
            if (parsed != null) {
                messageList = parsed;
            }
        }
        messageList.add(message);
        redisOperation.set(redisKey, JSON.toJSONString(messageList));
    }

    /**
     * Stops the Netty server by delegating to {@code NettyServer#stop()}.
     */
    @Override
    public void stop() {
        nettyServer.stop();
    }
}

2.15 修改 ServerController类

package com.hahashou.netty.server.controller;

import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * @description: REST endpoints under /server that delegate to {@link ServerService}.
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {

    @Resource
    private ServerService serverService;

    /**
     * Passes the request body to {@code ServerService#send}.
     *
     * Secret key record: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU
     * NOTE(review): keeping the plaintext key in source is presumably for the tutorial only — confirm before reuse.
     *
     * @param dto the message taken from the request body
     * @return the literal string "success"
     */
    @PostMapping("/send")
    public String send(@RequestBody Message dto) {
        serverService.send(dto);
        return "success";
    }

    /**
     * Calls {@code ServerService#stop}.
     *
     * @return the literal string "stop netty success"
     */
    @GetMapping("/stop")
    public String stop() {
        serverService.stop();
        return "stop netty success";
    }
}

3. 脱坑指南, 针对 NettyServer类

工具

yum -y install net-tools
netstat -tunlp

防火墙打开时, 当使用 bind(String inetHost, int inetPort) 方法时, 因为inetHost是127.0.0.1, 所以只有本机可以访问35000, 要想让其他机器可以连接到, 需使用 bind(int inetPort) 方法, 下图是前后两次端口占用情况
端口占用情况
结论
当使用bind(String inetHost, int inetPort)方法时, 无论防火墙关闭还是启动, 虚拟机均有问题; 但当机器有公网IP, 且防火墙关闭或端口开放时, 通过DNS解析映射是没有问题的, 建议还是用bind(int inetPort)方法

4. 服务端准备

4.1 打包3个服务端的jar包, id分别为netty-server-1、netty-server-2、netty-server-3, 分别放在161到163上

4.2 161、162、163端口开放

firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload

4.3 161、162、163修改hosts

vi /etc/hosts

追加内容

192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3

4.4 依次启动161、162、163

java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar

161
服务端1启动
162
服务端2启动
163
服务端3启动
redis中记录的服务列表
redis中记录的服务列表

5. 客户端改造

5.1 修改 application.yml

# Client application configuration.
server:
  # HTTP port of the client application.
  port: 32001

logging:
  level:
    com.hahashou.netty: info

spring:
  servlet:
    multipart:
      # Upload size limits for multipart requests.
      max-file-size: 128MB
      max-request-size: 256MB

# Presumably this client's user identifier — confirm where ${userCode} is injected.
userCode: Aa
# Address of the Netty server this client connects to (injected into NettyClient via ${host}).
host: 192.168.109.161

# MinIO endpoint and credentials.
minio:
  endpoint: http://192.168.109.160:9000
  accessKey: root
  secretKey: root123456

5.2 修改 NettyClient类

package com.hahashou.netty.client.config;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;

/**
 * @description: Netty-TCP服务
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {

	@Value("${host}")
	private String host;

	public static int PORT = 35000;

    @Resource
    private NioEventLoopGroup workerGroup;

	@Resource
	private EventExecutorGroup businessGroup;

	@Resource
	private NettyClientHandler nettyClientHandler;

	public static Channel CHANNEL;

    @SneakyThrows
	@Override
	public void onApplicationEvent(ApplicationStartedEvent event) {
		createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);
    }

    public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,
							 NettyClientHandler nettyClientHandler, String host, int port) {
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(workerGroup)
				.channel(NioSocketChannel.class)
				.handler(new ChannelInitializer<NioSocketChannel>() {
					@Override
					protected void initChannel(NioSocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
						pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
						pipeline.addLast(businessGroup, nettyClientHandler);
					}});
		try {
			CHANNEL = bootstrap.connect(host, port).sync().channel();
		} catch (InterruptedException exception) {
			log.error("客户端中断异常 : {}", exception.getMessage());
		}
	}

    @PreDestroy
    public void destroy() {
		workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("客户端关闭成功");
    }
}

6. 客户端准备

6.1 准备6个jar包, 修改application.yml, 并根据下述规则放到对应机器上

Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上

userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163

6.2 161到166端口开放

firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload

6.3 启动所有客户端

AB连接
CD连接
EF连接

7. 测试

请求参数

7.1 两个客户端连同一服务端, 不会出现转发

Aa向Bb发送消息, 且Bb收到后回复Aa
Aa向Bb
Bb向Aa

7.2 两个客户端连不同服务端

Aa向Cc发送消息(通过服务端1转发到服务端2), 且Cc收到后回复Aa(通过服务端2转发到服务端1)
A到C的转发
Aa向Cc
C到A的转发
Cc向Aa
Aa向Ee发送消息, 且Ee收到后回复Aa
Aa向Ee
Ee向Aa

7.3 广播

广播请求参数
收到广播

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐