redis网络编程——Selector模型
JNI什么是JNI?JNI是Java Native Interface的缩写,通过使用 Java本地接口书写程序,可以确保代码在不同的平台上方便移植。提供了将Java与C/C++、汇编等本地代码集成的方案,该规范使得在 Java 虚拟机内运行的 Java 代码能够与其它编程语言互相操作,包括创建本地方法、更新Java对象、调用Java方法,引用 Java类,捕捉和抛出异常等,也允许 Java...
JNI
什么是JNI?
JNI是Java Native Interface的缩写,通过使用 Java本地接口书写程序,可以确保代码在不同的平台上方便移植。提供了将Java与C/C++、汇编等本地代码集成的方案,该规范使得在 Java 虚拟机内运行的 Java 代码能够与其它编程语言互相操作,包括创建本地方法、更新Java对象、调用Java方法,引用 Java类,捕捉和抛出异常等,也允许 Java代码调用 C/C++或汇编语言编写的程序和库。
定义一个native方法,并通过C语言实现
- 定义native方法,并且装载库;
public class TestHello {
public native void testHello();
public static void main(String[] args) {
//装载库,保证JVM在启动的时候就会装载,故而一般是也给static
System.loadLibrary("test");
TestHello th = new TestHello();
th.testHello();
}
}
- javac 生成class编译文件;
javah 包名+类名
生成.h头文件;
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_TestHello */
#ifndef _Included_com_TestHello
#define _Included_com_TestHello
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: com_TestHello
* Method: testHello
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_com_TestHello_testHello
(JNIEnv *, jobject);
#ifdef __cplusplus
}
#endif
#endif
- 创建c文件,引入头文件,创建与头文件方法名参数完全一致的方法,在这个方法里通过c实现要干的事情;
#include<stdio.h>
#include "com_TestHello.h"
JNIEXPORT void JNICALL Java_com_TestHello_testHello
(JNIEnv *env, jobject c1){
printf("Hello World");
}
- 编译c文件
gcc
- 便以一个动态链接库;
gcc -fPIC -I /usr/local/src/jdk/jdk1.8/include -I /usr/local/src/jdk/jdk1.8/include/linux -shared -o libtest.so testHello.
- 把这个库所在的目录添加到path
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/root123/Desktop/com
- 然后执行
java com/TestHello
;
然后看到打印出了内容;
socket
当java中需要创建一个socket对象,然后虚拟机去执行发现需要一个socket,然后就通过C代码去操作系统中建立socket;
这是os中的socket,为通信创建端点;
自定义一个socket文件来操作,不适用jdk提供的native方法。
#include<stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "com_ServerSocket.h"
JNIEXPORT void JNICALL Java_com_ServerSocket_conn(JNIEnv *env, jobject c1){
//调用OS socket函数
//第一个参数表示域参数指定通信域;
//第二个参数指定套接字的类型
//返回一个文件描述符
int lfd = socket(AF_INET,SOCK_STREAM,0);
//封装一个结构体
struct sockaddr_in my_addr;
my_addr.sin_family = AF_INET; // ipv4
//htons将整形变量从主机字节顺序变成网络字节顺序,
my_addr.sin_port = htons(8080);
//设置当前主机任何一个可用的IP地址
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);//
//当使用socket创建套接字时,它存在于命名空间中,但没有分配地址给它,bind()将制定的地址分配到文件引用的套接字描述符。就是分配一个name给这个socket;
//绑定IP地址、端口等信息
//第一个参数上面的socket描述符,第二个参数封装的结构体,第三个参数地址的大小
bind(lfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
//监听当前socket在同一时刻能够接收的连接数
listen(lfd, 128);
printf("listen client @port=%d...\n",8080);
struct sockaddr_in client_addr;
char cli_ip[INET_ADDRSTRLEN] = "";
socklen_t cliaddr_len = sizeof(client_addr);
int connfd = 0;
connfd = accept(lfd, (struct sockaddr*)&client_addr, &cliaddr_len);
//将被填充了的client_addr放入定义的数组中;
inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
printf("----------------------------------------------\n");
//将客户端 端口网络字节序转为本地字节序并打印
printf("client ip=%s,port=%d\n", cli_ip,ntohs(client_addr.sin_port));
//到此已经连接成功,开始监听读取数据
char recv_buf[512] = "";
while(1)
{
int k =read(connfd,recv_buf,sizeof(recv_buf));
printf("recv data=%d\n",k);
printf("%s\n",recv_buf);
}
close(connfd);
printf("client closed!\n");
close(lfd);
}
int main(){
return 0;
}
文件描述符
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。
传入传出参数
传入参数:为本身值,传入函数让函数使用;
传出参数:本身没值,从函数中带出值;
传入传出参数:传入一个值,函数改变之后,再传出来;
accept()
accept()系统调用用于基于连接的套接字类型(SOCK_STREAM、SOCK_SEQPACKET)。它提取了第一个监听套接字(客户端)的挂起连接队列上的连接请求,创建一个新的连接套接字,并返回一个引用该套接字的新文件描述符。新创建的套接字没有处于监听状态。原始套接字sockfd不受此调用的影响。
- 第一个参数:传入参数;一个文件描述符,表示已经创建的一个socket,并且已经使用bind()绑定到本地地址,它正在监听之后的连接;
- 第二个参数:传出参数;参数addr是指向sockaddr结构的指针。这个结构被填满了与对等套接字的地址;可以理解为传入的是个指针,当后面后客户端来连接的时候就指向所创建的客户端的socket;
- 第三个参数:传入传出参数;传入时是sockaddr结构体大小,传出时也是这个结构体的大小,因为这个结构体在accept会被填充;
- 返回值:一个整型表示为接收到的客户端创建的socket的文件描述符;
编写java类来调用我们写好的C文件
public class ServerSocket {
static {
//JVM启动的时候会装载这个类库
System.loadLibrary("ServerSocket");
}
//定义一个本地方法,调用我们用C创建的server文件
public native void conn();
public static void main(String[] args) {
ServerSocket ss = new ServerSocket();
ss.conn();
}
}
编译java文件,生成.h文件
javac ServerSocket.java
cd ..
javah com.ServerSocket
在C文件头部引入
#include "com_ServerSocket.h"
C中调用的方法要参考h文件方法名,改变参数;
JNIEXPORT void JNICALL Java_com_ServerSocket_conn(JNIEnv *env, jobject c1){
将C文件编译成动态连接库
gcc -fPIC -I /usr/local/src/jdk/jdk1.8/include -I /usr/local/src/jdk/jdk1.8/include/linux -shared -o libServerSocket.so server.c
libServerSocket.so是lib+在java中指定的类库名+.so;
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/root123/Desktop/com
可以看到服务端socket已经在监听连接了;
我们使用centos中的nc 来连接;
安装yum install nc.x86_64 -y
测试:
客户端:
服务端:因为我们在C文件中只要有客户端连接就打印ip和端口号;
通信流程
public static void main(String[] args) throws IOException {
byte[] bytes = new byte[1024];
ServerSocket serverSocket = new ServerSocket();
//绑定Ip和端口
serverSocket.bind(new InetSocketAddress(9876));
//接收连接,执行到这里会阻塞监听,直到有客户端来连接
Socket accept = serverSocket.accept();
//阻塞读取数据
accept.getInputStream().read(bytes);
}
BIO
服务端accept等待连接,这时客户端A来连接,accept执行,read阻塞等待A发送数据,这时客户端B来连接的话是连接不上的,因为服务端没有在监听连接而是在监听A的数据,所以单线程是无法解决并发的;
那么怎么使用单线程解决
public static void main(String[] args) throws IOException {
//记录已经连接的socket
List<Socket> clientList=new ArrayList<>();
byte[] bytes = new byte[1024];
ServerSocket serverSocket = new ServerSocket();
//绑定Ip和端口
serverSocket.bind(new InetSocketAddress(9876));
while (true){
//接收连接,执行到这里会阻塞监听,直到有客户端来连接
Socket accept = serverSocket.accept();
//伪代码,accept设为非阻塞,就可以支持并发连接
accept.noblock();
//将链接上的socket记录
clientList.add(accept);
//轮询读取已经连接上的socket,如果有人发数据就读取
for (Socket socket : clientList) {
//阻塞读取数据
int read = socket.getInputStream().read(bytes);
if (read!=0){
System.out.println("处理数据");
}
}
}
}
单线程处理的瓶颈在于存放socket的集合clientList,如果在高并发情况下轮询list,则需要耗费大量的时间而且这个集合的大小取决于jvm堆内存大小所以限制非常大。可以将这个list轮询交给操作系统中的select()去处理,直接由操作系统内核来执行,这样空间大小取决于主机内存大小,而且操作系统内核轮询的速度更快。
使用java 提供的nio API
public class MySocket {
public static void main(String[] args) {
List<SocketChannel> list = new ArrayList();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
//使用nio、非阻塞
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9091));
//设置非阻塞
ssc.configureBlocking(false);
while (true){
//非阻塞
SocketChannel socketChannel = ssc.accept();
if(socketChannel==null){
Thread.sleep(1000);
System.out.println("没人连接");
for (SocketChannel channel : list) {
int k =channel.read(byteBuffer);
System.out.println(k);
if(k!=0){
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
}
}
}else{
//如果有人来连接,就创建一个socket,并且将read方法设置为非阻塞。
socketChannel.configureBlocking(false);
list.add(socketChannel);
//得到套接字,循环所有的套接字,通过套接字获取数据
for (SocketChannel channel : list) {
int k =channel.read(byteBuffer);
System.out.println(k+"=======================================");
if(k!=0){
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果没人来连接,我就一直循环list,看看已经连接的socket有没有发送数据,如果有人来连接,我就将服务端给客户端创建返回的socket的read方法设置为非阻塞,然后也轮询socket。
select
当操作系统运行server.c创建一个服务端socket时,就创建了一个进程,当这个进程一旦运行起来,操作系统就会为它创建一张文件描述符表。当有客户端来连接的时候操作系统就会创建一个IP+端口的socket文件(因为linux中一切皆文件),并返回一个文件描述符(对应文件位置在表中的索引),而文件描述符就在文件描述符表中。这个文件描述符表最多能存1024个文件描述符(select只能连接1024个),而且三个还被鼠标文件描述符、键盘文件描述符、异常文件描述符所占用。
NIO底层用的是什么IO模型?
根据操作系统来确定如果是windows系统则使用的是Selector IO多路复用模型;
关于OS select函数的介绍:
select()允许程序监视多个文件描述符,等待直到一个或多个文件描述符为某些类型的IO操作做好准备。一个文件描述符如果可能有非阻塞的IO操作那么则认为它准备好了。
提供了四个宏操作集合,FD_ZERO()清除一个集合、FD_SET和FD_CLR()分别添加和删除一个给定的文件描述符、FD_ISSET()用来测试文件描述符是否在集合中。
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
- nfds:表示集合最大长度+1;
- timeout:表示select()返回时间上限,如果timeval 结构体的两个属性都为0,则立即返回;如果timeout为0,select()可能会阻塞;
- readfds:指向有读事件发生的文件描述符的集合;
- writefds:指向有写事件发生的文件描述符的集合;
- exceptfds:指向发生异常的文件描述符的集合;
- 返回值:如果成功,select()返回上面三个集合中总的发生事件的文件描述符的个数,如果发生超时则可能返回0,如果发生错误返回-1;
当服务端启动时候先将创建listen文件描述符放到集合中返回,再有客户端A连接的时候,listen(对listen来说是有读事件发生了)就创建一个与客户端对应的进程A1(socket文件描述符),让A与A1通信;listen继续监听;
#include<stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
void start(){
struct sockaddr_in my_addr;
my_addr.sin_family = AF_INET; // ipv4
my_addr.sin_port = htons(8080);
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);//
struct sockaddr_in client_addr;
char cli_ip[INET_ADDRSTRLEN] = "";
int clientfd = 0;
int listenfd = socket(AF_INET,SOCK_STREAM,0);
bind(listenfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
listen(listenfd, 128);
printf("listen client @port=%d...\n",8080);
//connfd = accept(lfd, (struct sockaddr*)&client_addr, &cliaddr_len);
//inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
//printf("----------------------------------------------\n");
//printf("client ip=%s,port=%d\n", cli_ip,ntohs(client_addr.sin_port));
int lastfd = listenfd;
int i;
fd_set readset,totalSet;
//将读事件集合清零
FD_ZERO(&readset);
FD_SET(listenfd, &totalSet);
//轮询
while(1)
{
readset = totalSet;
//如果有人来连接或发送数据,就把发生事件的socket文件描述符放到readset中,并返回有发生事件socket的个数
int z = select(lastfd+1,&readset,NULL,NULL,NULL);
//如果z大于0说明read集合里有文件描述符,有读事件发生
//但是读事件分为两种情况:
//1、有人来连接,listen上发生读事件;
//2、有人发数据,其他socket发生读事件,所以要判断情况处理
if(z>0){
//判断读集合中是否包含listen,如果包含则说明有人来连接;
//那就accept创建一个socket并返回它的文件描述符,并且将返回的文件描述符放到总集合中去被监听 ;
if(FD_ISSET(listenfd,&readset)){
socklen_t cliaddr_len = sizeof(client_addr);
clientfd = accept(listenfd, (struct sockaddr*)&client_addr, &cliaddr_len);
inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
printf("----------------------------------------------\n");
printf("client ip=%s,port=%d\n", cli_ip,ntohs(client_addr.sin_port));
FD_SET(clientfd, &totalSet);
lastfd=clientfd;
//判断z是否为1,如果为1说明只有listen一个,那就是只有连接,
//没有其他人发送数据,那就重新监听
if(0==--z){
continue;
}
}
//否则说明,除了有人连接,还有人发送数据,遍历read集合,读取发送数据的socket
for(i=listenfd+1;i<=lastfd;i++){
if(FD_ISSET(i,&readset)){
char recv_buf[512] = "";
int rs=read(i,recv_buf,sizeof(recv_buf));
//如果读到=0说明这个客户端断开,那就关闭连接,
//并且从total集合中将它删除,不再监听
if(rs==0){
close(i);
FD_CLR(i,&totalSet);
}else{
printf("%s\n",recv_buf);
//write(0,recv_buf,rs);
}
}
}
}
}
}
int main(){
start();
return 0;
}
更多推荐
所有评论(0)