Toy-RPC的性能测试与优化

该RPC的介绍参见我之前写的一篇文章链接Github,经过若干轮性能优化之后目前的release为4.0版本

测试环境

准备两台虚拟机,配置设置为2C4G,分别运行客户端和服务器。
Java运行的参数设置为java -jar -server -Xmx1g -Xms1g -XX:MaxDirectMemorySize=1g -XX:+UseG1GC Dubbo-RPC-Server.jar

benchmark

代码主要在benchmark模块下。
dubbo与toy-rpc共用同一套client和server代码。

测试用例

在这里参考了一下rpc-benchmark的代码以及文章。使用createUser、existUser、getUser、listUser四个方法来测试性能,这四个对于以CRUD为主的业务系统而言比较有代表性。

Server

接口:

public interface UserService {
    boolean existUser(String email);

    boolean createUser(User user);

    User getUser(long id);

    Page<User> listUser(int pageNo);
}

实现:

public class UserServiceBaseImpl implements UserService {

    @Override
    public boolean existUser(String email) {
        if (email == null || email.isEmpty()) {
            return true;
        }

        if (email.charAt(email.length() - 1) < '5') {
            return false;
        }
        return true;
    }

    @Override
    public User getUser(long id) {
        User user = new User();

        user.setId(id);
        user.setName(new String("Doug Lea"));
        user.setSex(1);
        user.setBirthday(LocalDate.of(1968, 12, 8));
        user.setEmail(new String("dong.lea@gmail.com"));
        user.setMobile(new String("18612345678"));
        user.setAddress(new String("北京市 中关村 中关村大街1号 鼎好大厦 1605"));
        user.setIcon(new String("https://www.baidu.com/img/bd_logo1.png"));
        user.setStatus(1);
        user.setCreateTime(LocalDateTime.now());
        user.setUpdateTime(user.getCreateTime());

        List<Integer> permissions = new ArrayList<Integer>(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 19, 88, 86, 89, 90, 91, 92));

        user.setPermissions(permissions);

        return user;
    }

    @Override
    public Page<User> listUser(int pageNo) {
        List<User> userList = new ArrayList<>(15);

        for (int i = 0; i < 15; i++) {
            User user = new User();

            user.setId(i);
            user.setName("Doug Lea" + i);
            user.setSex(1);
            user.setBirthday(LocalDate.of(1968, 12, 8));
            user.setEmail("dong.lea@gmail.com" + i);
            user.setMobile("18612345678" + i);
            user.setAddress("北京市 中关村 中关村大街1号 鼎好大厦 1605" + i);
            user.setIcon("https://www.baidu.com/img/bd_logo1.png" + i);
            user.setStatus(1);
            user.setCreateTime(LocalDateTime.now());
            user.setUpdateTime(user.getCreateTime());

            List<Integer> permissions = new ArrayList<Integer>(
                    Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 19, 88, 86, 89, 90, 91, 92));
            user.setPermissions(permissions);

            userList.add(user);
        }

        Page<User> page = new Page<>();
        page.setPageNo(pageNo);
        page.setTotal(1000);
        page.setResult(userList);

        return page;
    }

    @Override
    public boolean createUser(User user) {
        if (user == null) {
            return false;
        }
        return true;
    }
}

Client

客户端的测试逻辑是设置32个线程,每个线程发出如3000次请求,共用同一个客户端连接。
首先进行若干轮的warmup,避免JIT等运行时优化带来的影响(前几次测试没有做,V6、V7有做),然后是进行若干轮正式的测试,最后对正式测试取一个平均值,将正式测试结果与平均结果都写入到CSV文件中。
测试项为QPS,平均响应时间AVG_RT,P90(90%的请求的RT都小于该值),P99,P999。

@Slf4j
public abstract class AbstractClient {
    private int threads;
    private int requestsTotal;
    private int requestsPerThread;
    private ExecutorService executorService;
    private UserService userService;
    private int measurementIterations;
    private int warmupIterations;

    protected abstract UserService getUserService();

    public void run(String ...strings){
        int threads = Integer.parseInt(strings[0]);
        int requestTotal = Integer.parseInt(strings[1]);
        int warmupIterations = Integer.parseInt(strings[2]);
        int measurementIterations = Integer.parseInt(strings[3]);
        run(threads, requestTotal,warmupIterations,measurementIterations);
    }

    public void run(int threads, int requestsTotal, int warmupIterations, int measurementIterations) {
        this.threads = threads;
        this.requestsTotal = requestsTotal;
        this.requestsPerThread = requestsTotal / threads;
        this.userService = getUserService();
        this.executorService = Executors.newFixedThreadPool(threads);
        this.warmupIterations = warmupIterations;
        this.measurementIterations = measurementIterations;
        createUser();
        existUser();
        getUser();
        listUser();
    }

    @Data
    public class BenchmarkResult {
        @CsvBindByName(column = "Time(ms)")
        private String mills;
        @CsvBindByName(column = "QPS(ms)")
        private String qps;
        @CsvBindByName(column = "AVG_RT(ms)")
        private String avgRt;
        @CsvBindByName(column = "P90(ms)")
        private String p90;
        @CsvBindByName(column = "P99(ms)")
        private String p99;
        @CsvBindByName(column = "P999(ms)")
        private String p999;
        @CsvBindByName(column = "Type")
        private String index;

        private double _mills;
        private double _qps;
        private double _avgRt;
        private double _p90;
        private double _p99;
        private double _p999;

        public BenchmarkResult() {
        }

        public BenchmarkResult(int index, long nanos, List<Long> rts) {
            this.index = "NORMAL-" + index;
            double mills = 1.0 * nanos / 1000000;
            // 每毫秒的处理请求数
            double qps = 1.0 * requestsTotal * 1000000 / nanos;
            // 毫秒
            double avgRt = 1.0 * rts.stream().mapToLong(x -> x).sum() / 1000000 / requestsTotal;
            Collections.sort(rts);

            this._mills = mills;
            this._qps = qps;
            this._avgRt = avgRt;
            this._p90 = 1.0 * rts.get((int) (rts.size() * 0.9)) / 1000000;
            this._p99 = 1.0 * rts.get((int) (rts.size() * 0.99)) / 1000000;
            this._p999 = 1.0 * rts.get((int) (rts.size() * 0.999)) / 1000000;

            this.mills = String.format("%.3f", _mills).trim();
            this.qps = String.format("%.3f", _qps);
            this.avgRt = String.format("%.3f ", _avgRt);
            this.p90 = String.format("%.3f", _p90);
            this.p99 = String.format("%.3f", _p99);
            this.p999 = String.format("%.3f", _p999);
        }
    }

    public BenchmarkResult avgBenchmarkResult(List<BenchmarkResult> benchmarkResults) {
        BenchmarkResult result = new BenchmarkResult();
        result.index = "AVG";
        result.mills = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_mills).average().getAsDouble()).trim();

        result.qps = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_qps).average().getAsDouble());

        result.avgRt = String.format("%.3f ", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_avgRt).average().getAsDouble());
        result.p90 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p90).average().getAsDouble());
        result.p99 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p99).average().getAsDouble());

        result.p999 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p999).average().getAsDouble());
        return result;
    }


    private void createUser() {
        try {
            Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "createUser.csv");
            final Path parent = benchmark.getParent();
            if (parent != null) // null will be returned if the path has no parent
                Files.createDirectories(parent);
            if (!Files.exists(benchmark)) {
                Files.createFile(benchmark);
            }
            BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);

            StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
                    .withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
                    .withSeparator(CSVWriter.DEFAULT_SEPARATOR)
                    .withEscapechar('\\').build();
            log.info("----------------------------------------------------------------------------");
            log.info("createUser started");
            List<BenchmarkResult> results = new ArrayList<>();
            for (int i = 0; i < warmupIterations + measurementIterations; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(threads);

                User user = new User();
                long id = 1L;
                user.setId(id);
                user.setName("Doug Lea" + id);
                user.setSex(1);
                user.setBirthday(LocalDate.of(1968, 12, 8));
                user.setEmail("dong.lea@gmail.com" + id);
                user.setMobile("18612345678" + id);
                user.setAddress("北京市 中关村 中关村大街1号 鼎好大厦 1605" + id);
                user.setIcon("https://www.baidu.com/img/bd_logo1.png" + id);
                user.setStatus(1);
                user.setCreateTime(LocalDateTime.now());
                user.setUpdateTime(user.getCreateTime());

                List<Integer> permissions = new ArrayList<Integer>(
                        Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 19, 88, 86, 89, 90, 91, 92));
                user.setPermissions(permissions);

                List<Long> rts = new Vector<>(requestsTotal);
                Runnable r = () -> {
                    for (int j = 0; j < requestsPerThread; j++) {
                        long begin = System.nanoTime();
                        try {
                            userService.createUser(user);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        rts.add(System.nanoTime() - begin);
                    }
                    countDownLatch.countDown();
                };


                for (int k = 0; k < threads; k++) {
                    executorService.submit(r);
                }
                long benchmarkStart = System.nanoTime();
                countDownLatch.await();
                long nanos = System.nanoTime() - benchmarkStart;
                if (i >= warmupIterations) {
                    results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
                }
            }
            results.add(avgBenchmarkResult(results));
            beanToCsv.write(results);
            writer.close();
            log.info("createUser end");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (CsvDataTypeMismatchException e) {
            e.printStackTrace();
        } catch (CsvRequiredFieldEmptyException e) {
            e.printStackTrace();
        }
    }

    private void existUser() {
        try {
            Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "existUser.csv");
            final Path parent = benchmark.getParent();
            if (parent != null) // null will be returned if the path has no parent
                Files.createDirectories(parent);
            if (!Files.exists(benchmark)) {
                Files.createFile(benchmark);
            }
            BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);

            StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
                    .withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
                    .withSeparator(CSVWriter.DEFAULT_SEPARATOR)
                    .withEscapechar('\\').build();

            log.info("----------------------------------------------------------------------------");
            log.info("existUser started");
            List<BenchmarkResult> results = new ArrayList<>();
            for (int i = 0; i < warmupIterations + measurementIterations; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(threads);
                List<Long> rts = new Vector<>(requestsTotal);
                Runnable r = () -> {
                    for (int j = 0; j < requestsPerThread; j++) {
                        long begin = System.nanoTime();
                        try {
                            userService.existUser(j + "@gmail.com");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        rts.add(System.nanoTime() - begin);
                    }
                    countDownLatch.countDown();
                };
                for (int k = 0; k < threads; k++) {
                    executorService.submit(r);
                }
                long benchmarkStart = System.nanoTime();
                countDownLatch.await();
                long nanos = System.nanoTime() - benchmarkStart;
                if (i >= warmupIterations) {
                    results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
                }
            }
            results.add(avgBenchmarkResult(results));
            beanToCsv.write(results);
            writer.close();
            log.info("existUser end");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (CsvDataTypeMismatchException e) {
            e.printStackTrace();
        } catch (CsvRequiredFieldEmptyException e) {
            e.printStackTrace();
        }
    }

    private void getUser() {
        try {
            Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "getUser.csv");
            final Path parent = benchmark.getParent();
            if (parent != null) // null will be returned if the path has no parent
                Files.createDirectories(parent);
            if (!Files.exists(benchmark)) {
                Files.createFile(benchmark);
            }
            BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);

            StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
                    .withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
                    .withSeparator(CSVWriter.DEFAULT_SEPARATOR)
                    .withEscapechar('\\').build();

            log.info("----------------------------------------------------------------------------");
            log.info("getUser started");
            List<BenchmarkResult> results = new ArrayList<>();
            for (int i = 0; i < warmupIterations + measurementIterations; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(threads);
                List<Long> rts = new Vector<>(requestsTotal);
                Runnable r = () -> {
                    for (int j = 0; j < requestsPerThread; j++) {
                        long begin = System.nanoTime();
                        try {
                            userService.getUser(j);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        rts.add(System.nanoTime() - begin);
                    }
                    countDownLatch.countDown();
                };

                for (int k = 0; k < threads; k++) {
                    executorService.submit(r);
                }
                long benchmarkStart = System.nanoTime();
                countDownLatch.await();
                long nanos = System.nanoTime() - benchmarkStart;
                if (i >= warmupIterations) {
                    results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
                }
            }
            results.add(avgBenchmarkResult(results));
            beanToCsv.write(results);
            writer.close();
            log.info("getUser end");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (CsvDataTypeMismatchException e) {
            e.printStackTrace();
        } catch (CsvRequiredFieldEmptyException e) {
            e.printStackTrace();
        }
    }

    private void listUser() {
        try {
            Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "listUser.csv");
            final Path parent = benchmark.getParent();
            if (parent != null) // null will be returned if the path has no parent
                Files.createDirectories(parent);
            if (!Files.exists(benchmark)) {
                Files.createFile(benchmark);
            }
            BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);

            StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
                    .withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
                    .withSeparator(CSVWriter.DEFAULT_SEPARATOR)
                    .withEscapechar('\\').build();

            log.info("----------------------------------------------------------------------------");
            log.info("listUser started");
            List<BenchmarkResult> results = new ArrayList<>();
            for (int i = 0; i < warmupIterations + measurementIterations; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(threads);
                List<Long> rts = new Vector<>(requestsTotal);
                Runnable r = () -> {
                    for (int j = 0; j < requestsPerThread; j++) {
                        long begin = System.nanoTime();
                        try {
                            userService.listUser(j);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        rts.add(System.nanoTime() - begin);
                    }
                    countDownLatch.countDown();
                };

                for (int k = 0; k < threads; k++) {
                    executorService.submit(r);
                }
                long benchmarkStart = System.nanoTime();
                countDownLatch.await();
                long nanos = System.nanoTime() - benchmarkStart;
                if (i >= warmupIterations) {
                    results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
                }
            }
            results.add(avgBenchmarkResult(results));
            beanToCsv.write(results);
            writer.close();
            log.info("listUser end");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (CsvDataTypeMismatchException e) {
            e.printStackTrace();
        } catch (CsvRequiredFieldEmptyException e) {
            e.printStackTrace();
        }
    }
}

测试结果

QPS是越高表示性能越高,RT是越小表示性能越高。
这里写图片描述

V6和V7添加了warmup,这两次的测试结果如下:
这里写图片描述
足以看出warmup之后同样代码的运行效率是有提升的,体现了JIT的作用。

优化历程

从上面的测试结果来看,整体的性能可以说是提升了三倍以上,但主要是V1->V2带来的提升,V2之后的版本的性能提升都非常有限。最初版本比Dubbo性能要查,性能最好的版本性能要比Dubbo高出一倍以上。
下面我把每个版本对应的Github的commit版本号都列出来,方便大家去查看diff。
注意后面的版本都是在前面版本的基础上修改的。

V1

CommitV1
服务器配置:

rpc.application.name=benchmark_provider
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.port=8000
rpc.protocol.executor.server.type=threadpool
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=LEAST_ACTIVE

客户端配置:

rpc.application.name=benchmark_consumer
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.executor.server.type=threadpool
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=consistent_hash
rpc.cluster.faulttolerance=failover

V2

CommitV2
将序列化方式由JDK修改为protostuff。
这一版可以看出,序列化对RPC的性能影响是非常大的。
那为什么Protostuff比JDK序列化性能高那么多呢?
具体源码还没研究过,简单看一下应该是JDK等需要在序列化结果中加上类型信息,而另一种不加类型信息的是在IDL文件里写好package和类名,生成的代码直接就有了类型信息。比如protobuf, thrift。缺点是需要生成代码,双方都要知道IDL文件。

public class ProtostuffSerializer extends AbstractSerializer {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    @Override
    public <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new RPCException(ErrorEnum.SERIALIZER_ERROR, "序列化失败", e);
        } finally {
            buffer.clear();
        }
    }

    @Override
    protected <T> T deserializeOnObject(byte[] data, Class<T> cls, T t) {
        try {
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, t, schema);
            return t;
        } catch (Exception e) {
            throw new RPCException(ErrorEnum.SERIALIZER_ERROR, "反序列化失败", e);
        }
    }

    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }
}

V3

CommitV3
对Request和Response使用Recycler,复用对象,减少GC。
这里对GC做了一点额外的性能测试:
在linux中可以使用jstat命令查看GC情况。
S0C:年轻代中第一个survivor(幸存区)的容量 (字节)
S1C:年轻代中第二个survivor(幸存区)的容量 (字节)
S0U:年轻代中第一个survivor(幸存区)目前已使用空间 (字节)
S1U:年轻代中第二个survivor(幸存区)目前已使用空间 (字节)
EC:年轻代中Eden(伊甸园)的容量 (字节)
EU:年轻代中Eden(伊甸园)目前已使用空间 (字节)
OC:Old代的容量 (字节)
OU:Old代目前已使用空间 (字节)
PC:Perm(持久代)的容量 (字节)
PU:Perm(持久代)目前已使用空间 (字节)
YGC:从应用程序启动到采样时年轻代中gc次数
YGCT:从应用程序启动到采样时年轻代中gc所用时间(s)
FGC:从应用程序启动到采样时old代(全gc)gc次数
FGCT:从应用程序启动到采样时old代(全gc)gc所用时间(s)
GCT:从应用程序启动到采样时gc用的总时间(s)

对V2和V3都发起了32threads*10000次请求,服务器的GC情况为:
V2:YGC次数为64,YGC时间为0.249。
V3:YGC次数为64,YGC时间为0.160。
足以看出YGC的时间是减少了的。

Domain

以RPCRequest为例:

public class RPCRequest implements Serializable {
    private final transient Recycler.Handle<RPCRequest> handle;
    private String requestId;
    private String interfaceName;
    private String methodName;
    private String[] parameterTypes;
    private Object[] parameters;
}
Recycler
public class GlobalRecycler {
    private static Map<Class<?>, Recycler<?>> RECYCLER = new HashMap<>();

    static {
        RECYCLER.put(RPCRequest.class, new Recycler<RPCRequest>() {
            @Override
            protected RPCRequest newObject(Handle<RPCRequest> handle) {
                return new RPCRequest(handle);
            }
        });
        RECYCLER.put(RPCResponse.class, new Recycler<RPCResponse>() {
            @Override
            protected RPCResponse newObject(Handle<RPCResponse> handle) {
                return new RPCResponse(handle);
            }
        });
    }


    public static boolean isReusable(Class<?> cls) {
        return RECYCLER.containsKey(cls);
    }

    public static <T> T reuse(Class<T> cls) {
        if (isReusable(cls)) {
            return (T) RECYCLER.get(cls).get();
        }
        throw new RPCException(ErrorEnum.RECYCLER_ERROR,"该类型对象不可复用:{}",cls);
    }
}

更多内容可以去了解一下Netty的Recycler的实现原理,简单地说就是ThreadLocal<Queue<Handle>>

V4

CommitV4
Netty:如果底层支持EPOLL,则使用EPOLL;否则使用JDK的NIO。
Netty:TCP_NODELAY

Server:

    public void run() {
        //两个事件循环器,第一个用于接收客户端连接,第二个用于处理客户端的读写请求
        //是线程组,持有一组线程
        log.info("支持EPOLL?:{}", Epoll.isAvailable());
        bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
        workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        log.info("bossGroup:{},workerGroup:{}", bossGroup, workerGroup);
        try {
            //服务器辅助类,用于配置服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            //配置服务器参数
            bootstrap.group(bossGroup, workerGroup)
                    //使用这种类型的NIO通道,现在是基于TCP协议的
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    //对Channel进行初始化,绑定实际的事件处理器,要么实现ChannelHandler接口,要么继承ChannelHandlerAdapter类
                    .childHandler(channelInitializer)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //服务器配置项
                    //BACKLOG
                    //TCP维护有两个队列,分别称为A和B
                    //客户端发送SYN,服务器接收到后发送SYN ACK,将客户端放入到A队列
                    //客户端接收到后再次发送ACK,服务器接收到后将客户端从A队列移至B队列,服务器的accept返回。
                    //A和B队列长度之和为backlog
                    //当A和B队列长度之和大于backlog时,新的连接会被TCP内核拒绝
                    //注意:backlog对程序的连接数并无影响,影响的只是还没有被accept取出的连接数。
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //指定发送缓冲区大小
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    //指定接收缓冲区大小
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    .option(ChannelOption.TCP_NODELAY, true);

            //这里的option是针对于上面的NioServerSocketChannel
            //复杂的时候可能会设置多个Channel
            //.sync表示是一个同步阻塞执行,普通的Netty的IO操作都是异步执行的
            //一个ChannelFuture代表了一个还没有发生的I/O操作。这意味着任何一个请求操作都不会马上被执行
            //Netty强烈建议直接通过添加监听器的方式获取I/O结果,而不是通过同步等待(.sync)的方式
            //如果用户操作调用了sync或者await方法,会在对应的future对象上阻塞用户线程


            //绑定端口,开始监听
            //注意这里可以绑定多个端口,每个端口都针对某一种类型的数据(控制消息,数据消息)
            String host = InetAddress.getLocalHost().getHostAddress();
            this.channelFuture = bootstrap.bind(host, getGlobalConfig().getPort()).sync();
            //应用程序会一直等待,直到channel关闭
            log.info("服务器启动,当前服务器类型为:{}", this.getClass().getSimpleName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

那么Nio和Epoll有什么差别呢?
- Nio底层依赖JDK,JDK又基于JNI依赖Epoll;Epoll直接JNI依赖Epoll
- Nio使用Epoll的LT,Epoll默认使用Epoll的ET。ET较LT在性能上是有一定提升的,但相应的要求使用者在FD就绪时必须进行读取并一次性读取完毕。更多内容可以去了解一下Epoll的ET与LT。
- Epoll可以使用一些nio没有的配置参数, 如 TCP_CORK, SO_REUSEADDR等等。

V5、V6

CommitV6
将Invocation每次创建局部内部类修改为不生成局部内部类,而是传入Function。
Invocation修改为不每次创建对象,设置为单例;当Filter不存在时,去掉InvokerDelegate,直达ToyInvoker。

这里主要是对代码进行优化。
Invocation是对同步、异步、Oneway等调用方式的抽象。
最初是每次创建方法内部类,每次创建新的类会不断在方法区中创建类,并且动态创建的类调用时效率也会比较低,主要是缺少JIT的优化。
目前是调整为单例,传入Function来调用动态逻辑,并且Function可以作为属性,避免每次去创建。

V7

CommitV7
将动态代理由JDK调整为Javassist,目前看来性能没有较为明显的提升。
这里其实就是在拼接动态代理生成类的Java代码。

public class JavassistRPCProxyFactory extends AbstractRPCProxyFactory {
    private CtClass invokerCtClass = new CtClass(Invoker.class.getName()) {
    };

    private CtClass interceptorCtClass = new CtClass(AbstractRPCProxyFactory.class.getName()) {
    };


    @Override
    protected <T> T doCreateProxy(Class<T> interfaceClass, Invoker<T> invoker) {
        T t = null;
        try {
            String interfaceName = interfaceClass.getName();
            ClassPool pool = ClassPool.getDefault();
            // 传入类名,最后生成某种Interface
            // 我们保证某个interface只会生成一个代理类
            CtClass proxyClass = pool.makeClass(interfaceName + "$ProxyInvoker");
            // 设置接口类型
            proxyClass.setInterfaces(new CtClass[]{pool.getCtClass(interfaceName)});
            CtField invokerField = new CtField(this.invokerCtClass, "invoker", proxyClass);
            invokerField.setModifiers(Modifier.PRIVATE | Modifier.FINAL);
            proxyClass.addField(invokerField);
            CtField interceptorField = new CtField(this.interceptorCtClass, "interceptor", proxyClass);
            interceptorField.setModifiers(Modifier.PRIVATE | Modifier.FINAL);
            proxyClass.addField(interceptorField);

            CtConstructor ctConstructor = new CtConstructor(new CtClass[]{this.invokerCtClass, this.interceptorCtClass}, proxyClass);
            ctConstructor.setModifiers(Modifier.PUBLIC);
            ctConstructor.setBody("{this.invoker=$1;this.interceptor=$2;}");
            proxyClass.addConstructor(ctConstructor);
            Method[] methods = interfaceClass.getMethods();
            for (int i = 0; i < methods.length; i++) {
                Method method = methods[i];
                addInterfaceMethod(interfaceName, proxyClass, method);
            }
            String source = proxyClass.toString();
            log.info("source:{}", source);
            t = interfaceClass.cast(proxyClass.toClass().getConstructor(Invoker.class, AbstractRPCProxyFactory.class).newInstance(invoker, this));
        } catch (Exception e) {
            e.printStackTrace();
            throw new RPCException(ErrorEnum.CREATE_PROXY_ERROR, "生成Javassist动态代理失败", e);
        }
        return t;
    }


    private static void addInterfaceMethod(String classToProxy, CtClass implementer, Method method) throws CannotCompileException {
        String methodCode = generateMethodCode(classToProxy, method);
        CtMethod cm = CtNewMethod.make(methodCode, implementer);
        implementer.addMethod(cm);
    }

    private static String generateMethodCode(String interfaceName, Method method) {
        String methodName = method.getName();
        String methodReturnType = method.getReturnType().getName();
        Class[] parameterTypes = method.getParameterTypes();
        Class[] exceptionTypes = method.getExceptionTypes();

        //组装方法的Exception声明  
        StringBuilder exceptionBuffer = new StringBuilder();
        if (exceptionTypes.length > 0) exceptionBuffer.append(" throws ");
        for (int i = 0; i < exceptionTypes.length; i++) {
            if (i != exceptionTypes.length - 1) {
                exceptionBuffer.append(exceptionTypes[i].getName()).append(",");
            } else {
                exceptionBuffer.append(exceptionTypes[i].getName());
            }
        }

        //组装方法的参数列表  
        StringBuilder parameterBuffer = new StringBuilder();
        for (int i = 0; i < parameterTypes.length; i++) {
            Class parameter = parameterTypes[i];
            String parameterType = parameter.getName();
            //动态指定方法参数的变量名  
            String refName = "a" + i;
            parameterBuffer.append(parameterType).append(" " + refName);
            if (i != parameterTypes.length - 1) {
                parameterBuffer.append(",");
            }
        }

        //方法声明,由于是实现接口的方法,所以是public  
        StringBuilder methodDeclare = new StringBuilder();
        methodDeclare.append("public ").append(methodReturnType).append(" ").append(methodName).append("(").append(parameterBuffer).append(")").append(exceptionBuffer).append(" {");
        // methodDeclare.append("System.out.println(a0);");     
//        methodDeclare.append("System.out.println(new Object[]{a0});");
        // 方法体
        methodDeclare.append("Object returnObj = interceptor.invokeProxyMethod(")
                .append("invoker").append(",")
                .append("\"")
                .append(interfaceName).append("\"")
                .append(",")
                .append("\"").append(methodName).append("\"")
                .append(",")
                .append("new String[]{");
        for (int i = 0; i < parameterTypes.length; i++) {
            methodDeclare.append("\"").append(parameterTypes[i].getName()).append("\"");
        }
        methodDeclare.append("}");
        methodDeclare.append(",");
        //传递方法里的参数  
        methodDeclare.append("new Object[]{");
        for (int i = 0; i < parameterTypes.length; i++) {
            methodDeclare.append("($w)a").append(i);
            if (i != parameterTypes.length - 1) {
                methodDeclare.append(",");
            }
        }
        methodDeclare.append("});");
        if (method.getReturnType().isPrimitive()) {
            if (method.getReturnType().equals(Boolean.TYPE))
                methodDeclare.append("return ((Boolean)returnObj).booleanValue();\n");
            else if (method.getReturnType().equals(Integer.TYPE))
                methodDeclare.append("return ((Integer)returnObj).intValue();\n");
            else if (method.getReturnType().equals(Long.TYPE))
                methodDeclare.append("return ((Long)returnObj).longValue();\n");
            else if (method.getReturnType().equals(Float.TYPE))
                methodDeclare.append("return ((Float)returnObj).floatValue();\n");
            else if (method.getReturnType().equals(Double.TYPE))
                methodDeclare.append("return ((Double)returnObj).doubleValue();\n");
            else if (method.getReturnType().equals(Character.TYPE))
                methodDeclare.append("return ((Character)returnObj).charValue();\n");
            else if (method.getReturnType().equals(Byte.TYPE))
                methodDeclare.append("return ((Byte)returnObj).byteValue();\n");
            else if (method.getReturnType().equals(Short.TYPE))
                methodDeclare.append("return ((Short)returnObj).shortValue();\n");
        } else {
            methodDeclare.append("return (" + methodReturnType + ")returnObj;\n");
        }
        methodDeclare.append("}");  
//        System.out.println(methodDeclare.toString());
        return methodDeclare.toString();
    }
}

总结

在前段时间实习的时候,师兄带着我参加了天池中间件性能挑战赛,感受到了把软件性能一步步提升的乐趣,重复这样一个“尝试优化-性能测试-根据反馈进行调整”的过程是非常有成就感的,这也是我做Toy-RPC性能优化的一个初衷。在这过程中确实也学到了很多知识点,比如Netty的一些组件、序列化协议等实现都是可以继续往下深挖的,后续可能会进一步做一些性能优化。谢谢!

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐