Toy-RPC的性能测试与优化
Toy-RPC的性能测试与优化该RPC的介绍参见我之前写的一篇文章链接,Github,经过若干轮性能优化之后目前的release为4.0版本。测试环境准备两台虚拟机,配置设置为2C4G,分别运行客户端和服务器。Java运行的参数设置为java -jar -server -Xmx1g -Xms1g -XX:MaxDirectMemorySize=1g -XX:+UseG1GC Dub...
Toy-RPC的性能测试与优化
该RPC的介绍参见我之前写的一篇文章(链接)以及Github仓库。经过若干轮性能优化之后,目前的release为4.0版本。
测试环境
准备两台虚拟机,配置均为2核4G(2C4G),分别运行客户端和服务器。
Java运行的参数设置为java -jar -server -Xmx1g -Xms1g -XX:MaxDirectMemorySize=1g -XX:+UseG1GC Dubbo-RPC-Server.jar
benchmark
代码主要在benchmark模块下。
dubbo与toy-rpc共用同一套client和server代码。
测试用例
在这里参考了rpc-benchmark的代码以及相关文章。使用createUser、existUser、getUser、listUser四个方法来测试性能,这四个方法对于以CRUD为主的业务系统而言比较有代表性。
Server
接口:
/**
 * User CRUD operations exercised by the RPC benchmark (create, exist, get, list).
 */
public interface UserService {

    /**
     * Creates a user.
     *
     * @param user the user to create
     * @return whether the user was accepted
     */
    boolean createUser(User user);

    /**
     * Checks whether a user with the given e-mail address exists.
     *
     * @param email the e-mail address to look up
     * @return {@code true} if such a user exists
     */
    boolean existUser(String email);

    /**
     * Fetches a single user.
     *
     * @param id the user id
     * @return the user with the given id
     */
    User getUser(long id);

    /**
     * Fetches one page of users.
     *
     * @param pageNo the requested page number
     * @return the requested page
     */
    Page<User> listUser(int pageNo);
}
实现:
/**
 * Benchmark implementation of {@link UserService} that touches no storage: every call builds its
 * answer from constants, so the measured cost is dominated by the RPC layer itself.
 */
public class UserServiceBaseImpl implements UserService {

    /**
     * Null or empty addresses count as existing; otherwise the answer depends only on whether the
     * last character sorts at or after {@code '5'}.
     */
    @Override
    public boolean existUser(String email) {
        if (email == null || email.isEmpty()) {
            return true;
        }
        char last = email.charAt(email.length() - 1);
        return last >= '5';
    }

    /** Returns a freshly built user with fixed field values and the requested id. */
    @Override
    public User getUser(long id) {
        User result = new User();
        result.setId(id);
        // Explicit new String(...) allocates fresh string instances on every call.
        result.setName(new String("Doug Lea"));
        result.setSex(1);
        result.setBirthday(LocalDate.of(1968, 12, 8));
        result.setEmail(new String("dong.lea@gmail.com"));
        result.setMobile(new String("18612345678"));
        result.setAddress(new String("北京市 中关村 中关村大街1号 鼎好大厦 1605"));
        result.setIcon(new String("https://www.baidu.com/img/bd_logo1.png"));
        result.setStatus(1);
        result.setCreateTime(LocalDateTime.now());
        result.setUpdateTime(result.getCreateTime());
        result.setPermissions(permissionList());
        return result;
    }

    /** Returns a page of 15 generated users (ids 0..14) with a fixed total of 1000. */
    @Override
    public Page<User> listUser(int pageNo) {
        final int pageSize = 15;
        List<User> users = new ArrayList<>(pageSize);
        for (int n = 0; n < pageSize; n++) {
            users.add(indexedUser(n));
        }
        Page<User> page = new Page<>();
        page.setPageNo(pageNo);
        page.setTotal(1000);
        page.setResult(users);
        return page;
    }

    /** Accepts any non-null user. */
    @Override
    public boolean createUser(User user) {
        return user != null;
    }

    /** Builds the n-th user of a list page; every text field has {@code n} appended. */
    private static User indexedUser(int n) {
        User u = new User();
        u.setId(n);
        u.setName("Doug Lea" + n);
        u.setSex(1);
        u.setBirthday(LocalDate.of(1968, 12, 8));
        u.setEmail("dong.lea@gmail.com" + n);
        u.setMobile("18612345678" + n);
        u.setAddress("北京市 中关村 中关村大街1号 鼎好大厦 1605" + n);
        u.setIcon("https://www.baidu.com/img/bd_logo1.png" + n);
        u.setStatus(1);
        u.setCreateTime(LocalDateTime.now());
        u.setUpdateTime(u.getCreateTime());
        u.setPermissions(permissionList());
        return u;
    }

    /** Returns a new mutable list holding the fixed benchmark permission ids. */
    private static List<Integer> permissionList() {
        return new ArrayList<Integer>(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 19, 88, 86, 89, 90, 91, 92));
    }
}
Client
客户端的测试逻辑是启动32个线程,每个线程发出约3000次请求(每个线程的请求数为总请求数除以线程数后取整),所有线程共用同一个客户端连接。
首先进行若干轮的warmup,避免JIT等运行时优化带来的影响(前几次测试没有做,V6、V7有做),然后是进行若干轮正式的测试,最后对正式测试取一个平均值,将正式测试结果与平均结果都写入到CSV文件中。
测试项为QPS(代码中实际计算的是每毫秒处理的请求数)、平均响应时间AVG_RT、P90(90%的请求的RT都不超过该值)、P99和P999。
@Slf4j
public abstract class AbstractClient {
private int threads;
private int requestsTotal;
private int requestsPerThread;
private ExecutorService executorService;
private UserService userService;
private int measurementIterations;
private int warmupIterations;
protected abstract UserService getUserService();
public void run(String ...strings){
int threads = Integer.parseInt(strings[0]);
int requestTotal = Integer.parseInt(strings[1]);
int warmupIterations = Integer.parseInt(strings[2]);
int measurementIterations = Integer.parseInt(strings[3]);
run(threads, requestTotal,warmupIterations,measurementIterations);
}
public void run(int threads, int requestsTotal, int warmupIterations, int measurementIterations) {
this.threads = threads;
this.requestsTotal = requestsTotal;
this.requestsPerThread = requestsTotal / threads;
this.userService = getUserService();
this.executorService = Executors.newFixedThreadPool(threads);
this.warmupIterations = warmupIterations;
this.measurementIterations = measurementIterations;
createUser();
existUser();
getUser();
listUser();
}
@Data
public class BenchmarkResult {
@CsvBindByName(column = "Time(ms)")
private String mills;
@CsvBindByName(column = "QPS(ms)")
private String qps;
@CsvBindByName(column = "AVG_RT(ms)")
private String avgRt;
@CsvBindByName(column = "P90(ms)")
private String p90;
@CsvBindByName(column = "P99(ms)")
private String p99;
@CsvBindByName(column = "P999(ms)")
private String p999;
@CsvBindByName(column = "Type")
private String index;
private double _mills;
private double _qps;
private double _avgRt;
private double _p90;
private double _p99;
private double _p999;
public BenchmarkResult() {
}
public BenchmarkResult(int index, long nanos, List<Long> rts) {
this.index = "NORMAL-" + index;
double mills = 1.0 * nanos / 1000000;
// 每毫秒的处理请求数
double qps = 1.0 * requestsTotal * 1000000 / nanos;
// 毫秒
double avgRt = 1.0 * rts.stream().mapToLong(x -> x).sum() / 1000000 / requestsTotal;
Collections.sort(rts);
this._mills = mills;
this._qps = qps;
this._avgRt = avgRt;
this._p90 = 1.0 * rts.get((int) (rts.size() * 0.9)) / 1000000;
this._p99 = 1.0 * rts.get((int) (rts.size() * 0.99)) / 1000000;
this._p999 = 1.0 * rts.get((int) (rts.size() * 0.999)) / 1000000;
this.mills = String.format("%.3f", _mills).trim();
this.qps = String.format("%.3f", _qps);
this.avgRt = String.format("%.3f ", _avgRt);
this.p90 = String.format("%.3f", _p90);
this.p99 = String.format("%.3f", _p99);
this.p999 = String.format("%.3f", _p999);
}
}
public BenchmarkResult avgBenchmarkResult(List<BenchmarkResult> benchmarkResults) {
BenchmarkResult result = new BenchmarkResult();
result.index = "AVG";
result.mills = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_mills).average().getAsDouble()).trim();
result.qps = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_qps).average().getAsDouble());
result.avgRt = String.format("%.3f ", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_avgRt).average().getAsDouble());
result.p90 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p90).average().getAsDouble());
result.p99 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p99).average().getAsDouble());
result.p999 = String.format("%.3f", benchmarkResults.stream().mapToDouble(BenchmarkResult::get_p999).average().getAsDouble());
return result;
}
private void createUser() {
try {
Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "createUser.csv");
final Path parent = benchmark.getParent();
if (parent != null) // null will be returned if the path has no parent
Files.createDirectories(parent);
if (!Files.exists(benchmark)) {
Files.createFile(benchmark);
}
BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);
StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
.withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withSeparator(CSVWriter.DEFAULT_SEPARATOR)
.withEscapechar('\\').build();
log.info("----------------------------------------------------------------------------");
log.info("createUser started");
List<BenchmarkResult> results = new ArrayList<>();
for (int i = 0; i < warmupIterations + measurementIterations; i++) {
CountDownLatch countDownLatch = new CountDownLatch(threads);
User user = new User();
long id = 1L;
user.setId(id);
user.setName("Doug Lea" + id);
user.setSex(1);
user.setBirthday(LocalDate.of(1968, 12, 8));
user.setEmail("dong.lea@gmail.com" + id);
user.setMobile("18612345678" + id);
user.setAddress("北京市 中关村 中关村大街1号 鼎好大厦 1605" + id);
user.setIcon("https://www.baidu.com/img/bd_logo1.png" + id);
user.setStatus(1);
user.setCreateTime(LocalDateTime.now());
user.setUpdateTime(user.getCreateTime());
List<Integer> permissions = new ArrayList<Integer>(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 19, 88, 86, 89, 90, 91, 92));
user.setPermissions(permissions);
List<Long> rts = new Vector<>(requestsTotal);
Runnable r = () -> {
for (int j = 0; j < requestsPerThread; j++) {
long begin = System.nanoTime();
try {
userService.createUser(user);
} catch (Exception e) {
e.printStackTrace();
}
rts.add(System.nanoTime() - begin);
}
countDownLatch.countDown();
};
for (int k = 0; k < threads; k++) {
executorService.submit(r);
}
long benchmarkStart = System.nanoTime();
countDownLatch.await();
long nanos = System.nanoTime() - benchmarkStart;
if (i >= warmupIterations) {
results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
}
}
results.add(avgBenchmarkResult(results));
beanToCsv.write(results);
writer.close();
log.info("createUser end");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (CsvDataTypeMismatchException e) {
e.printStackTrace();
} catch (CsvRequiredFieldEmptyException e) {
e.printStackTrace();
}
}
private void existUser() {
try {
Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "existUser.csv");
final Path parent = benchmark.getParent();
if (parent != null) // null will be returned if the path has no parent
Files.createDirectories(parent);
if (!Files.exists(benchmark)) {
Files.createFile(benchmark);
}
BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);
StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
.withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withSeparator(CSVWriter.DEFAULT_SEPARATOR)
.withEscapechar('\\').build();
log.info("----------------------------------------------------------------------------");
log.info("existUser started");
List<BenchmarkResult> results = new ArrayList<>();
for (int i = 0; i < warmupIterations + measurementIterations; i++) {
CountDownLatch countDownLatch = new CountDownLatch(threads);
List<Long> rts = new Vector<>(requestsTotal);
Runnable r = () -> {
for (int j = 0; j < requestsPerThread; j++) {
long begin = System.nanoTime();
try {
userService.existUser(j + "@gmail.com");
} catch (Exception e) {
e.printStackTrace();
}
rts.add(System.nanoTime() - begin);
}
countDownLatch.countDown();
};
for (int k = 0; k < threads; k++) {
executorService.submit(r);
}
long benchmarkStart = System.nanoTime();
countDownLatch.await();
long nanos = System.nanoTime() - benchmarkStart;
if (i >= warmupIterations) {
results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
}
}
results.add(avgBenchmarkResult(results));
beanToCsv.write(results);
writer.close();
log.info("existUser end");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (CsvDataTypeMismatchException e) {
e.printStackTrace();
} catch (CsvRequiredFieldEmptyException e) {
e.printStackTrace();
}
}
private void getUser() {
try {
Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "getUser.csv");
final Path parent = benchmark.getParent();
if (parent != null) // null will be returned if the path has no parent
Files.createDirectories(parent);
if (!Files.exists(benchmark)) {
Files.createFile(benchmark);
}
BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);
StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
.withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withSeparator(CSVWriter.DEFAULT_SEPARATOR)
.withEscapechar('\\').build();
log.info("----------------------------------------------------------------------------");
log.info("getUser started");
List<BenchmarkResult> results = new ArrayList<>();
for (int i = 0; i < warmupIterations + measurementIterations; i++) {
CountDownLatch countDownLatch = new CountDownLatch(threads);
List<Long> rts = new Vector<>(requestsTotal);
Runnable r = () -> {
for (int j = 0; j < requestsPerThread; j++) {
long begin = System.nanoTime();
try {
userService.getUser(j);
} catch (Exception e) {
e.printStackTrace();
}
rts.add(System.nanoTime() - begin);
}
countDownLatch.countDown();
};
for (int k = 0; k < threads; k++) {
executorService.submit(r);
}
long benchmarkStart = System.nanoTime();
countDownLatch.await();
long nanos = System.nanoTime() - benchmarkStart;
if (i >= warmupIterations) {
results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
}
}
results.add(avgBenchmarkResult(results));
beanToCsv.write(results);
writer.close();
log.info("getUser end");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (CsvDataTypeMismatchException e) {
e.printStackTrace();
} catch (CsvRequiredFieldEmptyException e) {
e.printStackTrace();
}
}
private void listUser() {
try {
Path benchmark = Paths.get(System.getProperty("user.home"), "benchmark", "listUser.csv");
final Path parent = benchmark.getParent();
if (parent != null) // null will be returned if the path has no parent
Files.createDirectories(parent);
if (!Files.exists(benchmark)) {
Files.createFile(benchmark);
}
BufferedWriter writer = Files.newBufferedWriter(benchmark, StandardOpenOption.WRITE);
StatefulBeanToCsv beanToCsv = new StatefulBeanToCsvBuilder(writer)
.withQuotechar(CSVWriter.NO_QUOTE_CHARACTER)
.withSeparator(CSVWriter.DEFAULT_SEPARATOR)
.withEscapechar('\\').build();
log.info("----------------------------------------------------------------------------");
log.info("listUser started");
List<BenchmarkResult> results = new ArrayList<>();
for (int i = 0; i < warmupIterations + measurementIterations; i++) {
CountDownLatch countDownLatch = new CountDownLatch(threads);
List<Long> rts = new Vector<>(requestsTotal);
Runnable r = () -> {
for (int j = 0; j < requestsPerThread; j++) {
long begin = System.nanoTime();
try {
userService.listUser(j);
} catch (Exception e) {
e.printStackTrace();
}
rts.add(System.nanoTime() - begin);
}
countDownLatch.countDown();
};
for (int k = 0; k < threads; k++) {
executorService.submit(r);
}
long benchmarkStart = System.nanoTime();
countDownLatch.await();
long nanos = System.nanoTime() - benchmarkStart;
if (i >= warmupIterations) {
results.add(new BenchmarkResult(i - warmupIterations, nanos, rts));
}
}
results.add(avgBenchmarkResult(results));
beanToCsv.write(results);
writer.close();
log.info("listUser end");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (CsvDataTypeMismatchException e) {
e.printStackTrace();
} catch (CsvRequiredFieldEmptyException e) {
e.printStackTrace();
}
}
}
测试结果
QPS是越高表示性能越高,RT是越小表示性能越高。
V6和V7添加了warmup,这两次的测试结果如下:
足以看出warmup之后同样代码的运行效率是有提升的,体现了JIT的作用。
优化历程
从上面的测试结果来看,整体的性能可以说是提升了三倍以上,但主要是V1->V2带来的提升,V2之后的版本的性能提升都非常有限。最初版本比Dubbo性能要差,性能最好的版本性能要比Dubbo高出一倍以上。
下面我把每个版本对应的Github的commit版本号都列出来,方便大家去查看diff。
注意后面的版本都是在前面版本的基础上修改的。
V1
CommitV1
服务器配置:
rpc.application.name=benchmark_provider
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.port=8000
rpc.protocol.executor.server.type=threadpool
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=LEAST_ACTIVE
客户端配置:
rpc.application.name=benchmark_consumer
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.executor.server.type=threadpool
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=consistent_hash
rpc.cluster.faulttolerance=failover
V2
CommitV2
将序列化方式由JDK修改为protostuff。
这一版可以看出,序列化对RPC的性能影响是非常大的。
那为什么Protostuff比JDK序列化性能高那么多呢?
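具体源码还没有深入研究过,简单来看:JDK序列化会在序列化结果中写入类的描述信息(类名、字段名与类型、serialVersionUID等),数据体积大,处理这些描述信息的开销也大;而protobuf、thrift这类协议事先在IDL文件里定义好package、类名和字段编号,序列化结果中只需写入字段编号和紧凑编码后的字段值,类型信息由生成的代码提供,缺点是需要生成代码,并且通信双方都要持有IDL文件。Protostuff则通过RuntimeSchema在运行时根据类结构生成Schema(即下面代码中的RuntimeSchema.createFrom),因此无需编写IDL也能得到与protobuf类似的紧凑格式。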
/**
 * Protostuff-based serializer. Schemas are derived at runtime from the class structure via
 * {@code RuntimeSchema.createFrom} and cached per class.
 */
public class ProtostuffSerializer extends AbstractSerializer {
    // Class -> schema cache shared by all instances; populated lazily by getSchema.
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serializes {@code obj} using the schema of its runtime class.
     *
     * @throws RPCException (SERIALIZER_ERROR) on any failure, including a null {@code obj}
     */
    @Override
    public <T> byte[] serialize(T obj) {
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            // getClass() is inside the try so that a null obj is reported like every other failure.
            @SuppressWarnings("unchecked")
            Class<T> cls = (Class<T>) obj.getClass();
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new RPCException(ErrorEnum.SERIALIZER_ERROR, "序列化失败", e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Merges {@code data} into the provided instance {@code t} and returns it.
     *
     * @throws RPCException (SERIALIZER_ERROR) on any failure
     */
    @Override
    protected <T> T deserializeOnObject(byte[] data, Class<T> cls, T t) {
        try {
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, t, schema);
            return t;
        } catch (Exception e) {
            throw new RPCException(ErrorEnum.SERIALIZER_ERROR, "反序列化失败", e);
        }
    }

    /**
     * Returns the cached schema for {@code cls}, creating it at most once per class.
     * The plain get() covers the common hit path; computeIfAbsent is used only on a miss, because on
     * JDK 8 it locks the hash bin even when the key is already present.
     */
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<?> schema = cachedSchema.get(cls);
        if (schema == null) {
            schema = cachedSchema.computeIfAbsent(cls, k -> RuntimeSchema.createFrom(k));
        }
        // Safe: each entry is created from its own key class.
        return (Schema<T>) schema;
    }
}
V3
CommitV3
对Request和Response使用Recycler,复用对象,减少GC。
这里对GC做了一点额外的性能测试:
在linux中可以使用jstat命令查看GC情况。
S0C:年轻代中第一个survivor(幸存区)的容量 (KB)
S1C:年轻代中第二个survivor(幸存区)的容量 (KB)
S0U:年轻代中第一个survivor(幸存区)目前已使用空间 (KB)
S1U:年轻代中第二个survivor(幸存区)目前已使用空间 (KB)
EC:年轻代中Eden(伊甸园)的容量 (KB)
EU:年轻代中Eden(伊甸园)目前已使用空间 (KB)
OC:Old代的容量 (KB)
OU:Old代目前已使用空间 (KB)
MC:Metaspace(元空间)的容量 (KB)
MU:Metaspace(元空间)目前已使用空间 (KB)(注:JDK 8移除了Perm(持久代),JDK 7及以前版本jstat输出中的PC/PU列在JDK 8中相应地变为MC/MU)
YGC:从应用程序启动到采样时年轻代中gc次数
YGCT:从应用程序启动到采样时年轻代中gc所用时间(s)
FGC:从应用程序启动到采样时old代(全gc)gc次数
FGCT:从应用程序启动到采样时old代(全gc)gc所用时间(s)
GCT:从应用程序启动到采样时gc用的总时间(s)
对V2和V3都发起了32threads*10000次请求,服务器的GC情况为:
V2:YGC次数为64,YGC总耗时为0.249秒。
V3:YGC次数为64,YGC总耗时为0.160秒。
足以看出YGC的时间是减少了的。
Domain
以RPCRequest为例:
/**
 * RPC request message: identifies the target interface/method and carries the call arguments.
 * Instances are obtained from a Netty Recycler (see GlobalRecycler) so they can be reused.
 * NOTE(review): this is an excerpt. The constructor that assigns the final {@code handle} is not
 * shown (GlobalRecycler calls {@code new RPCRequest(handle)}), and accessors are presumably elided too.
 */
public class RPCRequest implements Serializable {
// Recycler handle used to hand this instance back to its pool; transient, so it is excluded from Java serialization.
private final transient Recycler.Handle<RPCRequest> handle;
// Request identifier; presumably used to match the response to this request — TODO confirm.
private String requestId;
// Name of the service interface being invoked.
private String interfaceName;
// Name of the method being invoked on that interface.
private String methodName;
// Parameter type names of the target method (strings, not Class objects).
private String[] parameterTypes;
// Argument values, positionally matching parameterTypes.
private Object[] parameters;
}
Recycler
/**
 * Registry of Netty Recyclers (object pools) for the RPC message types that support reuse.
 */
public class GlobalRecycler {
    // Filled once in the static initializer and only read afterwards. Class initialization safely
    // publishes it to all threads, so a plain HashMap is sufficient; final prevents reassignment.
    private static final Map<Class<?>, Recycler<?>> RECYCLER = new HashMap<>();

    static {
        RECYCLER.put(RPCRequest.class, new Recycler<RPCRequest>() {
            @Override
            protected RPCRequest newObject(Handle<RPCRequest> handle) {
                return new RPCRequest(handle);
            }
        });
        RECYCLER.put(RPCResponse.class, new Recycler<RPCResponse>() {
            @Override
            protected RPCResponse newObject(Handle<RPCResponse> handle) {
                return new RPCResponse(handle);
            }
        });
    }

    /** Returns whether instances of {@code cls} are pooled. */
    public static boolean isReusable(Class<?> cls) {
        return RECYCLER.containsKey(cls);
    }

    /**
     * Takes an instance of {@code cls} from its pool (or creates one if the pool is empty).
     *
     * @throws RPCException (RECYCLER_ERROR) if {@code cls} has no registered recycler
     */
    public static <T> T reuse(Class<T> cls) {
        Recycler<?> recycler = RECYCLER.get(cls);
        if (recycler == null) {
            throw new RPCException(ErrorEnum.RECYCLER_ERROR,"该类型对象不可复用:{}",cls);
        }
        // Each recycler is registered under the class of the objects it creates, so this cast cannot fail.
        return cls.cast(recycler.get());
    }
}
更多内容可以去了解一下Netty的Recycler的实现原理,简单地说就是一个基于ThreadLocal的对象池,可以近似理解为ThreadLocal<Queue<Handle>>。
V4
CommitV4
Netty:如果底层支持EPOLL,则使用EPOLL;否则使用JDK的NIO。
Netty:开启TCP_NODELAY,即禁用Nagle算法,让小数据包立即发出,降低RPC请求的延迟。
Server:
/**
 * Starts the Netty server and binds it to the local host address on the configured port.
 * It returns once the bind has completed and does not wait for the channel to close. On failure,
 * the error is logged and the event-loop groups are shut down.
 */
public void run() {
    // Prefer the native epoll transport when available; otherwise fall back to JDK NIO.
    boolean epoll = Epoll.isAvailable();
    log.info("支持EPOLL?:{}", epoll);
    // bossGroup (1 thread) accepts connections; workerGroup handles reads/writes of accepted channels.
    bossGroup = epoll ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
    workerGroup = epoll ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    log.info("bossGroup:{},workerGroup:{}", bossGroup, workerGroup);
    boolean started = false;
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        // option(...) configures the listening channel; childOption(...) configures each accepted channel.
        bootstrap.group(bossGroup, workerGroup)
                .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                // Installs the handlers on every accepted channel.
                .childHandler(channelInitializer)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // Maximum number of pending connections not yet accepted (see listen(2)); it does
                // not limit how many connections the server can hold.
                .option(ChannelOption.SO_BACKLOG, 128)
                // Send/receive buffers and TCP_NODELAY are per-connection settings. A listening socket
                // supports neither SO_SNDBUF nor TCP_NODELAY, so they must be child options to take effect.
                .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
                .childOption(ChannelOption.TCP_NODELAY, true);
        // NOTE(review): getLocalHost() can resolve to a loopback address on some hosts; confirm this is
        // the address remote clients should use.
        String host = InetAddress.getLocalHost().getHostAddress();
        // sync() blocks only until the bind completes.
        this.channelFuture = bootstrap.bind(host, getGlobalConfig().getPort()).sync();
        started = true;
        log.info("服务器启动,当前服务器类型为:{}", this.getClass().getSimpleName());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("Interrupted while binding the server", e);
    } catch (UnknownHostException e) {
        log.error("Unable to resolve the local host address", e);
    } finally {
        if (!started) {
            // Release event-loop threads that would otherwise outlive a failed start.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
那么Nio和Epoll有什么差别呢?
- NIO传输基于JDK的NIO实现,JDK在Linux上又通过JNI调用epoll;Epoll传输则是Netty通过自己的JNI库直接调用epoll,少了一层JDK的封装。
- JDK NIO使用epoll的水平触发(LT)模式,而Netty的Epoll传输默认使用边缘触发(ET)模式。ET较LT在性能上有一定提升,但相应地要求使用者在FD就绪时必须把数据一次性读取完毕。更多内容可以去了解一下epoll的ET与LT。
- Epoll传输可以使用一些NIO没有的配置参数,如TCP_CORK、SO_REUSEPORT等(SO_REUSEADDR在NIO中同样可用)。
V5、V6
CommitV6
将Invocation每次创建局部内部类修改为不生成局部内部类,而是传入Function。
Invocation修改为不每次创建对象,设置为单例;当Filter不存在时,去掉InvokerDelegate,直达ToyInvoker。
这里主要是对代码进行优化。
Invocation是对同步、异步、Oneway等调用方式的抽象。
最初是每次调用都在方法中new一个局部内部类(匿名类)的对象。需要注意,局部内部类在编译期就会生成独立的class文件,运行时并不会反复在方法区中创建新的类;真正的开销在于每次调用都要分配新的对象,而且这种写法可能不利于JIT做内联等优化。
目前是调整为单例,传入Function来执行动态逻辑,并且Function可以作为属性保存下来,避免每次都去创建。
V7
CommitV7
将动态代理由JDK调整为Javassist,目前看来性能没有较为明显的提升。
这里其实就是在拼接动态代理生成类的Java代码。
/**
 * Creates RPC client proxies by generating, with Javassist, a class that implements the service
 * interface. Every generated method forwards to
 * {@code interceptor.invokeProxyMethod(invoker, interfaceName, methodName, paramTypeNames, args)}.
 */
public class JavassistRPCProxyFactory extends AbstractRPCProxyFactory {
    // Name-only CtClass placeholders used as the declared types of the generated fields and
    // constructor parameters.
    private CtClass invokerCtClass = new CtClass(Invoker.class.getName()) {
    };
    private CtClass interceptorCtClass = new CtClass(AbstractRPCProxyFactory.class.getName()) {
    };

    /**
     * Generates {@code <interface>$ProxyInvoker}, holding final {@code invoker} and
     * {@code interceptor} fields, and instantiates it.
     *
     * @throws RPCException (CREATE_PROXY_ERROR) if generation, loading or instantiation fails
     */
    @Override
    protected <T> T doCreateProxy(Class<T> interfaceClass, Invoker<T> invoker) {
        try {
            String interfaceName = interfaceClass.getName();
            ClassPool pool = ClassPool.getDefault();
            // The generated class name is fixed per interface. NOTE(review): creating a second proxy
            // for the same interface would redefine this class; callers are assumed to create only
            // one proxy per interface.
            CtClass proxyClass = pool.makeClass(interfaceName + "$ProxyInvoker");
            proxyClass.setInterfaces(new CtClass[]{pool.getCtClass(interfaceName)});

            CtField invokerField = new CtField(this.invokerCtClass, "invoker", proxyClass);
            invokerField.setModifiers(Modifier.PRIVATE | Modifier.FINAL);
            proxyClass.addField(invokerField);
            CtField interceptorField = new CtField(this.interceptorCtClass, "interceptor", proxyClass);
            interceptorField.setModifiers(Modifier.PRIVATE | Modifier.FINAL);
            proxyClass.addField(interceptorField);

            CtConstructor ctConstructor = new CtConstructor(
                    new CtClass[]{this.invokerCtClass, this.interceptorCtClass}, proxyClass);
            ctConstructor.setModifiers(Modifier.PUBLIC);
            ctConstructor.setBody("{this.invoker=$1;this.interceptor=$2;}");
            proxyClass.addConstructor(ctConstructor);

            for (Method method : interfaceClass.getMethods()) {
                addInterfaceMethod(interfaceName, proxyClass, method);
            }
            // CtClass.toString() is a structural description, not Java source; log it at debug level only.
            log.debug("source:{}", proxyClass);
            return interfaceClass.cast(proxyClass.toClass()
                    .getConstructor(Invoker.class, AbstractRPCProxyFactory.class)
                    .newInstance(invoker, this));
        } catch (Exception e) {
            throw new RPCException(ErrorEnum.CREATE_PROXY_ERROR, "生成Javassist动态代理失败", e);
        }
    }

    /** Compiles the forwarding implementation of {@code method} and adds it to {@code implementer}. */
    private static void addInterfaceMethod(String classToProxy, CtClass implementer, Method method) throws CannotCompileException {
        String methodCode = generateMethodCode(classToProxy, method);
        CtMethod cm = CtNewMethod.make(methodCode, implementer);
        implementer.addMethod(cm);
    }

    /**
     * Builds the Javassist source for one forwarding method. Parameters are named a0, a1, ... and
     * boxed with {@code ($w)}. The returned Object is cast, or unboxed for primitive return types.
     */
    private static String generateMethodCode(String interfaceName, Method method) {
        String methodName = method.getName();
        Class<?> returnType = method.getReturnType();
        String returnTypeName = sourceTypeName(returnType);
        Class<?>[] parameterTypes = method.getParameterTypes();
        Class<?>[] exceptionTypes = method.getExceptionTypes();

        StringBuilder code = new StringBuilder();
        // Declaration: public <ret> <name>(<T0> a0,<T1> a1) throws E0,E1 {
        code.append("public ").append(returnTypeName).append(' ').append(methodName).append('(');
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                code.append(',');
            }
            code.append(sourceTypeName(parameterTypes[i])).append(" a").append(i);
        }
        code.append(')');
        for (int i = 0; i < exceptionTypes.length; i++) {
            code.append(i == 0 ? " throws " : ",").append(exceptionTypes[i].getName());
        }
        code.append(" {");

        // Body: Object returnObj = interceptor.invokeProxyMethod(invoker,"<iface>","<method>",new String[]{...},new Object[]{...});
        code.append("Object returnObj = interceptor.invokeProxyMethod(")
                .append("invoker").append(",")
                .append("\"").append(interfaceName).append("\"").append(",")
                .append("\"").append(methodName).append("\"").append(",")
                .append("new String[]{");
        for (int i = 0; i < parameterTypes.length; i++) {
            // Elements must be comma-separated, otherwise methods with 2+ parameters do not compile.
            if (i > 0) {
                code.append(',');
            }
            // Runtime (binary) names, e.g. "[Ljava.lang.String;" for arrays, usable with Class.forName.
            code.append("\"").append(parameterTypes[i].getName()).append("\"");
        }
        code.append("},new Object[]{");
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                code.append(',');
            }
            // $w boxes primitives and is a no-op for reference types.
            code.append("($w)a").append(i);
        }
        code.append("});");
        code.append(returnStatement(returnType, returnTypeName));
        code.append("}");
        return code.toString();
    }

    /** Returns the statement converting {@code returnObj} to the declared return type ("" for void). */
    private static String returnStatement(Class<?> returnType, String returnTypeName) {
        if (returnType == void.class) {
            return "";
        }
        if (!returnType.isPrimitive()) {
            return "return (" + returnTypeName + ")returnObj;\n";
        }
        String wrapper;
        if (returnType == boolean.class) {
            wrapper = "Boolean";
        } else if (returnType == int.class) {
            wrapper = "Integer";
        } else if (returnType == long.class) {
            wrapper = "Long";
        } else if (returnType == float.class) {
            wrapper = "Float";
        } else if (returnType == double.class) {
            wrapper = "Double";
        } else if (returnType == char.class) {
            wrapper = "Character";
        } else if (returnType == byte.class) {
            wrapper = "Byte";
        } else {
            wrapper = "Short";
        }
        // e.g. return ((Integer)returnObj).intValue();
        return "return ((" + wrapper + ")returnObj)." + returnType.getName() + "Value();\n";
    }

    /**
     * Returns a type name valid in Javassist source. Arrays become "component[]", because the
     * JVM name returned by getName() (e.g. "[Ljava.lang.String;") is not valid source.
     */
    private static String sourceTypeName(Class<?> type) {
        if (type.isArray()) {
            return sourceTypeName(type.getComponentType()) + "[]";
        }
        return type.getName();
    }
}
总结
在前段时间实习的时候,师兄带着我参加了天池中间件性能挑战赛,感受到了把软件性能一步步提升的乐趣,重复这样一个“尝试优化-性能测试-根据反馈进行调整”的过程是非常有成就感的,这也是我做Toy-RPC性能优化的一个初衷。在这过程中确实也学到了很多知识点,比如Netty的一些组件、序列化协议等实现都是可以继续往下深挖的,后续可能会进一步做一些性能优化。谢谢!
更多推荐
所有评论(0)