一.为什么需要进程间通信

1).数据传输

一个进程需要将它的数据发送给另一个进程。

2).资源共享

多个进程之间共享同样的资源。

3).通知事件

一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件。

4).进程控制

有些进程希望完全控制另一个进程的执行(如Debug进程),该控制进程希望能够拦截另一个进程的所有操作,并能够及时知道它的状态改变。

注:为什么要用进程,不用进程? 

线程快?进程安全?线程的创建与销毁消耗资源小?

二.什么是进程间通信

首先了解几个名词:

1. 进程隔离

       进程隔离是为保护操作系统中进程互不干扰而设计的一组不同硬件和软件的技术。这个技术是为了避免进程A写入进程B的情况发生。 进程的隔离实现,使用了虚拟地址空间。进程A的虚拟地址和进程B的虚拟地址不同,这样就防止进程A将数据信息写入进程B。

2.虚拟地址空间

        就32位系统而言,当创建一个进程时,操作系统会为该进程分配一个 4GB 大小的虚拟进程地址空间。之所以是 4GB ,是因为在 32 位的操作系统中,一个指针长度是 4 字节,而 4 字节指针的寻址能力是从 0x00000000~0xFFFFFFFF ,最大值 0xFFFFFFFF 表示的即为 4GB 大小的容量。与虚拟地址空间相对的,还有一个物理地址空间,这个地址空间对应的是真实的物理内存。要注意的是这个 4GB 的地址空间是“虚拟”的,并不是真实存在的,而且每个进程只能访问自己虚拟地址空间中的数据,无法访问别的进程中的数据,通过这种方法实现了进程间的地址隔离。

        针对 Linux 操作系统,将最高的1G字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF )供内核使用,称为内核空间,而较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。每个进程都可以通过系统调用进入到内核。其中在 Linux 系统中,进程的用户空间是独立的,而内核空间是共有的,进程切换时,用户空间切换,内核空间不变

        创建虚拟地址空间目的是为了解决进程地址空间隔离的问题。但程序要想执行,必须运行在真实的内存上,所以,必须在虚拟地址与物理地址间建立一种映射关系。这样,通过映射机制,当程序访问虚拟地址空间上的某个地址值时,就相当于访问了物理地址空间中的另一个值。人们想到了一种分段、分页的方法,它的思想是在虚拟地址空间和物理地址空间之间做一一映射。这种思想理解起来并不难,操作系统保证不同进程的地址空间被映射到物理地址空间中不同的区域上,这样每个进程最终访问到的物理地址空间都是彼此分开的。通过这种方式,就实现了进程间的地址隔离。

进程间通信(IPC,InterProcess Communication)是指在不同进程之间传播或交换信息。

 注:同时在不同终端运行同一个bin文件,不同终端的bin文件在运行时有什么是相同的?

三.IPC通信原理

      每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程1把数据从用户空间拷到内核缓冲区,进程2再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通信机制。通常的做法是消息发送方将要发送的数据存放在内存缓存区中,通过系统调用进入内核态。然后内核程序在内核空间分配内存,开辟一块内核缓存区,内核空间调用 copy_from_user() 函数将数据从用户空间的内存缓存区拷贝到内核空间的内核缓存区中。同样的,接收方进程在接收数据时在自己的用户空间开辟一块内存缓存区,然后内核程序调用 copy_to_user() 函数将数据从内核缓存区拷贝到接收进程的用户空间内存缓存区。这样数据发送方进程和数据接收方进程就完成了一次数据传输,我们称完成了一次进程间通信。

主要的过程如下图所示:

四.通信方式

        IPC的方式通常有linux下的 管道(Streams)(包括无名管道和命名管道)、消息队列、信号量、信号、共享存储、Socket等。其中 Socket和Streams支持不同主机上的两个进程IPC,以及android下的Binder。

以Linux中的C语言编程为例。

一、管道

管道,通常指无名管道,是 UNIX 系统IPC最古老的形式。

1、特点:

  1. 它是半双工的(即数据只能在一个方向上流动),具有固定的读端和写端。

  2. 它只能用于具有亲缘关系的进程之间的通信(也是父子进程或者兄弟进程之间)。

  3. 它可以看成是一种特殊的文件,对于它的读写也可以使用普通的read、write 等函数。但是它不是普通的文件,并不属于其他任何文件系统,并且只存在于内存中。

2、原型:

#include <unistd.h>
int pipe(int fd[2]);    // 返回值:若成功返回0,失败返回-1

当一个管道建立时,调用pipe函数 在内核中开辟一块缓冲区用于通信,它会创建两个文件描述符:fd[0]为读而打开,fd[1]为写而打开。如下图:

要关闭管道只需将这两个文件描述符关闭即可。

3、例子

单个进程中的管道几乎没有任何用处。所以,通常调用 pipe 的进程接着调用 fork,这样就创建了父进程与子进程之间的 IPC 通道。如下图所示:

若要数据流从父进程流向子进程,则关闭父进程的读端(fd[0])与子进程的写端(fd[1]);反之,则可以使数据流从子进程流向父进程。

#include<stdio.h>
#include<unistd.h>

int main()
{
    int fd[2];  // 两个文件描述符
    pid_t pid;
    char buff[20];

    if(pipe(fd) < 0)  // 创建管道
        printf("Create Pipe Error!\n");

    if((pid = fork()) < 0)  // 创建子进程
    {
        printf("Fork Error!\n");
    }
    else if(pid > 0)  // 父进程
    {
        close(fd[0]); // 关闭读端
        write(fd[1], "hello world\n", 12);
    }
    else
    {
        close(fd[1]); // 关闭写端
        read(fd[0], buff, 20);
        printf("%s", buff);
    }

    return 0;
}

二、FIFO

FIFO,也称为命名管道,它是一种文件类型。

1、特点

  1. FIFO可以在无关的进程之间交换数据,与无名管道不同。

  2. FIFO有路径名与之相关联,它以一种特殊设备文件形式存在于文件系统中。

2、原型

#include <sys/stat.h>
// 返回值:成功返回0,出错返回-1
int mkfifo(const char *pathname, mode_t mode);

其中的 mode 参数与open函数中的 mode 相同。一旦创建了一个 FIFO,就可以用一般的文件I/O函数操作它。

当 open 一个FIFO时,是否设置非阻塞标志(O_NONBLOCK)的区别:

  • 若没有指定O_NONBLOCK(默认),只读 open 要阻塞到某个其他进程为写而打开此 FIFO。类似的,只写 open 要阻塞到某个其他进程为读而打开它。

  • 若指定了O_NONBLOCK,则只读 open 立即返回。而只写 open 将出错返回 -1 如果没有进程已经为读而打开该 FIFO,其errno置ENXIO。

3、例子

FIFO的通信方式类似于在进程中使用文件来传输数据,只不过FIFO类型文件同时具有管道的特性。在数据读出时,FIFO管道中同时清除数据,并且“先进先出”。下面的例子演示了使用 FIFO 进行 IPC 的过程:

write_fifo.c

#include<stdio.h>
#include<stdlib.h>   // exit
#include<fcntl.h>    // O_WRONLY
#include<sys/stat.h>
#include<time.h>     // time

int main()
{
    int fd;
    int n, i;
    char buf[1024];
    time_t tp;

    printf("I am %d process.\n", getpid()); // 说明进程ID

    if((fd = open("fifo1", O_WRONLY)) < 0) // 以写打开一个FIFO
    {
        perror("Open FIFO Failed");
        exit(1);
    }

    for(i=0; i<10; ++i)
    {
        time(&tp);  // 取系统当前时间
        n=sprintf(buf,"Process %d's time is %s",getpid(),ctime(&tp));
        printf("Send message: %s", buf); // 打印
        if(write(fd, buf, n+1) < 0)  // 写入到FIFO中
        {
            perror("Write FIFO Failed");
            close(fd);
            exit(1);
        }
        sleep(1);  // 休眠1秒
    }

    close(fd);  // 关闭FIFO文件
    return 0;
}

read_fifo.c

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<fcntl.h>
#include<sys/stat.h>

int main()
{
    int fd;
    int len;
    char buf[1024];

    if(mkfifo("fifo1", 0666) < 0 && errno!=EEXIST) // 创建FIFO管道
        perror("Create FIFO Failed");

    if((fd = open("fifo1", O_RDONLY)) < 0)  // 以读打开FIFO
    {
        perror("Open FIFO Failed");
        exit(1);
    }

    while((len = read(fd, buf, 1024)) > 0) // 读取FIFO管道
        printf("Read message: %s", buf);

    close(fd);  // 关闭FIFO文件
    return 0;
}

        上述例子可以扩展成 客户进程—服务器进程 通信的实例,write_fifo的作用类似于客户端,可以打开多个客户端向一个服务器发送请求信息,read_fifo类似于服务器,它适时监控着FIFO的读端,当有数据时,读出并进行处理,但是有一个关键的问题是,每一个客户端必须预先知道服务器提供的FIFO接口,下图显示了这种安排:

三、消息队列

消息队列,顾名思义,想必看到的公共资源可能就是一种队列。这种队列满足数据结构里队列的特点:先进先出。消息对列提供了一个进程向另一个进程发送一块数据快的通信方法,注意,是以块为基本单位,前面的管道是以字节流为基本单位。每个数据块都被认为是由类型的。接收者进程接受数据块可以有不同的类型值,比如可以是结构体型。每个消息 队列的最大长度是上限的(MSGMAX),每个消息队列的总的字节数也是有上限的(MSGMNB),系统的消息队列总数也是有上线的(MSGMNI),

#define MSGMNI  16         // 消息队列总数上线 
#define MSGMAX  8192       // 消息队列最大长度上线
#define MSGMNB  16384      // 消息队列总的字节数上线

 其整体结构可以理解如下:

 一个消息队列由一个标识符(即队列ID)来标识。每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息,包括消息队列键值、用户ID、组ID、消息队列中消息数目等等,甚至记录了最近对消息队列读写进程的ID。读者可以访问这些信息,也可以设置其中的某些信息。

结构msg_queue用来描述消息队列头,存在于内核空间:

 struct msg_queue {
    struct kern_ipc_perm q_perm;
    time_t q_stime;        /* last msgsnd time */
    time_t q_rtime;        /* last msgrcv time */
    time_t q_ctime;        /* last change time */
    unsigned long q_cbytes;    /* current number of bytes on queue */
    unsigned long q_qnum;      /* number of messages in queue */
    unsigned long q_qbytes;    /* max number of bytes on queue */
    pid_t q_lspid;          /* pid of last msgsnd */
    pid_t q_lrpid;          /* last receive pid */
    struct list_head q_messages;
    struct list_head q_receivers;
    struct list_head q_senders;
};

 结构msqid_ds用来设置或返回消息队列的信息,存在于用户空间:

struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;      /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;  /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;  /* last receive pid */
};

下图说明了内核与消息队列是怎样建立起联系的:

其中:struct ipc_ids msg_ids是内核中记录消息队列的全局数据结构;struct msg_queue是每个消息队列的队列头。

 从上图可以看出,全局数据结构 struct ipc_ids msg_ids 可以访问到每个消息队列头的第一个成员:struct kern_ipc_perm;而每个struct kern_ipc_perm能够与具体的消息队列对应起来是因为在该结构中,有一个key_t类型成员key,而key则唯一确定一个消息队列。 kern_ipc_perm结构如下:

struct kern_ipc_perm{  //内核中记录消息队列的全局数据结构msg_ids能够访问到该结构;
    key_t  key;    //该键值则唯一对应一个消息队列
    uid_t  uid;
    gid_t  gid;
    uid_t  cuid;
    gid_t  cgid;
    mode_t  mode;
    unsigned long seq;
}

ipc_perm结构体如下:

struct ipc_perm {
    key_t key;                        /* Key supplied to msgget() */
    uid_t uid;                         /* Effective UID of owner */
    gid_t gid;                        /* Effective GID of owner */
    uid_t cuid;                       /* Effective UID of creator */
    gid_t cgid;                      /* Effective GID of creator */
    unsigned short mode;    /* Permissions */
    unsigned short seq;       /* Sequence number */
};

 

1、特点

    1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.
    2.消息队列允许一个或多个进程向它写入与读取消息.
    3.管道和命名管道都是通信数据都是先进先出的原则。
    4.消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取.比FIFO更有优势。

2、原型

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(char *pathname, char proj);
  返回与路径pathname相对应的一个键值
  pathname:文件名(含路径),通常设置为当前目录“.” 比如projid为'a',则为"./a"文件
  projid:项目ID,必须为非0整数(0-255).


int msgget(key_t key, int flag); 
 创建或打开消息队列:成功返回队列ID,失败返回-1
 flag:
   IPC_CREAT:创建新的消息队列。
   IPC_EXCL:与IPC_CREAT一同使用,表示如果要创建的消息队列已经存在,则返回错误。
   IPC_NOWAIT:读写消息队列要求无法满足时,不阻塞


int msgsnd(int msqid,  struct msgbuf *msgp,  size_t msgsz,  int msgflag);
 添加消息:成功返回0,失败返回-1
 msqid:已打开的消息队列id
 msgp:存放消息的结构体指针。
 msgflag:函数的控制属性。
 消息结构msgbuf为:
 struct msgbuf
 {
    long mtype;//消息类型
    char mtext[1];//消息正文,消息数据的首地址,这个数据的最大长度为8012吧,又可把他看成是一个结构,也有类型和数据,recv时解析即可。
  }
  msgsz:消息数据的长度。
  msgflag:
      IPC_NOWAIT: 指明在消息队列没有足够空间容纳要发送的消息时,msgsnd立即返回。
      0:msgsnd调用阻塞直到条件满足为止.(一般选这个)


int msgrcv(int msqid,  struct msgbuf *msgp,  size_t msgsz,  long msgtype,  int msgflag);
读取消息:成功返回消息数据的长度,失败返回-1
msqid:已打开的消息队列id
    msgp:存放消息的结构体指针。msgp->mtype与第四个参数是相同的。
    msgsz:消息的字节数,指定mtext的大小。
    msgtype:消息类型,消息类型 mtype的值。如果为0,则接受该队列中的第一条信息,如果小于0,则接受小于该值的绝对值的消息类型,如果大于0,接受指定类型的消息,即该值消息。
    msgflag:函数的控制属性。
    msgflag:
        MSG_NOERROR:若返回的消息比nbytes字节多,则消息就会截短到nbytes字节,且不通知消息发送进程.
        IPC_NOWAIT:调用进程会立即返回.若没有收到消息则返回-1.
        0:msgrcv调用阻塞直到条件满足为止.
在成功地读取了一条消息以后,队列中的这条消息将被删除。


int msgctl(int msqid, int cmd, struct msqid_ds *buf);
 控制消息队列:成功返回0,失败返回-1
 msqid:消息队列ID,消息队列标识符,该值为msgget创建消息队列的返回值。
 cmd:
    IPC_STAT:将msqid相关的数据结构中各个元素的当前值存入到由buf指向的结构中.
    IPC_SET:将msqid相关的数据结构中的元素设置为由buf指向的结构中的对应值.
    IPC_RMID:删除由msqid指示的消息队列,将它从系统中删除并破坏相关数据结构.
 buf:消息队列缓冲区

在以下两种情况下,msgget将创建一个新的消息队列:

  • 如果没有与键值key相对应的消息队列,并且flag中包含了IPC_CREAT标志位。
  • key参数为IPC_PRIVATE

函数msgrcv在读取消息队列时,type参数有下面几种情况:

  • type == 0,返回队列中的第一个消息;
  • type > 0,返回队列中消息类型为 type 的第一个消息;
  • type < 0,返回队列中消息类型值小于或等于 type 绝对值的消息,如果有多个,则取类型值最小的消息。

可以看出,type值非 0 时用于以非先进先出次序读消息。也可以把 type 看做优先级的权值。(其他的参数解释,请自行Google之)

3、例子

下面写了一个简单的使用消息队列进行IPC的例子,服务端程序一直在等待特定类型的消息,当收到该类型的消息以后,发送另一种特定类型的消息作为反馈,客户端读取该反馈并打印出来。

msg_server.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>

// 用于创建一个唯一的key
#define MSG_FILE "/etc/passwd"

// 消息结构
struct msg_form {
    long mtype;
    char mtext[256];
};

int main()
{
    int msqid;
    key_t key;
    struct msg_form msg;

    // 获取key值
    if((key = ftok(MSG_FILE,'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 打印key值
    printf("Message Queue - Server key is: %d.\n", key);

    // 创建消息队列
    if ((msqid = msgget(key, IPC_CREAT|0777)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 打印消息队列ID及进程ID
    printf("My msqid is: %d.\n", msqid);
    printf("My pid is: %d.\n", getpid());

    // 循环读取消息
    for(;;)
    {
        msgrcv(msqid, &msg, 256, 888, 0);// 返回类型为888的第一个消息
        printf("Server: receive msg.mtext is: %s.\n", msg.mtext);
        printf("Server: receive msg.mtype is: %d.\n", msg.mtype);

        msg.mtype = 999; // 客户端接收的消息类型
        sprintf(msg.mtext, "hello, I'm server %d", getpid());
        msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
    }
    return 0;
}

msg_client.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>

// 用于创建一个唯一的key
#define MSG_FILE "/etc/passwd"

// 消息结构
struct msg_form {
    long mtype;
    char mtext[256];
};

int main()
{
    int msqid;
    key_t key;
    struct msg_form msg;

    // 获取key值
    if ((key = ftok(MSG_FILE, 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 打印key值
    printf("Message Queue - Client key is: %d.\n", key);

    // 打开消息队列
    if ((msqid = msgget(key, IPC_CREAT|0777)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 打印消息队列ID及进程ID
    printf("My msqid is: %d.\n", msqid);
    printf("My pid is: %d.\n", getpid());

    // 添加消息,类型为888
    msg.mtype = 888;
    sprintf(msg.mtext, "hello, I'm client %d", getpid());
    msgsnd(msqid, &msg, sizeof(msg.mtext), 0);

    // 读取类型为999的消息
    msgrcv(msqid, &msg, 256, 999, 0);
    printf("Client: receive msg.mtext is: %s.\n", msg.mtext);
    printf("Client: receive msg.mtype is: %d.\n", msg.mtype);
    return 0;
}

四、信号量

信号量(semaphore)与已经介绍过的 IPC 结构不同,它是一个计数器。信号量用于实现进程间的互斥与同步,而不是用于存储进程间通信数据。

1、特点

  1. 信号量用于进程间同步,若要在进程间传递数据需要结合共享内存

  2. 信号量基于操作系统的 PV 操作,程序对信号量的操作都是原子操作。

  3. 每次对信号量的 PV 操作不仅限于对信号量值加 1 或减 1,而且可以加减任意正整数。

  4. 支持信号量组。

2、原型

最简单的信号量是只能取 0 和 1 的变量,这也是信号量最常见的一种形式,叫做二值信号量(Binary Semaphore)。而可以取多个正整数的信号量被称为通用信号量。

Linux 下的信号量函数都是在通用的信号量数组上进行操作,而不是在一个单一的二值信号量上进行操作。

#include <sys/sem.h>
// 创建或获取一个信号量组:若成功返回信号量集ID,失败返回-1
int semget(key_t key, int num_sems, int sem_flags);
// 对信号量组进行操作,改变信号量的值:成功返回0,失败返回-1
int semop(int semid, struct sembuf semoparray[], size_t numops);
// 控制信号量的相关信息
int semctl(int semid, int sem_num, int cmd, ...);

semget创建新的信号量集合时,必须指定集合中信号量的个数(即num_sems),通常为1; 如果是引用一个现有的集合,则将num_sems指定为 0 。

semop函数中,sembuf结构的定义如下:

struct sembuf
{
    short sem_num; // 信号量组中对应的序号,0~sem_nums-1
    short sem_op;  // 信号量值在一次操作中的改变量
    short sem_flg; // IPC_NOWAIT, SEM_UNDO
}

其中 sem_op 是一次操作中的信号量的改变量:

  • sem_op > 0,表示进程释放相应的资源数,将 sem_op 的值加到信号量的值上。如果有进程正在休眠等待此信号量,则换行它们。

  • sem_op < 0,请求 sem_op 的绝对值的资源。

    • 如果相应的资源数可以满足请求,则将该信号量的值减去sem_op的绝对值,函数成功返回。
    • 当相应的资源数不能满足请求时,这个操作与sem_flg有关。
      • sem_flg 指定IPC_NOWAIT,则semop函数出错返回EAGAIN
      • sem_flg 没有指定IPC_NOWAIT,则将该信号量的semncnt值加1,然后进程挂起直到下述情况发生:
        1. 当相应的资源数可以满足请求,此信号量的semncnt值减1,该信号量的值减去sem_op的绝对值。成功返回;
        2. 此信号量被删除,函数smeop出错返回EIDRM;
        3. 进程捕捉到信号,并从信号处理函数返回,此情况下将此信号量的semncnt值减1,函数semop出错返回EINTR
  • sem_op == 0,进程阻塞直到信号量的相应值为0:

    • 当信号量已经为0,函数立即返回。
    • 如果信号量的值不为0,则依据sem_flg决定函数动作:
      • sem_flg指定IPC_NOWAIT,则出错返回EAGAIN
      • sem_flg没有指定IPC_NOWAIT,则将该信号量的semncnt值加1,然后进程挂起直到下述情况发生:
        1. 信号量值为0,将信号量的semzcnt的值减1,函数semop成功返回;
        2. 此信号量被删除,函数smeop出错返回EIDRM;
        3. 进程捕捉到信号,并从信号处理函数返回,在此情况将此信号量的semncnt值减1,函数semop出错返回EINTR

semctl函数中的命令有多种,这里就说两个常用的:

  • SETVAL:用于初始化信号量为一个已知的值。所需要的值作为联合semun的val成员来传递。在信号量第一次使用之前需要设置信号量。
  • IPC_RMID:删除一个信号量集合。如果不删除信号量,它将继续在系统中存在,即使程序已经退出,它可能在你下次运行此程序时引发问题,而且信号量是一种有限的资源。

3、例子

#include<stdio.h>
#include<stdlib.h>
#include<sys/sem.h>

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// 初始化信号量
int init_sem(int sem_id, int value)
{
    union semun tmp;
    tmp.val = value;
    if(semctl(sem_id, 0, SETVAL, tmp) == -1)
    {
        perror("Init Semaphore Error");
        return -1;
    }
    return 0;
}

// P操作:
//    若信号量值为1,获取资源并将信号量值-1
//    若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//    释放资源并将信号量值+1
//    如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}

// 删除信号量集
int del_sem(int sem_id)
{
    union semun tmp;
    if(semctl(sem_id, 0, IPC_RMID, tmp) == -1)
    {
        perror("Delete Semaphore Error");
        return -1;
    }
    return 0;
}


int main()
{
    int sem_id;  // 信号量集ID
    key_t key;
    pid_t pid;

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 创建信号量集,其中只有一个信号量
    if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1)
    {
        perror("semget error");
        exit(1);
    }

    // 初始化:初值设为0资源被占用
    init_sem(sem_id, 0);

    if((pid = fork()) == -1)
        perror("Fork Error");
    else if(pid == 0) /*子进程*/
    {
        sleep(2);
        printf("Process child: pid=%d\n", getpid());
        sem_v(sem_id);  /*释放资源*/
    }
    else  /*父进程*/
    {
        sem_p(sem_id);   /*等待资源*/
        printf("Process father: pid=%d\n", getpid());
        sem_v(sem_id);   /*释放资源*/
        del_sem(sem_id); /*删除信号量集*/
    }
    return 0;
}

上面的例子如果不加信号量,则父进程会先执行完毕。这里加了信号量让父进程等待子进程执行完以后再执行。

五、共享内存

共享内存(Shared Memory),指两个或多个进程共享一个给定的存储区。

1、特点

  1. 共享内存是最快的一种 IPC,因为进程是直接对内存进行存取。

  2. 因为多个进程可以同时操作,所以需要进行同步。

  3. 信号量+共享内存通常结合在一起使用,信号量用来同步对共享内存的访问。

2、原型

#include <sys/shm.h>
// 创建或获取一个共享内存:成功返回共享内存ID,失败返回-1
int shmget(key_t key, size_t size, int flag);
// 连接共享内存到当前进程的地址空间:成功返回指向共享内存的指针,失败返回-1
void *shmat(int shm_id, const void *addr, int flag);
// 断开与共享内存的连接:成功返回0,失败返回-1
int shmdt(void *addr);
// 控制共享内存的相关信息:成功返回0,失败返回-1
int shmctl(int shm_id, int cmd, struct shmid_ds *buf);

当用shmget函数创建一段共享内存时,必须指定其 size;而如果引用一个已存在的共享内存,则将 size 指定为0 。

当一段共享内存被创建以后,它并不能被任何进程访问。必须使用shmat函数连接该共享内存到当前进程的地址空间,连接成功后把共享内存区对象映射到调用进程的地址空间,随后可像本地空间一样访问。

shmdt函数是用来断开shmat建立的连接的。注意,这并不是从系统中删除该共享内存,只是当前进程不能再访问该共享内存而已。

shmctl函数可以对共享内存执行多种操作,根据参数 cmd 执行相应的操作。常用的是IPC_RMID(从系统中删除该共享内存)。

3、例子

下面这个例子,使用了【共享内存+信号量+消息队列】的组合来实现服务器进程与客户进程间的通信。

  • 共享内存用来传递数据;
  • 信号量用来同步;
  • 消息队列用来 在客户端修改了共享内存后 通知服务器读取。

server.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/shm.h>  // shared memory
#include<sys/sem.h>  // semaphore
#include<sys/msg.h>  // message queue
#include<string.h>   // memcpy

// 消息队列结构
struct msg_form {
    long mtype;
    char mtext;
};

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// 初始化信号量
int init_sem(int sem_id, int value)
{
    union semun tmp;
    tmp.val = value;
    if(semctl(sem_id, 0, SETVAL, tmp) == -1)
    {
        perror("Init Semaphore Error");
        return -1;
    }
    return 0;
}

// P操作:
//  若信号量值为1,获取资源并将信号量值-1
//  若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//  释放资源并将信号量值+1
//  如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}

// 删除信号量集
int del_sem(int sem_id)
{
    union semun tmp;
    if(semctl(sem_id, 0, IPC_RMID, tmp) == -1)
    {
        perror("Delete Semaphore Error");
        return -1;
    }
    return 0;
}

// 创建一个信号量集
int creat_sem(key_t key)
{
    int sem_id;
    if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1)
    {
        perror("semget error");
        exit(-1);
    }
    init_sem(sem_id, 1);  /*初值设为1资源未占用*/
    return sem_id;
}


int main()
{
    key_t key;
    int shmid, semid, msqid;
    char *shm;
    char data[] = "this is server";
    struct shmid_ds buf1;  /*用于删除共享内存*/
    struct msqid_ds buf2;  /*用于删除消息队列*/
    struct msg_form msg;  /*消息队列用于通知对方更新了共享内存*/

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 创建共享内存
    if((shmid = shmget(key, 1024, IPC_CREAT|0666)) == -1)
    {
        perror("Create Shared Memory Error");
        exit(1);
    }

    // 连接共享内存
    shm = (char*)shmat(shmid, 0, 0);
    if((int)shm == -1)
    {
        perror("Attach Shared Memory Error");
        exit(1);
    }


    // 创建消息队列
    if ((msqid = msgget(key, IPC_CREAT|0777)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 创建信号量
    semid = creat_sem(key);

    // 读数据
    while(1)
    {
        msgrcv(msqid, &msg, 1, 888, 0); /*读取类型为888的消息*/
        if(msg.mtext == 'q')  /*quit - 跳出循环*/
            break;
        if(msg.mtext == 'r')  /*read - 读共享内存*/
        {
            sem_p(semid);
            printf("%s\n",shm);
            sem_v(semid);
        }
    }

    // 断开连接
    shmdt(shm);

    /*删除共享内存、消息队列、信号量*/
    shmctl(shmid, IPC_RMID, &buf1);
    msgctl(msqid, IPC_RMID, &buf2);
    del_sem(semid);
    return 0;
}

client.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/shm.h>  // shared memory
#include<sys/sem.h>  // semaphore
#include<sys/msg.h>  // message queue
#include<string.h>   // memcpy

// 消息队列结构
struct msg_form {
    long mtype;
    char mtext;
};

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// P操作:
//  若信号量值为1,获取资源并将信号量值-1
//  若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//  释放资源并将信号量值+1
//  如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}


int main()
{
    key_t key;
    int shmid, semid, msqid;
    char *shm;
    struct msg_form msg;
    int flag = 1; /*while循环条件*/

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 获取共享内存
    if((shmid = shmget(key, 1024, 0)) == -1)
    {
        perror("shmget error");
        exit(1);
    }

    // 连接共享内存
    shm = (char*)shmat(shmid, 0, 0);
    if((int)shm == -1)
    {
        perror("Attach Shared Memory Error");
        exit(1);
    }

    // 创建消息队列
    if ((msqid = msgget(key, 0)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 获取信号量
    if((semid = semget(key, 0, 0)) == -1)
    {
        perror("semget error");
        exit(1);
    }

    while(flag)
    {
        char c;
        printf("Please input command: ");
        scanf("%c", &c);
        switch(c)
        {
            case 'r':
                printf("Data to send: ");
                sem_p(semid);  /*访问资源*/
                scanf("%s", shm);
                sem_v(semid);  /*释放资源*/
                /*清空标准输入缓冲区*/
                while((c=getchar())!='\n' && c!=EOF);
                msg.mtype = 888;
                msg.mtext = 'r';  /*发送消息通知服务器读数据*/
                msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
                break;
            case 'q':
                msg.mtype = 888;
                msg.mtext = 'q';
                msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
                flag = 0;
                break;
            default:
                printf("Wrong input!\n");
                /*清空标准输入缓冲区*/
                while((c=getchar())!='\n' && c!=EOF);
        }
    }

    // 断开连接
    shmdt(shm);

    return 0;
}

注意:当scanf()输入字符或字符串时,缓冲区中遗留下了\n,所以每次输入操作后都需要清空标准输入的缓冲区。但是由于 gcc 编译器不支持fflush(stdin)(它只是标准C的扩展),所以我们使用了替代方案:

while((c=getchar())!='\n' && c!=EOF);

六、信号

信号是一种事件通知机制,当接收到该信号的进程会执行相应的操作。

1、特点

  1. 由硬件产生,如从键盘输入Ctrl+C可以终止当前进程
  2. 由其他进程发送,例如,在shell进程下,使用命令kill  -信号值 PID
  3. 异常,当进程异常时发送信号

2、原型

#include<signal.h>
void(*signal(int sig,void (*func)(int)(int))
// sig:信号值
// func:信号处理的函数指针,参数为信号值

int sigaction(int sig,const struct sigaction *act,struct sigaction *oact);
// sig:信号值
// act:指定信号的动作,相当于func
// oact:保存原信号的动作

int kill(pid_t pid,int sig)
// 它的作用是把信号sig发送给pid进程,成功时返回0;失败原因一般存在3点:给定的信号无效、发送权限不够、目标进程不存在
// kill调用失败返回-1,调用失败通常有三大原因:
// 1、给定的信号无效(errno = EINVAL)
// 2、发送权限不够( errno = EPERM )
// 3、目标进程不存在( errno = ESRCH )

信号是由操作系统处理的,所以信号的处理在内核态。如果不是紧急信号的话,它不一定被立即处理,操作系统不会为了处理一个信号而把当前正在运行的进程挂起,因为挂起(进程切换)当前进程消耗很大。所以操作系统一般会将信号先放入信号表中,一般选择在内核态切换回用户态的时候处理信号(不用自己单独进行进程切换以免浪费时间)

3、例子

函数signal的例子 signal1.c

#include <signal.h>
#include <stdio.h>
#include <unistd.h>
 
void ouch(int sig)
{
	printf("\nOUCH! - I got signal %d\n", sig);
	//恢复终端中断信号SIGINT的默认行为
	(void) signal(SIGINT, SIG_DFL);
}
 
int main()
{
	//改变终端中断信号SIGINT的默认行为,使之执行ouch函数
	//而不是终止程序的执行
	(void) signal(SIGINT, ouch);
	while(1)
	{
		printf("Hello World!\n");
		sleep(1);
	}
	return 0;
}

 函数sigcation 函数的例子 signal2.c

#include <unistd.h>
#include <stdio.h>
#include <signal.h>
 
void ouch(int sig)
{
	printf("\nOUCH! - I got signal %d\n", sig);
}
 
int main()
{
	struct sigaction act;
	act.sa_handler = ouch;
	//创建空的信号屏蔽字,即不屏蔽任何信息
	sigemptyset(&act.sa_mask);
	//使sigaction函数重置为默认行为
	act.sa_flags = SA_RESETHAND;
 
	sigaction(SIGINT, &act, 0);
 
	while(1)
	{
		printf("Hello World!\n");
		sleep(1);
	}
	return 0;
}

 一个综合例子 signal3.c

int main()
{
	pid_t pid;
	pid=fork();
	switch(pid)
	{
	case -1:
		perror("fork failed\n");
 
	case 0://子进程
		sleep(5);
		kill(getppid(),SIGALRM);
		exit(0);
	default:;
	}
 
	signal(SIGALRM,func);
	while(!n)
	{
		printf("hello world\n");
		sleep(1);
	}
	if(n)
	{
		printf("hava a signal %d\n",SIGALRM);
	}
	exit(0);
}

七.套接字 (socket)

一、什么是socket

        socket,即套接字是一种通信机制,凭借这种机制,客户/服务器(即要进行通信的进程)系统的开发工作既可以在本地单机上进行,也可以跨网络进行。也就是说它可以让不在同一台计算机但通过网络连接计算机上的进程进行通信。也因为这样,套接字明确地将客户端和服务器区分开来。

二、套接字的属性

套接字的特性由3个属性确定,它们分别是:域、类型和协议。

1、套接字的域

它指定套接字通信中使用的网络介质,最常见的套接字域是AF_INET,它指的是Internet网络。当客户使用套接字进行跨网络的连接时,它就需要用到服务器计算机的IP地址和端口来指定一台联网机器上的某个特定服务,所以在使用socket作为通信的终点,服务器应用程序必须在开始通信之前绑定一个端口,服务器在指定的端口等待客户的连接。另一个域AF_UNIX表示UNIX文件系统,它就是文件输入/输出,而它的地址就是文件名。

2、套接字类型

因特网提供了两种通信机制:流(stream)和数据报(datagram),因而套接字的类型也就分为流套接字和数据报套接字。这里主要讲流套接字。

流套接字由类型SOCK_STREAM指定,它们是在AF_INET域中通过TCP/IP连接实现,同时也是AF_UNIX中常用的套接字类型。流套接字提供的是一个有序、可靠、双向字节流的连接,因此发送的数据可以确保不会丢失、重复或乱序到达,而且它还有一定的出错后重新发送的机制。

与流套接字相对的是由类型SOCK_DGRAM指定的数据报套接字,它不需要建立连接和维持一个连接,它们在AF_INET中通常是通过UDP/IP协议实现的。它对可以发送的数据的长度有限制,数据报作为一个单独的网络消息被传输,它可能会丢失、复制或错乱到达,UDP不是一个可靠的协议,但是它的速度比较高,因为它并一需要总是要建立和维持一个连接。

3、套接字协议

只要底层的传输机制允许不止一个协议来提供要求的套接字类型,我们就可以为套接字选择一个特定的协议。通常只需要使用默认值。

三、套接字地址

套接口是由socket数据结构代表的,形式如下:

struct socket
{
  socket_state  state;     /* 指明套接口的连接状态,一个套接口的连接状态可以有以下几种
套接口是空闲的,还没有进行相应的端口及地址的绑定;还没有连接;正在连接中;已经连接;正在解除连接。     */
  unsigned long     flags;
  struct proto_ops  ops;  /* 指明可对套接口进行的各种操作 */
  struct inode      inode; /* 指向sockfs文件系统中的相应inode */
  struct fasync_struct  *fasync_list; /* Asynchronous wake up list  */
  struct file       *file;            /* 指向sockfs文件系统中的相应文件  */
  struct sock       sk;  /* 任何协议族都有其特定的套接口特性,该域就指向特定协议族的套接口对象。 */
  wait_queue_head_t  wait;
  short              type;
  unsigned char      passcred;
};

每个套接字都有其自己的地址格式,对于AF_UNIX域套接字来说,它的地址由结构sockaddr_un来描述,该结构定义在头文件sys/un.h中,它的定义如下:

struct sockaddr_un{
    sa_family_t sun_family;//AF_UNIX,它是一个短整型
    char        sum_path[];//路径名
};

对于AF_INET域套接字来说,它的地址结构由sockaddr_in来描述,它至少包括以下几个成员:

struct sockaddr_in{
    short int            sin_family;//AF_INET
    unsigned short int    sin_port;//端口号
    struct in_addr        sin_addr;//IP地址
};

而in_addr被定义为:

struct in_addr{
    unsigned long int s_addr;
};

四、基于流套接字的客户/服务器的工作流程

使用socket进行进程通信的进程采用的客户/服务器系统是如何工作的呢?

1、服务器端

首先服务器应用程序用系统调用socket来创建一个套接字,它是系统分配给该服务器进程的类似文件描述符的资源,它不能与其他的进程共享。

接下来,服务器进程会给套接字起个名字,我们使用系统调用bind来给套接字命名。然后服务器进程就开始等待客户连接到这个套接字。

然后,系统调用listen来创建一个队列并将其用于存放来自客户的进入连接。

最后,服务器通过系统调用accept来接受客户的连接。它会创建一个与原有的命名套接不同的新套接字,这个套接字只用于与这个特定客户端进行通信,而命名套接字(即原先的套接字)则被保留下来继续处理来自其他客户的连接。

2、客户端

基于socket的客户端比服务器端简单,同样,客户应用程序首先调用socket来创建一个未命名的套接字,然后将服务器的命名套接字作为一个地址来调用connect与服务器建立连接。

一旦连接建立,我们就可以像使用底层的文件描述符那样用套接字来实现双向数据的通信。   

五、流式socket的接口及作用

socket的接口函数声明在头文件sys/types.h和sys/socket.h中。

1、创建套接字——socket系统调用

该函数用来创建一个套接字,并返回一个描述符,该描述符可以用来访问该套接字,它的原型如下:

int socket(int domain, int type, int protocol);

函数中的三个参数分别对应前面所说的三个套接字属性。protocol参数设置为0表示使用默认协议。

2、命名(绑定)套接字——bind系统调用

该函数把通过socket调用创建的套接字命名,从而让它可以被其他进程使用。对于AF_UNIX,调用该函数后套接字就会关联到一个文件系统路径名,对于AF_INET,则会关联到一个IP端口号。函数原型如下:

int bind( int socket, const struct sockaddr *address, size_t address_len);

成功时返回0,失败时返回-1;

3、创建套接字队列(监听)——listen系统调用

该函数用来创建一个队列来保存未处理的请求。成功时返回0,失败时返回-1,其原型如下:

int listen(int socket, int backlog);

backlog用于指定队列的长度,等待处理的进入连接的个数最多不能超过这个数字,否则往后的连接将被拒绝,导致客户的连接请求失败。调用后,程序一直会监听这个IP端口,如果有连接请求,就把它加入到这个队列中。

4、接受连接——accept系统调用

该系统调用用来等待客户建立对该套接字的连接。accept系统调用只有当客户程序试图连接到由socket参数指定的套接字上时才返回,也就是说,如果套接字队列中没有未处理的连接,accept将阻塞直到有客户建立连接为止。accept函数将创建一个新套接字来与该客户进行通信,并且返回新套接字的描述符,新套接字的类型和服务器监听套接字类型是一样的。它的原型如下:

int accept(int socket, struct sockaddr *address, size_t *address_len);

address为连接客户端的地址,参数address_len指定客户结构的长度,如果客户地址的长度超过这个值,它将会截断。

5、请求连接——connect系统调用

该系统调用用来让客户程序通过在一个未命名套接字和服务器监听套接字之间建立连接的方法来连接到服务器。它的原型如下:

int connect(int socket, const struct sockaddr *address, size_t address_len);

参数socket指定的套接字连接到参数addres指定的服务器套接字。成功时返回0,失败时返回-1.

6、关闭socket——close系统调用

该系统调用用来终止服务器和客户上的套接字连接,我们应该总是在连接的两端(服务器和客户)关闭套接字。

六、进程使用流式socket进行通信

下面用多个客户程序和一个服务器程序来展示进程间如何利用套接字进行通信。

sockserver.c是一个服务器程序,它首先创建套接字,然后绑定一个端口再监听套接字,忽略子进程的停止消息等,然后它进入循环,一直循环检查是否有客户连接到服务器,如果有,则调用fork创建一个子进程来处理请求。利用read系统调用来读取客户端发来的信息,利用write系统调用来向客户端发送信息。这个服务器的工作非常简单,就是把客户发过来的字符+1,再发送回给客户。

sockclient.c是一个客户程序,它同样要先创建套接,然后连接到指定IP端口服务器,如果连接成功,就用write来发送信息给服务器,再用read获取服务器处理后的信息,再输出。

服务器sockserver.c的源代码如下:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
 
int main()
{
    int server_sockfd = -1;
    int client_sockfd = -1;
    int client_len = 0;
    struct sockaddr_in server_addr;
    struct sockaddr_in client_addr;
    //创建流套接字
    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    //设置服务器接收的连接地址和监听的端口
    server_addr.sin_family = AF_INET;//指定网络套接字
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);//接受所有IP地址的连接
    server_addr.sin_port = htons(9736);//绑定到9736端口
    //绑定(命名)套接字
    bind(server_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    //创建套接字队列,监听套接字
    listen(server_sockfd, 5);
    //忽略子进程停止或退出信号
    signal(SIGCHLD, SIG_IGN);
    
    while(1)
    {
        char ch = '\0';
        client_len = sizeof(client_addr);
        printf("Server waiting\n");
        //接受连接,创建新的套接字
        client_sockfd = accept(server_sockfd, (struct sockaddr*)&client_addr, &client_len);
 
        if(fork() == 0)
        {
            //子进程中,读取客户端发过来的信息,处理信息,再发送给客户端
            read(client_sockfd, &ch, 1);
            sleep(5);
            ch++;
            write(client_sockfd, &ch, 1);
            close(client_sockfd);
            exit(0);
        }
        else
        {
            //父进程中,关闭套接字
            close(client_sockfd);
        }
    }
}

客户sockclient.c的源代码如下:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
 
int main()
{
    int sockfd = -1;
    int len = 0;
    struct sockaddr_in address;
    int result;
    char ch = 'A';
    //创建流套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    //设置要连接的服务器的信息
    address.sin_family = AF_INET;//使用网络套接字
    address.sin_addr.s_addr = inet_addr("127.0.0.1");//服务器地址
    address.sin_port = htons(9736);//服务器所监听的端口
    len = sizeof(address);
    //连接到服务器
    result = connect(sockfd, (struct sockaddr*)&address, len);
 
    if(result == -1)
    {
        perror("ops:client\n");
        exit(1);
    }
    //发送请求给服务器
    write(sockfd, &ch, 1);
    //从服务器获取数据
    read(sockfd, &ch, 1);
    printf("char form server = %c\n", ch);
    close(sockfd);
    exit(0);
}

八. android下binder

Binder的定义,在不同场景下其定义不同

一. Binder 跨进程通信机制 模型

1 模型原理图

Binder 跨进程通信机制 模型 基于 Client - Server 模式

示意图
2 模型组成角色说明

示意图
3 Binder驱动的作用 & 原理

简介

示意图
跨进程通信的核心原理

示意图

4 模型原理步骤说明

示意图
额外说明
说明1:Client进程、Server进程 & Service Manager 进程之间的交互 都必须通过Binder驱动(使用 open 和 ioctl文件操作函数),而非直接交互
原因:

  1. Client进程、Server进程 & Service Manager进程属于进程空间的用户空间,不可进行进程间交互
  2. Binder驱动 属于 进程空间的 内核空间,可进行进程间 & 进程内交互

所以,原理图可表示为以下:

示意图

虚线表示并非直接交互

说明2: Binder驱动 & Service Manager进程 属于 Android基础架构(即系统已经实现好了);而Client 进程 和 Server 进程 属于Android应用层(需要开发者自己实现)
所以,在进行跨进程通信时,开发者只需自定义Client & Server 进程 并 显式使用上述3个步骤,最终借助 Android的基本架构功能就可完成进程间通信

示意图

说明3:Binder请求的线程管理
Server进程会创建很多线程来处理Binder请求
Binder模型的线程管理 采用Binder驱动的线程池,并由Binder驱动自身进行管理
而不是由Server进程来管理的

一个进程的Binder线程数默认最大是16,超过的请求会被阻塞等待空闲的Binder线程。
所以,在进程间通信时处理并发问题时,如使用ContentProvider时,它的CRUD(创建、检索、更新和删除)方法只能同时有16个线程同时工作

至此,我相信大家对Binder 跨进程通信机制 模型 已经有了一个非常清晰的定性认识
下面,我将通过一个实例,分析Binder跨进程通信机制 模型在 Android中的具体代码实现方式
即分析 上述步骤在Android中具体是用代码如何实现的

5. Binder机制 在Android中的具体实现原理

Binder机制在 Android中的实现主要依靠 Binder类,其实现了IBinder 接口
步骤1:注册服务
过程描述
Server进程 通过Binder驱动 向 Service Manager进程 注册服务
代码实现

  1. Server进程 创建 一个 Binder 对象
  2. Binder 实体是 Server进程 在 Binder 驱动中的存在形式
  3. 该对象保存 Server 和 ServiceManager 的信息(保存在内核空间中)
  4. Binder 驱动通过 内核空间的Binder 实体 找到用户空间的Server对象

 代码分析

    
    Binder binder = new Stub();
    // 步骤1:创建Binder对象 ->>分析1

    // 步骤2:创建 IInterface 接口类 的匿名类
    // 创建前,需要预先定义 继承了IInterface 接口的接口 -->分析3
    IInterface plus = new IPlus(){

          // 确定Client进程需要调用的方法
          public int add(int a,int b) {
               return a+b;
         }

          // 实现IInterface接口中唯一的方法
          public IBinder asBinder(){ 
                return null ;
           }
};
          // 步骤3
          binder.attachInterface(plus,"add two int");
         // 1. 将(add two int,plus)作为(key,value)对存入到Binder对象中的一个Map<String,IInterface>对象中
         // 2. 之后,Binder对象 可根据add two int通过queryLocalIInterface()获得对应IInterface对象(即plus)的引用,可依靠该引用完成对请求方法的调用
        // 分析完毕,跳出


<-- 分析1:Stub类 -->
    public class Stub extends Binder {
    // 继承自Binder类 ->>分析2

          // 复写onTransact()
          @Override
          boolean onTransact(int code, Parcel data, Parcel reply, int flags){
          // 具体逻辑等到步骤3再具体讲解,此处先跳过
          switch (code) { 
                case Stub.add: { 

                       data.enforceInterface("add two int"); 

                       int  arg0  = data.readInt();
                       int  arg1  = data.readInt();

                       int  result = this.queryLocalIInterface("add two int") .add( arg0,  arg1); 

                        reply.writeInt(result); 

                        return true; 
                  }
           } 
      return super.onTransact(code, data, reply, flags); 

}
// 回到上面的步骤1,继续看步骤2

<-- 分析2:Binder 类 -->
 public class Binder implement IBinder{
    // Binder机制在Android中的实现主要依靠的是Binder类,其实现了IBinder接口
    // IBinder接口:定义了远程操作对象的基本接口,代表了一种跨进程传输的能力
    // 系统会为每个实现了IBinder接口的对象提供跨进程传输能力
    // 即Binder类对象具备了跨进程传输的能力

        void attachInterface(IInterface plus, String descriptor);
        // 作用:
          // 1. 将(descriptor,plus)作为(key,value)对存入到Binder对象中的一个Map<String,IInterface>对象中
          // 2. 之后,Binder对象 可根据descriptor通过queryLocalIInterface()获得对应IInterface对象(即plus)的引用,可依靠该引用完成对请求方法的调用

        IInterface queryLocalInterface(Stringdescriptor) ;
        // 作用:根据 参数 descriptor 查找相应的IInterface对象(即plus引用)

        boolean onTransact(int code, Parcel data, Parcel reply, int flags);
        // 定义:继承自IBinder接口的
        // 作用:执行Client进程所请求的目标方法(子类需要复写)
        // 参数说明:
        // code:Client进程请求方法标识符。即Server进程根据该标识确定所请求的目标方法
        // data:目标方法的参数。(Client进程传进来的,此处就是整数a和b)
        // reply:目标方法执行后的结果(返回给Client进程)
         // 注:运行在Server进程的Binder线程池中;当Client进程发起远程请求时,远程请求会要求系统底层执行回调该方法

        final class BinderProxy implements IBinder {
         // 即Server进程创建的Binder对象的代理对象类
         // 该类属于Binder的内部类
        }
        // 回到分析1原处
}

<-- 分析3:IInterface接口实现类 -->

 public interface IPlus extends IInterface {
          // 继承自IInterface接口->>分析4
          // 定义需要实现的接口方法,即Client进程需要调用的方法
         public int add(int a,int b);
// 返回步骤2
}

<-- 分析4:IInterface接口类 -->
// 进程间通信定义的通用接口
// 通过定义接口,然后再服务端实现接口、客户端调用接口,就可实现跨进程通信。
public interface IInterface
{
    // 只有一个方法:返回当前接口关联的 Binder 对象。
    public IBinder asBinder();
}
  // 回到分析3原处

注册服务后,Binder驱动持有 Server进程创建的Binder实体

步骤2:获取服务

  • Client进程 使用 某个 service前(此处是 相加函数),须 通过Binder驱动 向 ServiceManager进程 获取相应的Service信息
  • 具体代码实现过程如下

示意图

 示意图

此时,Client进程与 Server进程已经建立了连接

步骤3:使用服务
Client进程 根据获取到的 Service信息(Binder代理对象),通过Binder驱动 建立与 该Service所在Server进程通信的链路,并开始使用服务

过程描述

  1. Client进程 将参数(整数a和b)发送到Server进程
  2. Server进程 根据Client进程要求调用 目标方法(即加法函数)
  3. Server进程 将目标方法的结果(即加法后的结果)返回给Client进程

代码实现过程

步骤1: Client进程 将参数(整数a和b)发送到Server进程

// 1. Client进程 将需要传送的数据写入到Parcel对象中
// data = 数据 = 目标方法的参数(Client进程传进来的,此处就是整数a和b) + IInterface接口对象的标识符descriptor
  android.os.Parcel data = android.os.Parcel.obtain();
  data.writeInt(a); 
  data.writeInt(b); 

  data.writeInterfaceToken("add two int");;
  // 方法对象标识符让Server进程在Binder对象中根据"add two int"通过queryLocalIInterface()查找相应的IInterface对象(即Server创建的plus),Client进程需要调用的相加方法就在该对象中

  android.os.Parcel reply = android.os.Parcel.obtain();
  // reply:目标方法执行后的结果(此处是相加后的结果)

// 2. 通过 调用代理对象的transact() 将 上述数据发送到Binder驱动
  binderproxy.transact(Stub.add, data, reply, 0)
  // 参数说明:
    // 1. Stub.add:目标方法的标识符(Client进程 和 Server进程 自身约定,可为任意)
    // 2. data :上述的Parcel对象
    // 3. reply:返回结果
    // 0:可不管

// 注:在发送数据后,Client进程的该线程会暂时被挂起
// 所以,若Server进程执行的耗时操作,请不要使用主线程,以防止ANR


// 3. Binder驱动根据 代理对象 找到对应的真身Binder对象所在的Server 进程(系统自动执行)
// 4. Binder驱动把 数据 发送到Server 进程中,并通知Server 进程执行解包(系统自动执行)

 步骤2:Server进程根据Client进要求 调用 目标方法(即加法函数)

// 1. 收到Binder驱动通知后,Server 进程通过回调Binder对象onTransact()进行数据解包 & 调用目标方法
  public class Stub extends Binder {

          // 复写onTransact()
          @Override
          boolean onTransact(int code, Parcel data, Parcel reply, int flags){
          // code即在transact()中约定的目标方法的标识符

          switch (code) { 
                case Stub.add: { 
                  // a. 解包Parcel中的数据
                       data.enforceInterface("add two int"); 
                        // a1. 解析目标方法对象的标识符

                       int  arg0  = data.readInt();
                       int  arg1  = data.readInt();
                       // a2. 获得目标方法的参数
                      
                       // b. 根据"add two int"通过queryLocalIInterface()获取相应的IInterface对象(即Server创建的plus)的引用,通过该对象引用调用方法
                       int  result = this.queryLocalIInterface("add two int") .add( arg0,  arg1); 
                      
                        // c. 将计算结果写入到reply
                        reply.writeInt(result); 
                        
                        return true; 
                  }
           } 
      return super.onTransact(code, data, reply, flags); 
      // 2. 将结算结果返回 到Binder驱动

 步骤3:Server进程 将目标方法的结果(即加法后的结果)返回给Client进程

  // 1. Binder驱动根据 代理对象 沿原路 将结果返回 并通知Client进程获取返回结果
  // 2. 通过代理对象 接收结果(之前被挂起的线程被唤醒)

    binderproxy.transact(Stub.ADD, data, reply, 0);
    reply.readException();;
    result = reply.readInt();
          }
}

 用一个流程图来总结步骤3的内容

流程图

六. 代码案例 

1. binder_common.h

#ifndef __BINDER_COMMON_H__
#define __BINDER_COMMON_H__

#include <opencv2/opencv.hpp>

using namespace cv;

#define DBG_TAG "XlabBinderServer"
/* 普通调试打印信息, 带函数名和行号 */
#define DBG_PRINT(fmt, args...) \
    do { \
        printf(DBG_TAG "[%s:%d] --- " fmt "\n",__FUNCTION__,__LINE__, ##args); \
    } while (0)

typedef struct {
    int size;
    int rows;
    int cols;
} XlabCome_S;

typedef enum {
    ENUM_INIT = 0,
} BinderFunction_E;

typedef enum {
    ENUM_AK_READ = 0,
    ENUM_AK_WRITE,
} BinderRdwrflag_E;

class XlabBindeCommon {
public:
    static uint32_t saveToFile(void *data, uint32_t size, char *path) {
        FILE *fp = fopen((const char *)path, "w+");
        if (fp == NULL) {
            DBG_PRINT("open failed!\n");
            return 0;
        }
        uint32_t ret = fwrite(data, 1, size, fp);
        fclose(fp);

        return ret;
    };

    static  uint32_t readFromFile(void *data, uint32_t size, const char *path)
    {
        FILE *fp = fopen((const char *)path, "rb");
        if (fp == NULL) {
            return 0;
        }
        uint32_t ret = fread(data, 1, size, fp);
        fclose(fp);

        return ret;
    };

    /**
    * 设置共享内存文件
    * @return     返回指向共享内存指针
    * @param      XlabCome_S 结构体  rdwrflag 读写标志位
    * @exception    无
    **/
    static uchar * sharedMemoryMap(XlabCome_S *xlabcom,BinderRdwrflag_E rdwrflage) {
        int fd = open("/mnt/xlab_mmap", O_RDWR | O_CREAT, 00777);
        if (fd < 0) {
            DBG_PRINT("open mmap filed!\n");
            return nullptr;
        }

        if (ENUM_AK_WRITE) {
            ftruncate(fd, xlabcom->size);     //改变文件大小
        }
        unsigned char *mapped = static_cast<unsigned char *>(mmap(NULL, xlabcom->size,
                                PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
        if (mapped == MAP_FAILED) {
            DBG_PRINT("mmap failed\n");
            return nullptr;
        }
        return mapped;
    };
};

#endif

client端

binder_client.h

#ifndef __AK_BINDER_CLIENT_H__
#define __AK_BINDER_CLIENT_H__
#include <binder/IBinder.h>
#include <binder/Binder.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include "binder_common.h"

using namespace android;

class XlabBinder {
public:
    XlabBinder() {
        m_binder = NULL;
    }

    ~XlabBinder() {};

    /**
    * 初始化共享内存
    * @return     
    * @param      stp_para 
    * @exception    无
    **/
    int Init(unsigned char *readBuf, XlabCome_S *xlabcom);

private:
    sp <IBinder> m_binder;
    //通过ServiceManager获取服务接口
    bool getXlabBinderService();
};
//namespace

#endif

binder_client.cpp

#include <binder/IServiceManager.h>
#include <binder/IPCThreadState.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fstream>
#include "binder_client.h"

using namespace android;

int XlabBinder::Init(unsigned char *readBuf, XlabCome_S *xlabcom)
{
    uchar *readBuf1 = XlabBindeCommon::sharedMemoryMap(xlabcom, ENUM_AK_WRITE);
    if (readBuf1 == nullptr) {
        DBG_PRINT("memory map open filed!\n");
        return -2;
    }
    memcpy(readBuf1,readBuf,xlabcom->size);
    bool ret = getXlabBinderService();
    if (ret == true) {
        Parcel data, reply;
        status_t ret = data.write((void *)xlabcom, sizeof(XlabCome_S));
        if (ret != NO_ERROR) {
            DBG_PRINT("trans failed!!");
        }
        m_binder->transact(ENUM_INIT, data, &reply);
        bool answer = reply.readByte();

        DBG_PRINT("answer %d \n", answer);
        return answer;
    } else {
        DBG_PRINT("connect server errorr\n");
        return -5;
    }
}


bool XlabBinder::getXlabBinderService()
{
    ProcessState::initWithDriver("/dev/vndbinder");
    sp <IServiceManager> sm = defaultServiceManager();
    bool ret = false;
    int retry = 200;       // 开机等待Service启动
    if (m_binder == NULL) {
        while (retry-- > 0) {
            m_binder = sm->getService(String16("android.xlabservice"));
            if (m_binder == NULL) {
                DBG_PRINT("xlabservice not published, waiting...\n");
                usleep(100 * 1000);
                continue;
            } else {
                ret = true;
                break;
            }
        }
    } else {
        ret = true;
    }
    return ret;
}


int main()
{
    XlabBinder xlb;
    Mat src_img = imread("Test.png");
    int size = src_img.cols * src_img.rows *3;
	XlabCome_S xlabcome;
    xlabcome.size = size;
    xlabcome.cols = src_img.cols;
    xlabcome.rows = src_img.rows;
    xlb.Init(src_img.data, &xlabcome);
  
    return 0;
}

server端

binder_server.cpp

#include <sys/types.h>
#include <unistd.h>
#include <grp.h>
#include <binder/IPCThreadState.h>
#include <binder/ProcessState.h>
#include <binder/IServiceManager.h>
#include <utils/Log.h>
#include <private/android_filesystem_config.h>
#include "binder_service.h"

using namespace android;
int main()
{
    ProcessState::initWithDriver("/dev/vndbinder");
    sp<ProcessState> proc(ProcessState::self());
    sp<IServiceManager> sm = defaultServiceManager();// 获得ServiceManager接口
    XlabBinderService::instantiate();
    // 执行addService()函数,注册服务
    ProcessState::self()->startThreadPool();
    IPCThreadState::self()->joinThreadPool();
    // 进入循环,等待客户端的请求
    return 0;
}

binder_service.h

#ifndef __BINDER_SERVER_H__
#define __BINDER_SERVER_H__

#include <utils/threads.h>
#include <utils/RefBase.h>
#include <binder/IInterface.h>
#include <binder/BpBinder.h>
#include <binder/Parcel.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include "binder_common.h"


namespace android {

class XlabBinderService : public BBinder {
public:
    static int instantiate();
    XlabBinderService();
    virtual ~XlabBinderService();
    virtual status_t onTransact(uint32_t, const Parcel &, Parcel *, uint32_t);
private:

};

}; //namespace

#endif

binder_service.cpp

#include <binder/IServiceManager.h>
#include <binder/IPCThreadState.h>
#include <unistd.h>
#include "binder_service.h"
#include "binder_common.h"

namespace android {
XlabBinderService::XlabBinderService()
{}

XlabBinderService::~XlabBinderService()
{}

int XlabBinderService::instantiate()
{
    // 调用addService()可向ServiceManager注册新的服务
    int r = defaultServiceManager()->addService(String16("android.xlabservice"),
                    new XlabBinderService());
    DBG_PRINT("xlabservice start!\n");
    return r;
}

// 该函数分析接收到的数据包,调用相应的接口函数处理请求
status_t XlabBinderService::onTransact(uint32_t code, const Parcel &data,
        Parcel *reply, uint32_t flags)
{
    DBG_PRINT("code is : %d\n", code);
    int rets = 0;
    switch (code) {
        case ENUM_INIT: {
            XlabCome_S *xlabcom = (XlabCome_S *)data.readInplace(sizeof(XlabCome_S));
            if(xlabcom == NULL){
                DBG_PRINT("recv data init error..");
                rets = -1;
            }

            unsigned char *readBuf = XlabBindeCommon::sharedMemoryMap(xlabcom,ENUM_AK_READ);
            if (readBuf == nullptr) {
                DBG_PRINT("memory map open filed!\n");
                rets = -2;
            }
            if (0 == access("/sdcard/Binder", 0)) {
                Mat dst(xlabcom->rows, xlabcom->cols , CV_8UC3, readBuf);
                imwrite("/sdcard/Binder/InitMapTest.png",dst);
            }

            DBG_PRINT("ret = %d\n", rets);
            reply->writeByte(rets);
            break;
        }
        default:
            return BBinder::onTransact(code, data, reply, flags);
    }
    return rets;
}
}; //namespace

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐