linux下的网络聊天室

一版:多reactor模式实现高并发服务器。

server.c

#include "msg.h"
#include "pthreadpool.h"

/* TCP port the chat server binds to and listens on. */
#define PORT 5554

/* Listening socket; also shut down by server_exit() on SIGINT. */
int sockfd;
/* Receives the error text reported by sqlite3_exec() calls. */
char *errmsg;
/* SQLite connection to test.db, filled in by sqlite3_open(). */
sqlite3 *db;
/* Head of the singly linked list of users that have logged in. */
struct online *head = NULL;

void createlist()
{
    char sql[1024] = {0};
    int ret = sqlite3_open("test.db",&db);

    memset(sql,0,sizeof(sql));
    strcpy(sql,"create table if not exists person(id integer primary key,name text,password text);");

    ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);

    return;
}

void createmessage()
{
    char sql[1024] = {0};
    int ret = sqlite3_open("test.db",&db);

    memset(sql,0,sizeof(sql));
    strcpy(sql,"create table if not exists message(id integer primary key,name text,toname text,message text);");

    ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);

    return;
}

// void insert_message(Msg *pmsg)
// {
//     char sql[1024] = {0};
//     sprintf(sql,"insert into person(name,toname,message)values('%s','%s','%s');",pmsg->name,pmsg->toname,pmsg->message);
//     sqlite3_exec(db,sql,NULL,NULL,&errmsg);
//     return ;
// }


/*
 * Report whether two NUL-terminated strings are exactly equal.
 *
 * Returns 1 when they are equal and 0 otherwise (the opposite sense of
 * strcmp()). A NULL argument never compares equal.
 *
 * Deliberately silent: the caller in this file passes passwords, which
 * must not be echoed to stdout.
 */
int my_strcmp(char *dest,char *src)
{
    if(dest == NULL || src == NULL)
    {
        return 0;
    }

    /* Full comparison: a string that is only a prefix of the other differs. */
    return strcmp(dest,src) == 0 ? 1 : 0;
}

/*
 * sqlite3_exec() row callback: copy the first column of the row into the
 * caller-supplied buffer.
 *
 * para        destination buffer; the caller in this file passes a
 *             char[20], so at most 19 characters plus the terminator
 *             are stored.
 * contun      number of columns in the row.
 * columvalue  column values as text; an entry is NULL for SQL NULL.
 * columnName  column names (unused).
 *
 * Always returns 0 so that sqlite3_exec() keeps going.
 */
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
    enum { RESULT_BUF_SIZE = 20 };
    char *out = (char *)para;

    (void)columnName;

    if(out == NULL)
    {
        return 0;
    }

    /* A missing or NULL column yields an empty string, not a crash. */
    if(contun < 1 || columvalue == NULL || columvalue[0] == NULL)
    {
        out[0] = '\0';
        return 0;
    }

    /* Bounded copy: the stored value may be longer than the buffer. */
    snprintf(out,RESULT_BUF_SIZE,"%s",columvalue[0]);

    return 0;
}

/*
 * Check the name/password pair in *msg against the `person` table.
 *
 * Returns 1 when a row with that name exists and its stored password is
 * exactly msg->password, 0 in every other case (unknown user, wrong
 * password, database error).
 *
 * msg->name arrives from the network, so it is bound as a statement
 * parameter instead of being pasted into the SQL text.
 */
int querypassword(Msg *msg)
{
    static const char sql[] = "select password from person where name = ?;";
    sqlite3_stmt *stmt = NULL;
    int matched = 0;

    if(msg == NULL || db == NULL)
    {
        return 0;
    }

    /* The fixed-size fields are not guaranteed to be NUL-terminated. */
    const char *name_end = memchr(msg->name,'\0',sizeof(msg->name));
    int name_len = name_end ? (int)(name_end - msg->name) : (int)sizeof(msg->name);
    const char *pass_end = memchr(msg->password,'\0',sizeof(msg->password));
    size_t pass_len = pass_end ? (size_t)(pass_end - msg->password) : sizeof(msg->password);

    if(sqlite3_prepare_v2(db,sql,-1,&stmt,NULL) != SQLITE_OK)
    {
        fprintf(stderr,"querypassword: %s\n",sqlite3_errmsg(db));
        return 0;
    }

    if(sqlite3_bind_text(stmt,1,msg->name,name_len,SQLITE_STATIC) == SQLITE_OK
        && sqlite3_step(stmt) == SQLITE_ROW)
    {
        /* Only the first matching row is consulted. */
        const char *stored = (const char *)sqlite3_column_text(stmt,0);
        if(stored != NULL
            && strlen(stored) == pass_len
            && memcmp(stored,msg->password,pass_len) == 0)
        {
            matched = 1;
        }
    }

    sqlite3_finalize(stmt);
    return matched;
}

/*
 * Push new_user onto the front of the global online-user list.
 *
 * The list takes over the node; a NULL node is ignored.
 * NOTE(review): `head` is modified without any lock although the caller
 * runs on thread-pool workers - confirm whether a mutex is needed.
 */
void insert_link(struct online *new_user)
{
    if(new_user == NULL)
    {
        return;
    }

    new_user->next = head;
    head = new_user;
}

/*
 * SIGINT handler: stop the listening socket and terminate with status 1.
 *
 * Only async-signal-safe calls are used here; exit() is not one of them
 * (it runs atexit handlers and flushes stdio), so _exit() is used.
 */
void server_exit(int sig)
{
    (void)sig;

    shutdown(sockfd,SHUT_RDWR);
    close(sockfd);

    _exit(1);
}

int find_fd(char *toname)
{
    struct online*temp = head;
    while(temp != NULL)
    {
        if(strcmp(temp->name,toname) == 0)
        {
            return temp->cfd;
        }
        temp = temp->next;
    }
    return -1;
}

void *thread_read(void *arg)
{
    pthread_t id = pthread_self();
    pthread_detach(id);

    long cfd = (long)arg;
    Msg *pmsg = (Msg *)malloc(sizeof(Msg));

    // while(1)
    // {
        memset(pmsg,0,sizeof(Msg));
        int r_n = read(cfd,pmsg,sizeof(Msg));

        if(r_n == 0)
        {
            printf("server exit!\n");
            pthread_exit(NULL);
        }
        switch(pmsg->action)
        {

            case 1:
            {
                if(!querypassword(pmsg))
                {
                    pmsg->action = -1;
                }

                struct online *new_user = (struct online*)malloc(sizeof(struct online));
                strcpy(new_user->name,pmsg->name);
                new_user->cfd = cfd;
                insert_link(new_user);
                write(cfd,pmsg,sizeof(Msg));
                break;
            }
            case 2:
            {
                int to_fd = find_fd(pmsg->toname);
                if(to_fd == -1)
                {
                    // insert_message(pmsg);
                    pmsg->action = -2;
                    write(cfd,pmsg,sizeof(Msg));
                }
                else
                {
                    write(to_fd,pmsg,sizeof(Msg));
                }
                break;
            }
            case 0:
            {
                char sql[1024] = {0};
                int ret = sqlite3_open("test.db",&db);

                memset(sql,0,sizeof(sql));
                sprintf(sql,"insert into person(name,password)values('%s',%s);",pmsg->name,pmsg->password);
                ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
                break;
            }
            case 3:
            {
                struct online*temp = head;
                while(temp != NULL)
                {
                    write(temp->cfd,pmsg,sizeof(Msg));
                    temp = temp->next;
                }
                break;
            }
            case 4:
            {
                struct online*pNode = NULL;
                struct online*temp = head;
                while(temp != NULL)
                {
                    if(strcmp(temp->name,pmsg->name) == 0)
                    {
                        if(pNode == NULL)
                        {
                            head = temp->next;
                            free(temp);
                            temp = NULL;
                            pmsg->action = -4;
                            break;
                        }
                        pNode->next = temp->next;
                        free(temp);
                        temp = NULL;
                        pmsg->action = -4;
                        break;
                    }
                    pNode = temp;
                    temp = temp->next;
                }
                break;
            }

            case 5:
            {
                // char buff[1024];
                memset(pmsg->message,0,sizeof(pmsg->message));
                struct online * temp = head;
                while(temp != NULL)
                {
                    temp->name[strlen(temp->name)+1] = '\n';
                    strcat(pmsg->message,temp->name);
                    temp = temp->next;
                }
                write(cfd,pmsg,sizeof(Msg));
                break;
            }
            case 6:
            { 
                int to_fd = find_fd(pmsg->toname);
                if(to_fd == -1)
                {
                    pmsg->action = -2;
                    write(cfd,pmsg,sizeof(Msg));
                }
                else
                {
                    write(to_fd,pmsg,sizeof(Msg));
                }
                break;
            }
            case -6:
            {
                int to_fd = find_fd(pmsg->toname);
                write(to_fd,pmsg,sizeof(Msg));
                break;
            }       
        }
    // }
    free(pmsg);
    pmsg = NULL;
    return NULL;
}










#include <sys/epoll.h>

/* Capacity of the event array main() hands to epoll_wait(). */
#define EPOLL_SIZE 1024

/*
 * Chat server entry point.
 *
 * Prepares the SQLite tables, installs the signal handlers, starts a
 * 20-thread pool, listens on PORT and then runs an epoll loop: new
 * connections are accepted and registered for EPOLLIN, and every ready
 * client socket is queued as a thread_read task.
 *
 * Only returns (with 1) when epoll_wait() fails for a reason other than
 * an interrupted call; start-up errors terminate with exit(1).
 */
int main(int argc,char *argv[])
{
    (void)argc;
    (void)argv;

    createlist();
    createmessage();
    signal(SIGINT,server_exit);
    signal(SIGPIPE,SIG_IGN);

    int epfd = epoll_create(2000);
    if(epfd < 0)
    {
        perror("epoll_create error!");
        exit(1);
    }
    tpool_create(20);

    struct epoll_event events[EPOLL_SIZE];
    struct epoll_event event;
    memset(&event,0,sizeof(event));

    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
    {
        perror("socket error!");
        exit(1);
    }

    /* Allow the port to be reused right after a restart. */
    int opt = 1;
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

    struct sockaddr_in s_addr;
    memset(&s_addr,0,sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(PORT);
    /* The address is a 32-bit value, so htonl rather than htons. */
    s_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int ret = bind(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr_in));
    if(ret != 0)
    {
        perror("bind error!");
        exit(1);
    }
    ret = listen(sockfd,20);
    if(ret != 0)
    {
        perror("listen error!");
        exit(1);
    }

    event.events = EPOLLIN;
    event.data.fd = sockfd;
    if(epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event) < 0)
    {
        perror("epoll_ctl error!");
        exit(1);
    }

    while(1)
    {
        printf("start\n");
        int num = epoll_wait(epfd,events,EPOLL_SIZE,-1);
        if(num < 0)
        {
            if(errno == EINTR)
            {
                continue;
            }
            perror("epoll_wait error!");
            break;
        }

        /* Visit every reported event; the count must not change mid-loop. */
        for(int i=0; i<num; i++)
        {
            int fd = events[i].data.fd;

            /* EPOLLIN can arrive together with EPOLLHUP/EPOLLERR. */
            if(!(events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
            {
                continue;
            }

            if(fd == sockfd)
            {
                struct sockaddr_in c_addr;
                /* accept() overwrites the length, so reset it each time. */
                socklen_t c_size = sizeof(c_addr);
                int cfd = accept(sockfd,(struct sockaddr *)&c_addr,&c_size);

                if(cfd < 0)
                {
                    /* One failed accept must not take the server down. */
                    perror("accpet error!");
                    continue;
                }
                printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));

                event.events = EPOLLIN;
                event.data.fd = cfd;
                if(epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event) < 0)
                {
                    perror("epoll_ctl error!");
                    close(cfd);
                }
            }
            else
            {
                printf("int\n");
                if(tpool_add_work(thread_read,(void*)(long)fd) != 0)
                {
                    fprintf(stderr,"tpool_add_work failed for fd %d\n",fd);
                }
            }
        }
    }

    close(epfd);
    close(sockfd);

    return 1;
}

client.c

#include "msg.h"

/* Receives the error text reported by sqlite3_exec() calls. */
char *errmsg;
/* SQLite connection to test.db, filled in by sqlite3_open(). */
sqlite3 *db;
/* Descriptor of the file currently being received by thread_read(). */
int fd;


/*
 * Report whether two NUL-terminated strings are exactly equal.
 *
 * Returns 1 when they are equal and 0 otherwise (the opposite sense of
 * strcmp()). A NULL argument never compares equal. Prints nothing.
 */
int my_strcmp(char *dest,char *src)
{
    if(dest == NULL || src == NULL)
    {
        return 0;
    }

    /* Full comparison: a string that is only a prefix of the other differs. */
    return strcmp(dest,src) == 0 ? 1 : 0;
}

/*
 * sqlite3_exec() row callback: copy the first column of the row into the
 * caller-supplied buffer.
 *
 * para        destination buffer; the caller in this file passes a
 *             char[20], so at most 19 characters plus the terminator
 *             are stored.
 * contun      number of columns in the row.
 * columvalue  column values as text; an entry is NULL for SQL NULL.
 * columnName  column names (unused).
 *
 * Always returns 0 so that sqlite3_exec() keeps going.
 */
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
    enum { RESULT_BUF_SIZE = 20 };
    char *out = (char *)para;

    (void)columnName;

    if(out == NULL)
    {
        return 0;
    }

    /* A missing or NULL column yields an empty string, not a crash. */
    if(contun < 1 || columvalue == NULL || columvalue[0] == NULL)
    {
        out[0] = '\0';
        return 0;
    }

    /* Bounded copy: the stored value may be longer than the buffer. */
    snprintf(out,RESULT_BUF_SIZE,"%s",columvalue[0]);
    return 0;
}

/*
 * Report whether msg->name is already present in the `person` table of
 * the local test.db.
 *
 * Returns 1 when a row with exactly that name exists, 0 otherwise
 * (including when the database cannot be opened or queried).
 *
 * The database is opened on first use and the handle is kept in the
 * global `db`. The name is bound as a statement parameter instead of
 * being pasted into the SQL text.
 */
int querypassword(Msg *msg)
{
    static const char sql[] = "select name from person where name = ?;";
    sqlite3_stmt *stmt = NULL;
    int found = 0;

    if(msg == NULL)
    {
        return 0;
    }

    if(db == NULL && sqlite3_open("test.db",&db) != SQLITE_OK)
    {
        fprintf(stderr,"open test.db failed: %s\n",sqlite3_errmsg(db));
        sqlite3_close(db);
        db = NULL;
        return 0;
    }

    if(sqlite3_prepare_v2(db,sql,-1,&stmt,NULL) != SQLITE_OK)
    {
        fprintf(stderr,"querypassword: %s\n",sqlite3_errmsg(db));
        return 0;
    }

    if(sqlite3_bind_text(stmt,1,msg->name,-1,SQLITE_STATIC) == SQLITE_OK
        && sqlite3_step(stmt) == SQLITE_ROW)
    {
        found = 1;
    }

    sqlite3_finalize(stmt);
    return found;
}

/*
 * SIGINT handler: terminate the client with status 1.
 *
 * _exit() is used because exit() is not async-signal-safe.
 */
void cliten_exit(int sig)
{
    (void)sig;
    _exit(1);
}

void *thread_read(void *arg)
{
    pthread_t id = pthread_self();
    pthread_detach(id);

    long cfd = (long)arg;
    Msg *pmsg = (Msg *)malloc(sizeof(Msg));

    while(1)
    {
        memset(pmsg,0,sizeof(Msg));
        int r_n = read(cfd,pmsg,sizeof(Msg));

        if(r_n == 0)
        {
            printf("server exit!\n");
            pthread_exit(NULL);
        }
        switch(pmsg->action)
        {
            case 1:
            {
                printf("log succeessfully\n");
                break;
            }
            case -1:
            {
                printf("log failty!\n");
                break;
            }
            case 2:
            {
                printf("当前时间!(未解决)\n");
                printf("%s chat you:%s\n",pmsg->name, pmsg->message);
                break;
            }
            case -2:
            {
                printf("对方不在线!\n");
                break;
            }
            case 3:
            {
                printf("当前时间!(未解决)\n");
                printf("%s chat all:%s\n",pmsg->name,pmsg->message);
                break;
            }
            case 4:
            {
                printf("用户没找到!\n");
                break;
            }
            case -4:
            {
                printf("用户成功退出!\n");
                break;
            }
            case 5:
            {
                printf("%s\n",pmsg->message);
                break;
                
            }
            case 6:
            {               
                int ret=0;
                fd = open(pmsg->filename,O_WRONLY | O_CREAT | O_APPEND,0655);
                if(write(fd,pmsg->message,strlen(pmsg->message)) < 0)
                {
                    perror("write error!");
                    exit(1);
                }
      
                break;        
            }
            case -6:
            {
                printf("文件传输完成!\n");
                close(fd);
                break;
            }
        }
    }
}

/* Send *pmsg to the server; the client cannot go on without the link. */
static void cli_send_or_exit(long sockfd,const Msg *pmsg)
{
    if(write(sockfd,pmsg,sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
}

/*
 * Read one whitespace-delimited word from stdin into dst, storing at
 * most cap-1 characters. Returns 1 on success, 0 on end of input.
 */
static int cli_read_word(char *dst,size_t cap)
{
    char fmt[16];

    snprintf(fmt,sizeof(fmt),"%%%zus",cap - 1);
    return scanf(fmt,dst) == 1;
}

/*
 * Chat client entry point: `client <ip> <port>`.
 *
 * Connects to the server, starts thread_read to print incoming
 * messages, then reads commands from stdin:
 *   log, reg, chat, all, view, sfile, exit
 * Everything except log and reg requires a previous `log`. The client
 * considers itself logged in as soon as the login request has been
 * sent; the server's verdict is printed by the reader thread.
 *
 * Returns 0 when stdin reaches end of input; connection errors
 * terminate with exit(1).
 */
int main(int argc,char *argv[])
{
    if(argc < 3)
    {
        printf("ip,pork\n");
        exit(1);
    }
    signal(SIGINT,cliten_exit);

    Msg *pmsg = malloc(sizeof(Msg));
    if(pmsg == NULL)
    {
        perror("malloc error!");
        exit(1);
    }
    memset(pmsg,0,sizeof(Msg));

    long sockfd;
    pthread_t id;
    struct sockaddr_in s_addr;

    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
    {
        perror("socket error!");
        exit(1);
    }
    memset(&s_addr,0,sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(atoi(argv[2]));
    s_addr.sin_addr.s_addr = inet_addr(argv[1]);

    if(connect(sockfd,(struct sockaddr *)&s_addr,sizeof(s_addr)) != 0)
    {
        perror("connect error!");
        exit(1);
    }

    if(pthread_create(&id,NULL,thread_read,(void *)sockfd) != 0)
    {
        fprintf(stderr,"pthread_create error!\n");
        exit(1);
    }

    printf("cmd log reg chat all view exit\n");
    char cmd[1024];
    int logged_in = 0;

    while(1)
    {
        memset(cmd,0,sizeof(cmd));
        printf("input cmd:\n");
        if(!cli_read_word(cmd,sizeof(cmd)))
        {
            break;      /* end of input */
        }

        if(strcmp(cmd,"log") == 0)
        {
            memset(pmsg,0,sizeof(Msg));
            pmsg->action = 1;
            printf("input name\n");
            if(!cli_read_word(pmsg->name,sizeof(pmsg->name)))
            {
                break;
            }

            printf("input password\n");
            if(!cli_read_word(pmsg->password,sizeof(pmsg->password)))
            {
                break;
            }

            cli_send_or_exit(sockfd,pmsg);
            logged_in = 1;
            continue;
        }

        if(strcmp(cmd,"reg") == 0)
        {
            memset(pmsg,0,sizeof(Msg));
            pmsg->action = 0;
            printf("input name\n");
            if(!cli_read_word(pmsg->name,sizeof(pmsg->name)))
            {
                break;
            }

            while(querypassword(pmsg) == 1)
            {
                printf("已存在重新input name\n");
                if(!cli_read_word(pmsg->name,sizeof(pmsg->name)))
                {
                    exit(1);
                }
            }

            printf("input password\n");
            if(!cli_read_word(pmsg->password,sizeof(pmsg->password)))
            {
                break;
            }

            cli_send_or_exit(sockfd,pmsg);
            /* The identity in *pmsg changed, so a new `log` is required. */
            logged_in = 0;
            continue;
        }

        int needs_login = strcmp(cmd,"exit") == 0 || strcmp(cmd,"chat") == 0
                       || strcmp(cmd,"all") == 0 || strcmp(cmd,"view") == 0
                       || strcmp(cmd,"sfile") == 0;
        if(!needs_login)
        {
            continue;   /* unknown command */
        }
        if(!logged_in)
        {
            printf("请登录!\n");
            continue;
        }

        if(strcmp(cmd,"exit") == 0)
        {
            pmsg->action = 4;
            cli_send_or_exit(sockfd,pmsg);
        }
        else if(strcmp(cmd,"chat") == 0)
        {
            pmsg->action = 2;
            memset(pmsg->toname,0,sizeof(pmsg->toname));
            printf("input toname\n");
            if(!cli_read_word(pmsg->toname,sizeof(pmsg->toname)))
            {
                break;
            }

            memset(pmsg->message,0,sizeof(pmsg->message));
            printf("input message\n");
            if(!cli_read_word(pmsg->message,sizeof(pmsg->message)))
            {
                break;
            }

            cli_send_or_exit(sockfd,pmsg);
        }
        else if(strcmp(cmd,"all") == 0)
        {
            pmsg->action = 3;

            memset(pmsg->message,0,sizeof(pmsg->message));
            printf("input message\n");
            if(!cli_read_word(pmsg->message,sizeof(pmsg->message)))
            {
                break;
            }

            cli_send_or_exit(sockfd,pmsg);
        }
        else if(strcmp(cmd,"view") == 0)
        {
            pmsg->action = 5;

            printf("1\n");
            cli_send_or_exit(sockfd,pmsg);
        }
        else /* sfile */
        {
            pmsg->action = 6;
            memset(pmsg->toname,0,sizeof(pmsg->toname));
            printf("input toname\n");
            if(!cli_read_word(pmsg->toname,sizeof(pmsg->toname)))
            {
                break;
            }

            memset(pmsg->filename,0,sizeof(pmsg->filename));
            printf("input filename\n");
            if(!cli_read_word(pmsg->filename,sizeof(pmsg->filename)))
            {
                break;
            }

            int src = open(pmsg->filename,O_RDONLY);
            if(src < 0)
            {
                perror("open error!");
                continue;
            }

            while(1)
            {
                memset(pmsg->message,0,sizeof(pmsg->message));
                /* Leave one byte so every chunk stays NUL-terminated. */
                ssize_t got = read(src,pmsg->message,sizeof(pmsg->message) - 1);
                if(got < 0 && errno == EINTR)
                {
                    continue;
                }
                if(got <= 0)
                {
                    /* Always send the end marker so the receiver closes its file. */
                    pmsg->action = -6;
                    cli_send_or_exit(sockfd,pmsg);
                    if(got == 0)
                    {
                        printf("发送成功!\n");
                    }
                    else
                    {
                        perror("read error!");
                    }
                    break;
                }
                cli_send_or_exit(sockfd,pmsg);
            }
            close(src);
        }
    }

    close(sockfd);
    free(pmsg);
    return 0;
}

msg.h:

#ifndef _MSG_H_
#define _MSG_H_

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <errno.h>
#include <signal.h>

#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>


/* One logged-in user; nodes form the server's singly linked online list. */
struct online
{
    int cfd;                /* socket connected to this user's client */
    char name[20];          /* user name, NUL-terminated */
    struct online *next;    /* next node, NULL at the end of the list */
    // struct online *pre;
};

/*
 * Fixed-size message exchanged between client and server; the whole
 * struct is written to and read from the socket as raw bytes.
 *
 * Requests handled by the server: 0 register, 1 log in, 2 private chat,
 * 3 broadcast, 4 log out, 5 list online users, 6 file chunk, -6 end of
 * file. Replies set by the server: -1 login failed, -2 receiver
 * offline, -4 logged out.
 */
typedef struct messgae
{
    int flag;               /* not referenced by the code shown here */
    int action;             /* request / reply code, see above */
    char name[20];          /* sender's user name */
    char password[20];      /* password for register / log in */
    char toname[20];        /* receiver's user name */
    char filename[20];      /* name of the file being transferred */
    char message[1024];     /* chat text, user list or file chunk */
    struct online* head;    /* not referenced by the code shown here; a pointer has no meaning to the peer */
}Msg;



#endif // !_MSG_H_

线程池:

pthreadpool.c

#include "pthreadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <signal.h>
#include <errno.h>

/* The one pool instance used by every tpool_* function; NULL until tpool_create(). */
static tpool_t*tpool = NULL;

//工作者线程,从任务链表中取出任务并执行
static void *thread_function(void *arg)
{
    tpool_work_t *work;

    while(1)
    {
        //任务队列为空,且线程池未关闭,线程阻塞等待任务
        pthread_mutex_lock(&tpool->queue_lock);
        while(!tpool->queue_head && !tpool->shutdown)
        {
            pthread_cond_wait(&tpool->queue_ready,&tpool->queue_lock);
        }

        //是否销毁线程
        if(tpool->shutdown)
        {
            pthread_mutex_unlock(&tpool->queue_lock);
            pthread_exit(NULL);
        }

        //取出任务,执行任务
        work = tpool->queue_head;
        tpool->queue_head = tpool->queue_head->next;
        pthread_mutex_unlock(&tpool->queue_lock);
        work->function(work->arg);

        //线程完成后,释放任务
        // free(work->arg);
        free(work);
    }
    return NULL;
}

/*
 * Create the global thread pool with maxNum worker threads.
 *
 * Returns 0 on success and -1 when maxNum is not positive. Allocation,
 * mutex, condition-variable and thread-creation failures are fatal and
 * terminate the process with exit(1).
 */
int tpool_create(int maxNum)
{
    int i;
    int rc;

    if(maxNum <= 0)
    {
        fprintf(stderr,"create error!: invalid thread count %d\n",maxNum);
        return -1;
    }

    /* Allocate the pool descriptor. */
    tpool = calloc(1,sizeof(tpool_t));
    if(!tpool)
    {
        perror("create error!");
        exit(1);
    }

    /* Initialise the task queue, the mutex and the condition variable. */
    tpool->maxNum = maxNum;
    tpool->shutdown = 0;
    tpool->queue_head = NULL;
    tpool->queue_tail = NULL;
    /* pthread functions return the error code instead of setting errno. */
    if((rc = pthread_mutex_init(&tpool->queue_lock,NULL)) != 0)
    {
        fprintf(stderr,"mutex error!: %s\n",strerror(rc));
        exit(1);
    }
    if((rc = pthread_cond_init(&tpool->queue_ready,NULL)) != 0)
    {
        fprintf(stderr,"cond error!: %s\n",strerror(rc));
        exit(1);
    }

    /* Start the worker threads. */
    tpool->threadID = calloc(maxNum,sizeof(pthread_t));
    if(!tpool->threadID)
    {
        perror("threadID error!");
        exit(1);
    }
    for(i=0; i<maxNum; i++)
    {
        rc = pthread_create(&tpool->threadID[i],NULL,thread_function,NULL);
        if(rc != 0)
        {
            /* tpool_destroy() joins every slot, so a gap is not allowed. */
            fprintf(stderr,"pthread_create error!: %s\n",strerror(rc));
            exit(1);
        }
    }
    return 0;
}

/*
 * Append a task to the pool's queue and wake one worker.
 *
 * function  task entry point; must not be NULL.
 * arg       passed to function unchanged; the pool never frees it.
 *
 * Returns 0 on success, -1 when function is NULL, the pool has not been
 * created, or memory for the task node cannot be allocated.
 */
int tpool_add_work(void *(*function)(void*),void *arg)
{
    tpool_work_t *work;

    if(!function || !tpool)
    {
        fprintf(stderr,"add error!\n");
        return -1;
    }

    work = malloc(sizeof(tpool_work_t));
    if(!work)
    {
        perror("add error!");
        return -1;
    }

    work->function = function;
    work->arg = arg;
    work->next = NULL;

    /* Link the node at the tail of the queue. */
    pthread_mutex_lock(&tpool->queue_lock);
    if(!tpool->queue_head)
    {
        /* Empty queue: the node is both head and tail. */
        tpool->queue_head = work;
        tpool->queue_tail = work;
    }
    else
    {
        tpool->queue_tail->next = work;
        tpool->queue_tail = work;
    }
    /* Tell one waiting worker that a task is available. */
    pthread_cond_signal(&tpool->queue_ready);
    pthread_mutex_unlock(&tpool->queue_lock);
    return 0;
}

/*
 * Shut the pool down: wake and join every worker, discard the tasks
 * that were never started, and release all pool resources.
 *
 * Safe to call when the pool was never created or has already been
 * destroyed. Must not be called from a worker thread, because it joins
 * the workers.
 */
void tpool_destroy()
{
    int i;
    tpool_work_t*member;

    if(tpool == NULL)
    {
        return;
    }

    /* Set the flag under the lock the workers use to read it. */
    pthread_mutex_lock(&tpool->queue_lock);
    if(tpool->shutdown)
    {
        pthread_mutex_unlock(&tpool->queue_lock);
        return;
    }
    tpool->shutdown = 1;

    /* Wake every worker that is waiting for a task. */
    pthread_cond_broadcast(&tpool->queue_ready);
    pthread_mutex_unlock(&tpool->queue_lock);

    /* Wait for the workers to finish. */
    for(i=0; i<tpool->maxNum; i++)
    {
        pthread_join(tpool->threadID[i],NULL);
    }

    /* Release the thread id array. */
    free(tpool->threadID);

    /* Release the tasks that were never run; their arguments are not owned. */
    while(tpool->queue_head)
    {
        member = tpool->queue_head;
        tpool->queue_head = tpool->queue_head->next;
        free(member);
    }

    /* Destroy the mutex and the condition variable. */
    pthread_mutex_destroy(&tpool->queue_lock);
    pthread_cond_destroy(&tpool->queue_ready);

    /* Release the pool itself and forget the stale pointer. */
    free(tpool);
    tpool = NULL;
}

pthreadpool.h

#ifndef PTHREADPOOL_H
#define PTHREADPOOL_H

#include <pthread.h>

/* One queued task: a node of the pool's singly linked work queue. */
typedef struct tpool_work
{
    void *(*function)(void*);   /* task entry point */
    void *arg;                  /* argument handed to function */
    struct tpool_work *next;    /* next queued task, NULL at the tail */
}tpool_work_t;

/* Thread pool state. */
typedef struct tpool
{
    int shutdown;               /* non-zero once the pool is being destroyed */
    int maxNum;                 /* number of worker threads */
    pthread_t *threadID;        /* array of maxNum worker thread ids */
    tpool_work_t *queue_head;   /* first queued task, NULL when empty */
    tpool_work_t *queue_tail;   /* last queued task */
    pthread_mutex_t queue_lock; /* protects the queue */
    pthread_cond_t queue_ready; /* signalled when a task is queued or on shutdown */
}tpool_t;

/* Create the pool with maxNum worker threads. */
int tpool_create(int maxNum);

/* Stop the workers and release the pool. */
void tpool_destroy();

/* Queue function(arg) for execution by a worker. */
int tpool_add_work(void*(*function)(void *),void*arg);

#endif // !

mmap内存映射,实现大文件传输

client客户端

client.c

 #include "work.h"

/*
 * Login / menu state shared with the reader thread in work.c, which sets
 * it to 1 or -1 when the server answers a login request. main() resets
 * it to 0 before each login menu and also stores menu choices in it.
 * NOTE(review): plain int accessed from two threads without
 * synchronisation.
 */
int lognum = 0;

/*
 * Read one menu number from stdin.
 * Returns 1 on success, 0 when the input was not a number (the rest of
 * that line is discarded) and -1 on end of input.
 */
static int menu_read_choice(int *out)
{
    int rc = scanf("%d",out);

    if(rc == 1)
    {
        return 1;
    }
    if(rc == EOF)
    {
        return -1;
    }

    int ch;
    while((ch = getchar()) != '\n' && ch != EOF)
    {
    }
    return 0;
}

/*
 * Wait until the reader thread has stored the server's login verdict in
 * lognum and return it: 1 for success, -1 for failure.
 *
 * lognum is a plain int written by another thread, so it is re-read
 * through a volatile pointer, and the loop sleeps between polls instead
 * of spinning.
 */
static int menu_wait_login(void)
{
    for(;;)
    {
        int state = *(volatile int *)&lognum;
        if(state == 1 || state == -1)
        {
            return state;
        }
        usleep(10000);
    }
}

/*
 * Run the home menu until the user leaves it.
 * Ordinary users leave with 3 (after quite_user()); managers leave with
 * -1.
 */
static void menu_run_session(long sockfd, Msg *pmsg, int is_manager)
{
    for(;;)
    {
        int choice;
        int rc;

        viewhome(sockfd, pmsg);
        rc = menu_read_choice(&choice);
        if(rc < 0)
        {
            exit(1);
        }
        if(rc == 0)
        {
            continue;
        }

        if(choice == 1)
        {
            chat_user(sockfd, pmsg);
        }
        else if(choice == 2)
        {
            if(is_manager)
            {
                all_manager(sockfd, pmsg);
            }
            else
            {
                all_user(sockfd, pmsg);
            }
        }
        else if(!is_manager && choice == 3)
        {
            quite_user(sockfd, pmsg);
            return;
        }
        else if(is_manager && choice == -1)
        {
            return;
        }
    }
}

/*
 * Client entry point: connect to the server, start the reader thread
 * and drive the login menu (1 log in, 2 register, 3 manager log in,
 * 0 quit). After a successful login the home menu runs until the user
 * leaves it, then the login menu is shown again.
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    pthread_t id;
    long sockfd = Create_sock();

    if(pthread_create(&id,NULL,thread_read,(void *)sockfd) != 0)
    {
        fprintf(stderr,"pthread_create error!\n");
        exit(1);
    }

    Msg *pmsg = (Msg *)malloc(sizeof(Msg));
    if(pmsg == NULL)
    {
        perror("malloc error!");
        exit(1);
    }

    while(1)
    {
        int num;
        int rc;

        lognum = 0;
        viewlogin();
        rc = menu_read_choice(&num);
        if(rc < 0)
        {
            exit(1);
        }
        if(rc == 0 || 0 > num || num > 3)
        {
            continue;
        }
        if(num == 0)
        {
            exit(1);
        }
        if(num == 2)
        {
            reg_user(sockfd,pmsg);
            continue;
        }

        /* num is 1 (user) or 3 (manager): log in and wait for the verdict. */
        int is_manager = (num == 3);
        if(is_manager)
        {
            log_manager(sockfd, pmsg);
        }
        else
        {
            log_user(sockfd, pmsg);
        }

        if(menu_wait_login() == -1)
        {
            printf("登录失败\n");
            sleep(1);
            continue;
        }

        if(!is_manager)
        {
            view_user(sockfd, pmsg);
        }
        menu_run_session(sockfd, pmsg, is_manager);
    }
}

work.h

#ifndef _MSG_H_
#define _MSG_H_

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <errno.h>
#include <signal.h>

#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>

#include <sys/mman.h>
#include <sys/epoll.h>
#include <semaphore.h>


#define EVENTS_SIEZ 1024
#define INT_SIZE 4
// Full-size values, currently disabled:
// #define SEND_SIZE    65536       	//64K
// #define BLOCKSIZE   134217728		//128M
// #define RECVBUF_SIZE    65536       //64K
#define SEND_SIZE    100       	// 100 bytes (small value; the disabled full-size value is 64K)
#define BLOCKSIZE   500		// 500 bytes (small value; the disabled full-size value is 128M)
#define RECVBUF_SIZE    100       // 100 bytes per recv() in recv_file() (disabled full-size value is 64K)

/* Bookkeeping for one file transfer; work.c keeps an array of these. */
struct conn
{
    int info_fd;       // control socket: receives file info and transfer notifications
    char filename[30]; // file name
    int filesize;      // file size
    int bs;            // block size
    int count;         // number of blocks
    int recvcount;     // blocks received so far; recvcount == count means the transfer is complete
    char *mbegin;      // start address of the mmap'ed file
    int used;          // usage flag: 1 = in use, 0 = free
};

/*
 * Fixed-size message exchanged with the server; the whole struct is
 * sent and received as raw bytes. The meaning of `action` is defined by
 * the switch in thread_read() (work.c) for incoming messages.
 */
typedef struct messgae
{
    int action;             /* request / reply code */
    int sfile_id;           /* index of the transfer slot in gconn[] */
    int filesize;           /* size of the file being transferred */
    int count;              /* number of blocks of the transfer */
    int bs;                 /* presumably the block size, as in struct conn - TODO confirm */
    int offset;             /* byte offset of a block inside the mapped file */
    long cfd;               /* not referenced by the code shown here */
    char name[20];          /* sender's user name */
    char password[20];      /* password for register / log in */
    char toname[20];        /* receiver's user name */
    char filename[20];      /* file name */
    char message[1024];     /* chat text or list payload */
    char now_time[32];      /* timestamp text; get_time() callers use 32-byte buffers */
}Msg;


/* Connect a new TCP socket to the server and return its descriptor. */
int Create_sock();
/* Print the home menu. */
void viewhome();
/* Print the login menu. */
void viewlogin();
void viewchat();
void viewall();
/* Prompt for name and password and send a registration request. */
void reg_user(int sockfd, Msg *pmsg);
/* Reader thread: handles every message arriving from the server. */
void* thread_read(void *arg);
/* Prompt for name and password and send a login request. */
void log_user(int sockfd, Msg *pmsg);
void chat_user(int sockfd, Msg *pmsg);
void send_file(int sockfd, Msg *pmsg);
void all_user(int sockfd, Msg *pmsg);
void client_file(int port);
void all_manager(int sockfd, Msg *pmsg);
void log_manager(int sockfd, Msg *pmsg);
/* Send an action-6 request to the server. */
void view_user(int sockfd, Msg *pmsg);
/* Write the current local time as text into now_time. */
void get_time(char *now_time);
void quite_user(int sockfd, Msg *pmsg);



#endif // !_MSG_H_

work.c

#include "work.h"
/* Number of semaphores in sem[]. */
#define SEM_SIZE 2

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_send = PTHREAD_MUTEX_INITIALIZER;
/*
 * Hand-off pair between thread_read() and recv_file(): sem[0] starts at
 * 1 and sem[1] at 0 (see init_sem()).
 */
sem_t sem[SEM_SIZE];
/* Signalled by thread_read() when the server announces a transfer id. */
pthread_cond_t cond_sfile = PTHREAD_COND_INITIALIZER;
/* Login state, defined in client.c and set by thread_read(). */
extern int lognum;
char *mbegin;
int fork_num;
/* Transfer id taken from the latest action-3 message. */
int sfile_id;
/* Transfer slots, indexed by Msg.sfile_id. */
struct conn gconn[20];

/*
 * Print one chat message, prefixed with the current local time, in the
 * form "<time>,<sender>:<text>".
 */
void printf_stea(Msg *pmsg)
{
    char stamp[32];

    memset(stamp, 0, sizeof(stamp));
    get_time(stamp);
    printf("时间%s,%s:%s\n",stamp,pmsg->name,pmsg->message);
}

/*
 * Initialise the semaphore array: the first semaphore starts at 1, all
 * the others at 0. The semaphores are not shared between processes.
 */
void init_sem()
{
    int idx = 0;

    while(idx < SEM_SIZE)
    {
        unsigned int initial = (idx == 0) ? 1 : 0;
        sem_init(&sem[idx], 0, initial);
        idx++;
    }
}

/*
 * Create filename (or reuse an existing file), set its permissions to
 * rw-r--r-- and set its length to size bytes.
 *
 * Returns 0 on success and -1 on failure, after reporting the failing
 * call with perror().
 */
int createfile(char *filename, int size)
{
    int rc = 0;

    if(filename == NULL || size < 0)
    {
        return -1;
    }

    int tmpfd = open(filename, O_RDWR | O_CREAT, 0644);
    if(tmpfd < 0)
    {
        perror("open error!");
        return -1;
    }

    /* fchmod sets the exact mode regardless of the umask. */
    if(fchmod(tmpfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) < 0)
    {
        perror("fchmod error!");
        rc = -1;
    }
    else if(ftruncate(tmpfd, size) < 0)
    {
        perror("ftruncate error!");
        rc = -1;
    }

    if(close(tmpfd) < 0)
    {
        perror("close error!");
        rc = -1;
    }
    return rc;
}

/*
 * Thread entry: receive one block of a file over its own connection and
 * store it in the memory-mapped destination file.
 *
 * arg points to a Msg owned by the creator; it is copied here under the
 * sem[1] / sem[0] hand-off and is not used afterwards. The copy's
 * sfile_id selects the transfer slot in gconn[] and its offset is the
 * position of the block inside the mapping.
 *
 * The thread opens a new connection with Create_sock(), announces
 * itself with action -5 and then stores everything it receives until
 * the peer closes the connection or an error occurs. Returns NULL.
 *
 * NOTE(review): the length of the mapping is not recorded in gconn[] by
 * the visible code, so writes cannot be bounds-checked here - confirm
 * that the server never sends past the end of the file.
 */
void *recv_file(void *arg)
{
    /* Run detached: nobody joins this thread. */
    pthread_detach(pthread_self());

    sem_wait(&sem[1]);
    Msg omsg;
    memcpy(&omsg, (Msg *)arg, sizeof(Msg));
    sem_post(&sem[0]);

    int slots = (int)(sizeof(gconn) / sizeof(gconn[0]));
    if(omsg.sfile_id < 0 || omsg.sfile_id >= slots || omsg.offset < 0
        || gconn[omsg.sfile_id].mbegin == NULL
        || gconn[omsg.sfile_id].mbegin == (char *)MAP_FAILED)
    {
        fprintf(stderr, "recv_file: no mapped file for transfer %d\n", omsg.sfile_id);
        return NULL;
    }

    int sockfd1 = Create_sock();

    /* Announce this data connection to the server. */
    omsg.action = -5;
    if(write(sockfd1, &omsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        close(sockfd1);
        return NULL;
    }

    char *fp = gconn[omsg.sfile_id].mbegin + omsg.offset;

    while(1)
    {
        ssize_t got = recv(sockfd1, fp, RECVBUF_SIZE, 0);
        if(got < 0 && errno == EINTR)
        {
            continue;
        }
        if(got < 0)
        {
            perror("recv error!");
            break;
        }
        if(got == 0)
        {
            break;      /* peer closed: block complete */
        }
        /* A short read is normal on TCP: advance by what actually arrived. */
        fp += got;
    }
    close(sockfd1);
    return NULL;
}

/*
 * Write the current local time into now_time as
 * "YYYY-M-D h:m:s" (no zero padding).
 *
 * now_time must point to a buffer of at least 32 bytes, which is what
 * every caller in this file provides; the whole buffer is cleared first.
 * If the time cannot be converted the buffer is left empty.
 */
void get_time(char *now_time)
{
    enum { NOW_TIME_SIZE = 32 };
    time_t now;
    struct tm *tm_now;

    if(now_time == NULL)
    {
        return;
    }

    /* sizeof(now_time) would only be the size of the pointer. */
    memset(now_time, 0, NOW_TIME_SIZE);

    time(&now);
    tm_now = localtime(&now);
    if(tm_now == NULL)
    {
        return;
    }

    snprintf(now_time, NOW_TIME_SIZE, "%d-%d-%d %d:%d:%d", tm_now->tm_year + 1900, tm_now->tm_mon + 1,
                        tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
    return;
}

/*
 * Write text to path, then read the file back and print every
 * whitespace-separated word on its own line.
 */
static void tr_dump_words(const char *path, const char *text)
{
    char buf[100];
    FILE *fp = fopen(path, "w+");

    if(fp == NULL)
    {
        perror("fopen error!");
        return;
    }

    fwrite(text, 1, strlen(text), fp);
    /* Repositioning is required before switching from writing to reading. */
    fseek(fp, 0, SEEK_SET);
    while(fscanf(fp, "%99s", buf) == 1)
    {
        printf("%s\n",buf);
    }
    fclose(fp);
}

/* Write text to path, then read the file back and print it line by line. */
static void tr_dump_lines(const char *path, const char *text)
{
    char line[100];
    int ends_with_newline = 1;
    FILE *fp = fopen(path, "w+");

    if(fp == NULL)
    {
        perror("fopen error!");
        return;
    }

    fwrite(text, 1, strlen(text), fp);
    fseek(fp, 0, SEEK_SET);
    while(fgets(line, sizeof(line), fp) != NULL)
    {
        fputs(line, stdout);
        ends_with_newline = (line[strlen(line) - 1] == '\n');
    }
    if(!ends_with_newline)
    {
        putchar('\n');
    }
    fclose(fp);
}

/*
 * Reader thread of the client: receive Msg structures from the server
 * socket passed in arg (the descriptor cast to void *) and react to
 * msg.action:
 *   255 registration failed      1 / -1  login succeeded / failed
 *     2 chat message            -2       receiver not online
 *     3 transfer id assigned     4       message shown as "a对b:text"
 *    -3 file info: create and map the destination file "copy"
 *     5 start a recv_file thread for one block
 *     6 list saved to user.txt and printed word by word
 *     7 / 8 / -8 mute notifications
 *     9 text saved to message.txt and printed line by line
 *
 * The thread detaches itself and ends, returning NULL, when the server
 * closes the connection or a receive error occurs.
 */
void *thread_read(void *arg)
{
    /*
     * Block descriptor handed to recv_file(). It has static storage so
     * that it is still valid when the new thread copies it; the sem[0] /
     * sem[1] hand-off keeps it from being overwritten before that.
     */
    static Msg handoff;

    init_sem();
    pthread_t id = pthread_self();
    pthread_detach(id);
    long cfd = (long)arg;

    Msg *pmsg = (Msg *)malloc(sizeof(Msg));
    if(pmsg == NULL)
    {
        perror("malloc error!");
        return NULL;
    }

    while(1)
    {
        memset(pmsg,0,sizeof(Msg));
        ssize_t r_n = recv(cfd,pmsg,sizeof(Msg), MSG_WAITALL);
        if(r_n < 0 && errno == EINTR)
        {
            continue;
        }
        if(r_n < 0)
        {
            perror("recv error!");
            break;
        }
        if(r_n == 0)
        {
            printf("server exit!\n");
            break;
        }
        if((size_t)r_n < sizeof(Msg))
        {
            /* Truncated message: the next recv reports the cause. */
            continue;
        }

        /* Text fields come from the network: force termination before use. */
        pmsg->name[sizeof(pmsg->name) - 1] = '\0';
        pmsg->toname[sizeof(pmsg->toname) - 1] = '\0';
        pmsg->message[sizeof(pmsg->message) - 1] = '\0';

        switch(pmsg->action)
        {

            case 255:
            {
                printf("注册失败\n");
                break;
            }
            case 1:
            {
                lognum = 1;
                break;
            }
            case -1:
            {
                lognum = -1;
                break;
            }
            case 2:
            {
                printf_stea(pmsg);
                break;
            }
            case -2:
            {
                printf("未上线\n");
                break;
            }
            case 3:
            {
                sfile_id = pmsg->sfile_id;
                pthread_cond_signal(&cond_sfile);
                break;
            }
            case 4:
            {
                printf("%s对%s:%s\n",pmsg->name,pmsg->toname,pmsg->message);
                break;
            }
            case -3:
            {
                int slots = (int)(sizeof(gconn) / sizeof(gconn[0]));
                if(pmsg->sfile_id < 0 || pmsg->sfile_id >= slots || pmsg->filesize <= 0)
                {
                    fprintf(stderr, "bad file info: id %d size %d\n",
                                        pmsg->sfile_id, pmsg->filesize);
                    break;
                }
                if(createfile("copy", pmsg->filesize) != 0)
                {
                    break;
                }
                int fd;
                if ((fd = open("copy", O_RDWR)) == -1)
                {
                    printf("open file erro\n");
                    exit(-1);
                }
                char *map = (char *)mmap(NULL, pmsg->filesize,
                                PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
                /* The mapping stays valid after the descriptor is closed. */
                close(fd);
                if(map == (char *)MAP_FAILED)
                {
                    perror("mmap error!");
                    break;
                }
                gconn[pmsg->sfile_id].mbegin = map;
                gconn[pmsg->sfile_id].recvcount = 0;
                gconn[pmsg->sfile_id].count = pmsg->count;
                break;
            }
            case 5:
            {
                /* Wait until the previous block descriptor has been copied. */
                sem_wait(&sem[0]);
                memcpy(&handoff, pmsg, sizeof(Msg));
                sem_post(&sem[1]);
                pthread_t worker;
                int rc = pthread_create(&worker, NULL, recv_file, (void*)&handoff);
                if(rc != 0)
                {
                    /* pthread_create returns the error code; errno is not set. */
                    printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, rc, strerror(rc));
                    exit(1);
                }
                break;
            }
            case 6:
            {
                tr_dump_words("user.txt", pmsg->message);
                break;
            }
            case 7:
            {
                printf("你被禁言了!\n");
                break;
            }
            case 8:
            {
                printf("禁言成功!\n");
                break;
            }
            case -8:
            {
                printf("解除禁言!\n");
                break;
            }
            case 9:
            {
                tr_dump_lines("message.txt", pmsg->message);
                break;
            }
            default:
            {
                break;
            }
        }
    }

    free(pmsg);
    return NULL;
}
/*
 * Create a TCP socket and connect it to the chat server.
 *
 * The server endpoint is fixed at 127.0.0.1:5556. Every failure is reported
 * with perror() and ends the process with exit(1), so the function only
 * returns on success.
 *
 * Returns: the connected socket descriptor; the caller owns it and is
 *          responsible for close().
 */
int Create_sock()
{
    int ret;
    int sockfd;
    struct sockaddr_in s_addr;

    sockfd = socket(AF_INET,SOCK_STREAM,0);
    if(sockfd < 0)
    {
        perror("socket error!");
        exit(1);
    }

    /* Clear the whole structure first so the padding bytes are not stack garbage. */
    memset(&s_addr, 0, sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(5556);
    s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = connect(sockfd, (struct sockaddr *)&s_addr, sizeof(struct sockaddr_in));
    if(ret < 0)
    {
        perror("connect error!");
        close(sockfd);
        exit(1);
    }

    return sockfd;
}

/*
 * Create a UDP socket bound to 127.0.0.1 on the given port.
 *
 * port: local port number in host byte order.
 *
 * Returns: the bound descriptor, or -1 when socket() or bind() fails (the
 *          reason is printed with perror()). Previously a failed bind() still
 *          returned the unbound descriptor as if it were usable.
 */
int Create_UDP(int port)
{
    int sockfd;
    struct sockaddr_in s_addr;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if(sockfd < 0)
    {
        perror("socket error!");
        return -1;
    }

    memset(&s_addr, 0, sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(port);
    s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if(bind(sockfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr_in)) < 0)
    {
        perror("bind error!");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

/* Clear the terminal and print the login menu, one entry per line. */
void viewlogin()
{
    static const char *const menu_lines[] = {
        "login\n",
        "1.登录\n",
        "2.注册\n",
        "3.管理员登录\n",
        "0.退出\n",
    };
    size_t idx;

    system("clear");
    for(idx = 0; idx < sizeof(menu_lines) / sizeof(menu_lines[0]); idx++)
    {
        fputs(menu_lines[idx], stdout);
    }
}

/*
 * Ask the server for the list of online users (action 6).
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; only its action field is
 *         changed before the whole structure is sent.
 *
 * A failed write is reported with perror(); the function still returns so
 * the caller keeps control.
 */
void view_user(int sockfd, Msg *pmsg)
{
    pmsg->action = 6;
    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
    }
}

/*
 * Clear the terminal and print the home menu.
 * Neither parameter is used by the body; they are kept for the callers.
 */
void viewhome(int sockfd, Msg *pmsg)
{
    system("clear");
    printf("home\n"
           "2.群聊\n"
           "1.私聊\n"
           "3.退出\n");
}

/*
 * Prompt for a new account's name and password and send a registration
 * request (action 0) to the server.
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; it is cleared and refilled here.
 *
 * A failed write is reported with perror() and terminates the client.
 */
void reg_user(int sockfd, Msg *pmsg)
{
    memset(pmsg, 0, sizeof(Msg));
    pmsg->action = 0;
    printf("input name\n");
    /*
     * The field width keeps one byte for the terminator so long input cannot
     * overflow the field. Assumes name and password are 20 bytes, as in the
     * Msg definitions shown with the server code -- confirm against the
     * client header.
     */
    scanf("%19s",pmsg->name);

    printf("input password\n");
    scanf("%19s",pmsg->password);

    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
    return;
}

/*
 * Prompt for a user name and password and send a login request (action 1)
 * to the server.
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; it is cleared and refilled here.
 *
 * A failed write is reported with perror() and terminates the client.
 */
void log_user(int sockfd, Msg *pmsg)
{
    memset(pmsg, 0, sizeof(Msg));
    pmsg->action = 1;
    printf("input name\n");
    /*
     * The field width keeps one byte for the terminator so long input cannot
     * overflow the field. Assumes name and password are 20 bytes, as in the
     * Msg definitions shown with the server code -- confirm against the
     * client header.
     */
    scanf("%19s",pmsg->name);

    printf("input password\n");
    scanf("%19s",pmsg->password);

    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
    return;
}

/* Clear the terminal and list the keywords understood in private chat. */
void viewchat()
{
    static const char *const hints[] = {
        "chat\n",
        "quite.退出\n",
        "sfile.发文件\n",
    };
    size_t idx;

    system("clear");
    for(idx = 0; idx < sizeof(hints) / sizeof(hints[0]); idx++)
    {
        fputs(hints[idx], stdout);
    }
}

/*
 * Private chat loop.
 *
 * Reads the peer's name, asks the server for the stored history with that
 * peer (action 9) and then reads one whitespace-delimited word at a time:
 *   "sfile" starts a file transfer through send_file(),
 *   "quite" leaves the chat,
 *   anything else is sent to the peer as a private message (action 2).
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; toname, message and action are
 *         overwritten here.
 *
 * The loop also ends when standard input is exhausted; before, end-of-file
 * made it send empty messages forever. Write failures are reported with
 * perror() and terminate the client.
 */
void chat_user(int sockfd, Msg *pmsg)
{
    printf("input toname\n");
    /* Widths assume toname is 20 bytes and message 1024 bytes -- confirm against the client header. */
    if(scanf("%19s",pmsg->toname) != 1)
    {
        return;
    }
    viewchat();
    pmsg->action = 9;
    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
    while(1)
    {
        memset(pmsg->message, 0, sizeof(pmsg->message));
        if(scanf("%1023s",pmsg->message) != 1)
        {
            return;
        }
        if((0 == strcmp(pmsg->message, "sfile")))
        {
            send_file(sockfd, pmsg);
        }
        else if((0 == strcmp(pmsg->message,"quite")))
        {
            return;
        }
        else
        {
            pmsg->action = 2;
            if(write(sockfd,pmsg,sizeof(Msg)) < 0)
            {
                perror("write error!");
                exit(1);
            }
        }

    }
}

/* Clear the terminal and print the group-chat menu. */
void viewall()
{
    system("clear");
    printf("all:\n"
           "1.群聊\n"
           "2.私聊\n"
           "0.退出\n");
}

/*
 * Group chat loop.
 *
 * Addresses every message to "all" and sends each whitespace-delimited word
 * typed by the user as a broadcast (action 4) until the user types "quite".
 * The word "sfile" gets no special treatment here (the original branch for
 * it was empty), so it is broadcast like any other word.
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; toname, message and action are
 *         overwritten here.
 *
 * The loop also ends when standard input is exhausted; before, end-of-file
 * made it broadcast empty messages forever. A failed write is reported with
 * perror() and terminates the client.
 */
void all_user(int sockfd, Msg *pmsg)
{
    strcpy(pmsg->toname, "all");
    viewall();

    while(1)
    {
        memset(pmsg->message, 0, sizeof(pmsg->message));
        /* Width assumes message is 1024 bytes -- confirm against the client header. */
        if(scanf("%1023s", pmsg->message) != 1)
        {
            return;
        }
        if((0 == strcmp(pmsg->message,"quite")))
        {
            return;
        }
        pmsg->action = 4;
        if(write(sockfd,pmsg,sizeof(Msg)) < 0)
        {
            perror("write error!");
            exit(1);
        }
    }
}

/*
 * Tell the server that this user is leaving (action 10).
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; only its action field is
 *         changed before the whole structure is sent.
 *
 * A failed write is reported with perror(); the function still returns so
 * the caller can finish shutting down.
 */
void quite_user(int sockfd, Msg *pmsg)
{
    pmsg->action = 10;
    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
    }
    return;
}

/* Clear the terminal and list the keywords available to an administrator. */
void view_manager()
{
    static const char *const hints[] = {
        "all:\n",
        "quite.退出\n",
        "unable.禁言\n",
        "capable.解除禁言\n",
        "kick.踢人\n",
    };
    size_t idx;

    system("clear");
    for(idx = 0; idx < sizeof(hints) / sizeof(hints[0]); idx++)
    {
        fputs(hints[idx], stdout);
    }
}

/*
 * Administrator loop for the group chat.
 *
 * Reads one whitespace-delimited word at a time:
 *   "unable"  asks for a user name and sends a mute request (action 8),
 *   "capable" asks for a user name and sends an unmute request (action -8),
 *   "kick"    only sets action 10 locally; nothing is sent,
 *   "quite"   leaves the loop,
 *   anything else is broadcast to "all" (action 4).
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; toname, message and action are
 *         overwritten here.
 *
 * The loop also ends when standard input is exhausted; before, end-of-file
 * made it broadcast empty messages forever. Write failures are reported with
 * perror() and terminate the client.
 */
void all_manager(int sockfd, Msg *pmsg)
{
    strcpy(pmsg->toname, "all");
    view_manager();

    while(1)
    {
        memset(pmsg->message, 0, sizeof(pmsg->message));
        /* Widths assume message is 1024 bytes and toname 20 bytes -- confirm against the client header. */
        if(scanf("%1023s", pmsg->message) != 1)
        {
            return;
        }
        if((0 == strcmp(pmsg->message, "unable")))
        {
            pmsg->action = 8;
            printf("禁言的人是:\n");
            memset(pmsg->toname, 0, sizeof(pmsg->toname));
            if(scanf("%19s",pmsg->toname) != 1)
            {
                return;
            }
            if(write(sockfd, pmsg, sizeof(Msg)) < 0)
            {
                perror("write error!");
                exit(1);
            }
        }
        else if((0 == strcmp(pmsg->message, "capable")))
        {
            pmsg->action = -8;
            printf("解除禁言的人是:\n");
            memset(pmsg->toname, 0, sizeof(pmsg->toname));
            if(scanf("%19s",pmsg->toname) != 1)
            {
                return;
            }
            if(write(sockfd, pmsg, sizeof(Msg)) < 0)
            {
                perror("write error!");
                exit(1);
            }
        }
        else if((0 == strcmp(pmsg->message, "kick")))
        {
            /*
             * NOTE(review): kicking is unfinished. No target name is read and
             * no request is written, so this branch has no effect on the server.
             */
            pmsg->action = 10;
        }
        else if((0 == strcmp(pmsg->message,"quite")))
        {
            return;
        }
        else
        {
            memset(pmsg->toname, 0, sizeof(pmsg->toname));
            strcpy(pmsg->toname, "all");
            pmsg->action = 4;
            if(write(sockfd,pmsg,sizeof(Msg)) < 0)
            {
                perror("write error!");
                exit(1);
            }
        }
    }
}


/*
 * Open a TCP connection to 127.0.0.1 on the given port.
 *
 * port: server port in host byte order.
 *
 * The function returns nothing, so the connected descriptor cannot reach the
 * caller; it is closed before returning instead of being leaked on every
 * call. In effect this only probes that the port accepts connections.
 * Failures are reported with perror() and terminate the client.
 */
void client_file(int port)
{
    int ret;
    int sockfd;
    struct sockaddr_in s_addr;

    sockfd = socket(AF_INET,SOCK_STREAM,0);
    if(sockfd < 0)
    {
        perror("socket error!");
        exit(1);
    }

    /* Clear the whole structure first so the padding bytes are not stack garbage. */
    memset(&s_addr, 0, sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(port);
    s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = connect(sockfd, (struct sockaddr *)&s_addr, sizeof(struct sockaddr_in));
    if(ret < 0)
    {
        perror("connect error!");
        close(sockfd);
        exit(1);
    }

    close(sockfd);
}

/*
 * Build the header for the next full-size block of a file transfer.
 *
 * p_msg: transfer description; its filename, name, toname, sfile_id and
 *        current offset are copied into the new header, and its offset is
 *        then advanced by BLOCKSIZE so the next call describes the next block.
 *
 * Returns: a heap-allocated header with bs set to BLOCKSIZE. The caller owns
 *          it and must free() it. The function never returns NULL: an
 *          allocation failure is reported and terminates the process, where
 *          the old code wrote through the NULL pointer.
 */
Msg *new_fb_head(Msg *p_msg)
{
    /* calloc zero-fills, so every field not set below starts out as 0. */
    Msg *pmsg = (Msg *)calloc(1, sizeof(Msg));
    if(pmsg == NULL)
    {
        perror("malloc error!");
        exit(1);
    }
    strcpy(pmsg->filename, p_msg->filename);
    strcpy(pmsg->name, p_msg->name);
    strcpy(pmsg->toname, p_msg->toname);
    pmsg->sfile_id = p_msg->sfile_id;
    pmsg->offset = p_msg->offset;
    pmsg->bs = BLOCKSIZE;
    p_msg->offset += BLOCKSIZE;
    return pmsg;
}

/*
 * Thread entry point: stream one block of the mapped file to the server.
 *
 * arg: heap-allocated Msg describing the block. offset is where the block
 *      starts inside the mapping at mbegin, bs is its length in bytes.
 *
 * The thread opens its own connection with Create_sock(), announces the
 * block with an action-5 header and then sends the raw bytes in pieces of at
 * most SEND_SIZE. A short write() advances by the number of bytes actually
 * sent, so the stream stays aligned. The connection is closed and arg is
 * freed before returning.
 *
 * The thread is left joinable on purpose: send_file() waits for every worker
 * with pthread_join(), which is only valid for threads that were not detached.
 *
 * Returns: always NULL.
 */
void *send_filedata(void *arg)
{
    Msg *omsg = (Msg *)arg;
    int sockfd2 = Create_sock();

    /* Let the server thread prepare for the block before any data arrives. */
    omsg->action = 5;
    if(write(sockfd2, omsg, sizeof(Msg)) != (ssize_t)sizeof(Msg))
    {
        perror("write error!");
        close(sockfd2);
        free(arg);
        return NULL;
    }

    /* The block is one contiguous range of the mapping. */
    const char *cursor = mbegin + omsg->offset;
    while(omsg->bs > 0)
    {
        int chunk = omsg->bs;
        if(chunk > SEND_SIZE)
        {
            chunk = SEND_SIZE;
        }

        ssize_t sent = write(sockfd2, cursor, (size_t)chunk);
        if(sent < 0 && errno == EINTR)
        {
            continue;
        }
        if(sent <= 0)
        {
            perror("write error!");
            break;
        }
        cursor += sent;
        omsg->bs -= (int)sent;
    }

    close(sockfd2);
    free(arg);
    return NULL;
}

#if 1
/*
 * Send a file to the current chat peer.
 *
 * Reads the file name from standard input, announces the transfer to the
 * server (action 3), waits on cond_sfile for the transfer id, maps the file
 * and starts one send_filedata() thread per BLOCKSIZE block (the last block
 * may be shorter). It returns after every worker has finished and the
 * mapping has been released.
 *
 * sockfd: connected server socket used for the announcement.
 * pmsg:   message buffer owned by the caller; filename, action, filesize,
 *         count, bs, sfile_id and offset are overwritten.
 *
 * Empty files are rejected up front because they can neither be mapped nor
 * split into blocks. Other failures are reported and terminate the client,
 * as elsewhere in this file.
 */
void send_file(int sockfd, Msg *pmsg)
{
    int last_bs = 0;

    /* Width assumes filename is 20 bytes -- confirm against the client header. */
    if(scanf("%19s",pmsg->filename) != 1)
    {
        return;
    }
    int fd = open(pmsg->filename, O_RDWR);
    if(fd < 0)
    {
        printf("文件打开错误!\n");
        exit(1);
    }

    /* The file size decides how many blocks are needed. */
    struct stat mystat;
    if(fstat(fd, &mystat) < 0)
    {
        perror("fstat error!");
        exit(1);
    }
    if(mystat.st_size == 0)
    {
        printf("文件为空,无法发送!\n");
        close(fd);
        return;
    }

    pmsg->action = 3;
    pmsg->filesize = mystat.st_size;

    /* The last block may be shorter than a standard block. */
    int count = pmsg->filesize/BLOCKSIZE;
    if(pmsg->filesize%BLOCKSIZE == 0){
        pmsg->count = count;
    }
    else{
        pmsg->count = count+1;
        last_bs = pmsg->filesize - BLOCKSIZE*count;
    }
    pmsg->bs = BLOCKSIZE;

    /*
     * Take the mutex before announcing the transfer so the reply cannot be
     * signalled between the write and the wait.
     * NOTE(review): this closes the window only if the thread that signals
     * cond_sfile holds the same mutex, and the wait still has no predicate to
     * re-check after a spurious wakeup -- confirm against the receiver code.
     */
    pthread_mutex_lock(&mutex);
    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
    pthread_cond_wait(&cond_sfile, &mutex);
    pmsg->sfile_id = sfile_id;

    /* Map the file; the descriptor is not needed once the mapping exists. */
    size_t map_len = (size_t)pmsg->filesize;
    char *map = (char *)mmap(NULL, map_len,
                        PROT_WRITE|PROT_READ, MAP_SHARED, fd , 0);
    close(fd);
    if(map == (char *)MAP_FAILED)
    {
        perror("mmap error!");
        exit(1);
    }
    mbegin = map;

    /* One worker per block; only the final block can be shorter. */
    pmsg->offset = 0;
    pthread_t pid[pmsg->count];
    memset(pid, 0, sizeof(pthread_t)*pmsg->count);
    for(int i_count=0; i_count<pmsg->count; i_count++)
    {
        Msg *p_msg = new_fb_head(pmsg);
        if(last_bs != 0 && i_count == pmsg->count-1)
        {
            p_msg->bs = last_bs;
        }
        /* pthread_create reports failure through its return value, not errno. */
        int err = pthread_create(&pid[i_count], NULL, send_filedata, (void*)p_msg);
        if(err != 0)
        {
            printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, err, strerror(err));
            exit(1);
        }
    }

    /* Wait for every worker before the mapping they read from goes away. */
    for(int i_count=0; i_count<pmsg->count; i_count++)
    {
        pthread_join(pid[i_count],NULL);
    }

    munmap(map, map_len);
    if(mbegin == map)
    {
        mbegin = NULL;
    }
    pthread_mutex_unlock(&mutex);
    return;
}

#endif

/*
 * Prompt for an administrator name and password and send an administrator
 * login request (action 7) to the server.
 *
 * sockfd: connected server socket.
 * pmsg:   message buffer owned by the caller; it is cleared and refilled here.
 *
 * A failed write is reported with perror() and terminates the client.
 */
void log_manager(int sockfd, Msg *pmsg)
{
    memset(pmsg, 0, sizeof(Msg));
    pmsg->action = 7;
    printf("input name\n");
    /*
     * The field width keeps one byte for the terminator so long input cannot
     * overflow the field. Assumes name and password are 20 bytes, as in the
     * Msg definitions shown with the server code -- confirm against the
     * client header.
     */
    scanf("%19s",pmsg->name);

    printf("input password\n");
    scanf("%19s",pmsg->password);

    if(write(sockfd, pmsg, sizeof(Msg)) < 0)
    {
        perror("write error!");
        exit(1);
    }
    return;
}

makefile

# Object files that are linked into the client binary.
obj := work.o client.o

# Link the client; -lpthread adds the POSIX threads library.
client:$(obj)
	gcc $(obj) -o client -lpthread
# Pattern rule: compile each .c file into the object file of the same name.
%.o:%.c
	gcc $< -c -o $@
# clean names no real file, so its recipe runs every time it is requested.
.PHONY:clean
# Remove the object files and the client binary.
clean:
	rm -rf *.o client

server服务器

server.c

#include "work.h"
#include "pthreadpool.h"

/*
 * Server entry point.
 *
 * Creates the database tables, starts a pool of 20 worker threads, opens the
 * listening socket and then waits with epoll for incoming connections. Each
 * accepted connection is handed to the pool, where thread_read() serves it.
 *
 * SIGPIPE is ignored so that writing to a closed client fails with an error
 * instead of killing the server. The loop never ends; fatal setup or polling
 * errors terminate the process with exit(1).
 */
int main(int argc, char *argv[])
{
    createlist();
    tpool_create(20);
    signal(SIGPIPE,SIG_IGN);

    int sockfd = Create_sock();
    struct epoll_event event;
    struct epoll_event events[EVENTS_SIEZ];

    int epfd = epoll_create(EVENTS_SIEZ);
    if(epfd < 0)
    {
        perror("epoll_create error!");
        exit(1);
    }

    memset(&event, 0, sizeof(event));
    event.events = EPOLLIN;
    event.data.fd = sockfd;
    if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event) < 0)
    {
        perror("epoll_ctl error!");
        exit(1);
    }

    while(1)
    {
        printf("start\n");
        int count = epoll_wait(epfd, events, EVENTS_SIEZ, -1);
        if(count < 0)
        {
            /* A signal only interrupts the wait; anything else would repeat forever. */
            if(errno == EINTR)
            {
                continue;
            }
            perror("epoll_wait error!");
            exit(1);
        }
        for(int i=0; i<count; i++)
        {
            /* events is a bit mask: EPOLLIN may arrive together with other flags. */
            if(!(events[i].events & EPOLLIN) || events[i].data.fd != sockfd)
            {
                continue;
            }

            /* accept() overwrites the length, so it is reset for every call. */
            struct sockaddr_in c_addr;
            socklen_t c_size = sizeof(c_addr);
            long cfd = accept(sockfd,(struct sockaddr*)&c_addr,&c_size);
            if(cfd < 0)
            {
                perror("accept error!");
                /* A connection that vanished before accept() must not stop the server. */
                if(errno == EINTR || errno == ECONNABORTED || errno == EAGAIN)
                {
                    continue;
                }
                exit(1);
            }
            printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));
            /* Hand the connection to the thread pool. */
            tpool_add_work(thread_read,(void*)cfd);
        }
    }
}

msg.h

#ifndef _MSG_H_
#define _MSG_H_

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <errno.h>
#include <signal.h>

#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>


/* One node of a singly linked list of users. */
struct online
{
    int cfd;                /* socket descriptor stored for this user */
    char name[20];          /* user name, NUL-terminated, at most 19 characters */
    struct online *next;    /* next node in the list */
    // struct online *pre;
};

/*
 * Message record, used through the Msg typedef.
 * NOTE(review): this header and work.h both define Msg under the same include
 * guard (_MSG_H_) with different fields, so only one of the two can take
 * effect in a translation unit -- confirm which one each program is built with.
 */
typedef struct messgae
{
    int flag;               /* meaning not visible in this excerpt */
    int action;             /* request or reply code */
    char name[20];          /* sender name */
    char password[20];      /* password for login and registration */
    char toname[20];        /* recipient name */
    char filename[20];      /* file name for file transfers */
    char message[1024];     /* message text */
    struct online* head;    /* NOTE(review): a pointer is only meaningful inside one process -- confirm it is never sent to a peer */
}Msg;



#endif // !_MSG_H_

work.c

#include "work.h"
#include "pthreadpool.h"

/* Guards the find_cfd slot table used by thread_read() for file transfers. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* First node of the list of logged-in users; NULL while nobody is online. */
struct online *head = NULL;
/* Receives the error text produced by sqlite3_exec(). */
char *errmsg;
/* Handle of the SQLite database "test.db", opened by createlist(). */
sqlite3 *db;
/* File-transfer id table: addr[i] == 1 means id i has been handed out. */
int addr[100] = {0};

/*
 * Test two NUL-terminated strings for equality.
 *
 * dest, src: the strings to compare; neither is modified.
 *
 * Returns: 1 when both strings have the same length and contents, 0 otherwise.
 *
 * The hand-written loop this replaces returned 1 as soon as dest ended, so
 * "abc" matched "abcd" (any input that merely started with the stored
 * password was accepted), and it read past the terminator when both strings
 * were empty. It also printed both strings, which wrote passwords to the
 * server's output; that debug output is removed.
 */
int my_strcmp(char *dest,char *src)
{
    return strcmp(dest, src) == 0 ? 1 : 0;
}

void createlist()
{
    char sql[1024] = {0};
    int ret = sqlite3_open("test.db",&db);

    memset(sql,0,sizeof(sql));
    strcpy(sql,"create table if not exists person(id integer primary key,name text,password text);");
    ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
    
    memset(sql,0,sizeof(sql));
    strcpy(sql,"create table if not exists manager(id integer primary key,name text,password text);");
    ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
    
    memset(sql,0,sizeof(sql));
    strcpy(sql,"create table if not exists storagemessage(fromname text, toname text,message text, timestamp not NULL default(datetime('now','localtime')));");
    ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
    return;
}

/*
 * Put new_user at the front of the online-user list.
 * The node is linked in as-is, so it must stay allocated while it is on the list.
 */
void insert_link(struct online *new_user)
{
    struct online *old_first = head;

    head = new_user;
    head->next = old_first;
}

/*
 * Look up the socket descriptor of an online user.
 *
 * toname: user name to search for (exact, case-sensitive match).
 *
 * Returns: the cfd of the first list node with that name, or -1 when no such
 *          user is on the list.
 */
int find_fd(char *toname)
{
    struct online *node;

    for(node = head; node != NULL; node = node->next)
    {
        if(0 == strcmp(node->name, toname))
        {
            return node->cfd;
        }
    }
    return -1;
}

/*
 * sqlite3_exec() row callback: copy the first column of the row into the
 * caller's text buffer.
 *
 * para:       destination buffer. Every caller in this file passes a 20-byte
 *             array, so at most 19 characters plus the terminator are written.
 * contun:     number of columns in the row.
 * columvalue: column values as text; an entry is NULL for a NULL column.
 * columnName: column names (unused).
 *
 * A missing or NULL first column produces an empty string. The unbounded
 * strcpy() this replaces overflowed the buffer for longer values and crashed
 * on a NULL column.
 *
 * Returns: always 0, so the query keeps going.
 */
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
    /* Size of the buffers the callers hand in. */
    enum { RESULT_BUF_SIZE = 20 };
    const char *value = "";

    (void)columnName;
    if(contun > 0 && columvalue != NULL && columvalue[0] != NULL)
    {
        value = columvalue[0];
    }
    snprintf((char *)para, RESULT_BUF_SIZE, "%s", value);

    return 0;
}
/* Length of a fixed-size text field, also safe when the terminator is missing. */
static size_t querypassword_field_len(const char *field, size_t cap)
{
    const char *nul = memchr(field, '\0', cap);
    return nul != NULL ? (size_t)(nul - field) : cap;
}

/*
 * Check a user's login credentials against the person table.
 *
 * msg: request holding the user name and the password to verify.
 *
 * Returns: 1 when a row for msg->name exists and its stored password equals
 *          msg->password exactly, 0 otherwise (wrong password, unknown user
 *          or database error).
 *
 * The name is bound as a statement parameter rather than pasted into the SQL
 * text, so quotes in the name cannot change the query. When several rows
 * share the name, the first one returned is used.
 */
int querypassword(Msg *msg)
{
    static const char query[] = "select password from person where name = ?;";
    sqlite3_stmt *stmt = NULL;
    int matched = 0;

    if(sqlite3_prepare_v2(db, query, -1, &stmt, NULL) != SQLITE_OK)
    {
        fprintf(stderr, "querypassword: %s\n", sqlite3_errmsg(db));
        return 0;
    }

    size_t name_len = querypassword_field_len(msg->name, sizeof(msg->name));
    size_t pass_len = querypassword_field_len(msg->password, sizeof(msg->password));

    if(sqlite3_bind_text(stmt, 1, msg->name, (int)name_len, SQLITE_STATIC) == SQLITE_OK
        && sqlite3_step(stmt) == SQLITE_ROW)
    {
        const unsigned char *stored = sqlite3_column_text(stmt, 0);
        int stored_len = sqlite3_column_bytes(stmt, 0);

        /* Both length and contents must match; a shared prefix is not enough. */
        if(stored != NULL && (size_t)stored_len == pass_len
            && memcmp(stored, msg->password, pass_len) == 0)
        {
            matched = 1;
        }
    }

    sqlite3_finalize(stmt);
    return matched;
}

/*
 * Check whether a user name is already registered in the person table.
 *
 * msg: request holding the user name to look up.
 *
 * Returns: 1 when a row with that name exists, 0 when it does not or when
 *          the database cannot be queried.
 *
 * The database is opened only when no handle exists yet; reopening it on
 * every call leaked a connection each time and replaced the shared handle.
 * The name is bound as a statement parameter, so quotes in it cannot change
 * the query.
 */
int queryname(Msg *msg)
{
    static const char query[] = "select name from person where name = ?;";
    sqlite3_stmt *stmt = NULL;
    int found = 0;

    if(db == NULL && sqlite3_open("test.db",&db) != SQLITE_OK)
    {
        fprintf(stderr, "sqlite3_open error: %s\n", sqlite3_errmsg(db));
        sqlite3_close(db);
        db = NULL;
        return 0;
    }

    if(sqlite3_prepare_v2(db, query, -1, &stmt, NULL) != SQLITE_OK)
    {
        fprintf(stderr, "queryname: %s\n", sqlite3_errmsg(db));
        return 0;
    }

    /* The name arrives from the network, so do not rely on its terminator. */
    const char *nul = memchr(msg->name, '\0', sizeof(msg->name));
    int name_len = nul != NULL ? (int)(nul - msg->name) : (int)sizeof(msg->name);

    if(sqlite3_bind_text(stmt, 1, msg->name, name_len, SQLITE_STATIC) == SQLITE_OK
        && sqlite3_step(stmt) == SQLITE_ROW)
    {
        found = 1;
    }

    sqlite3_finalize(stmt);
    return found;
}

/*
 * Write the current local time as "year-month-day hour:minute:second",
 * without zero padding (for example "2024-1-5 9:3:7").
 *
 * now_time: destination buffer of at least 20 bytes; no more than 20 bytes,
 *           terminator included, are ever written.
 *
 * An empty string is stored when the local time cannot be determined.
 * localtime() keeps its result in static storage, so calls from several
 * threads at once can interfere with each other.
 */
void get_time(char *now_time)
{
    /* "YYYY-MM-DD HH:MM:SS" is 19 characters; one more for the terminator. */
    enum { TIME_TEXT_SIZE = 20 };
    time_t now = time(NULL);
    struct tm *tm_now = localtime(&now);

    if(tm_now == NULL)
    {
        now_time[0] = '\0';
        return;
    }

    snprintf(now_time, TIME_TEXT_SIZE, "%d-%d-%d %d:%d:%d",
             tm_now->tm_year + 1900, tm_now->tm_mon + 1, tm_now->tm_mday,
             tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
}

/* Length of a fixed-size text field, also safe when the terminator is missing. */
static int insert_data_message_len(const char *field, size_t cap)
{
    const char *nul = memchr(field, '\0', cap);
    return nul != NULL ? (int)(nul - field) : (int)cap;
}

/*
 * Store one chat message in the storagemessage table.
 *
 * pmsg: message to store; name is saved as fromname, toname and message as
 *       they are. The timestamp column is filled by the table's default.
 *
 * The values are bound as statement parameters. Building the SQL text with
 * sprintf() broke the insert for any message containing a single quote and
 * let a message inject arbitrary SQL. Failures are printed to stderr.
 */
void insert_data_message(Msg *pmsg)
{
    static const char statement[] =
        "insert into storagemessage(fromname,toname,message) values(?, ?, ?);";
    sqlite3_stmt *stmt = NULL;

    if(sqlite3_prepare_v2(db, statement, -1, &stmt, NULL) != SQLITE_OK)
    {
        fprintf(stderr, "insert_data_message: %s\n", sqlite3_errmsg(db));
        return;
    }

    sqlite3_bind_text(stmt, 1, pmsg->name,
                      insert_data_message_len(pmsg->name, sizeof(pmsg->name)), SQLITE_STATIC);
    sqlite3_bind_text(stmt, 2, pmsg->toname,
                      insert_data_message_len(pmsg->toname, sizeof(pmsg->toname)), SQLITE_STATIC);
    sqlite3_bind_text(stmt, 3, pmsg->message,
                      insert_data_message_len(pmsg->message, sizeof(pmsg->message)), SQLITE_STATIC);

    if(sqlite3_step(stmt) != SQLITE_DONE)
    {
        fprintf(stderr, "insert_data_message: %s\n", sqlite3_errmsg(db));
    }
    sqlite3_finalize(stmt);
    return;
}

/*
 * Row callback for the chat-history query: append one line per row to the
 * history file.
 *
 * para:       struct callback with the output stream and the two chat
 *             partners (name is the requesting user, toname the peer).
 * contun:     number of columns; four are expected, in the order
 *             fromname, toname, message, timestamp.
 * columvalue: column values as text; an entry is NULL for a NULL column.
 * columnName: column names (unused).
 *
 * Rows sent by the requesting user and rows sent by the peer are written in
 * two different layouts; rows from anyone else are skipped. Rows are also
 * skipped when the stream is missing or fewer than four columns arrive, and
 * NULL column values are treated as empty text instead of crashing.
 *
 * Returns: always 0, so the query keeps going.
 */
int my_getmessage_callback(void *para, int contun, char **columvalue, char **columnName)
{
    struct callback* call = (struct callback*)para;

    (void)columnName;
    if(call == NULL || call->fp == NULL || columvalue == NULL || contun < 4)
    {
        return 0;
    }

    const char *from = columvalue[0] != NULL ? columvalue[0] : "";
    const char *text = columvalue[2] != NULL ? columvalue[2] : "";
    const char *when = columvalue[3] != NULL ? columvalue[3] : "";

    if(strcmp(call->name, from) == 0)
    {
        fprintf(call->fp,"%s:我,时间:%s\n",text,when);
    }
    else if(strcmp(call->toname, from) == 0)
    {
        fprintf(call->fp,"时间:%s,%s:%s\n",when,from,text);
    }

    return 0;
}

/*
 * Write the latest chat history between two users to "message.text".
 *
 * pmsg: request; name is the requesting user and toname the chat partner.
 *
 * Up to eight rows exchanged between the two users, newest first, are
 * formatted by my_getmessage_callback() into the file, which is created or
 * truncated on every call.
 * NOTE(review): every call uses the same file name, so two threads asking
 * for history at the same moment overwrite each other's output.
 *
 * Returns: the stream opened in "w+" mode and positioned after the last
 *          line; the caller must rewind it to read and must fclose() it.
 *          NULL when the file cannot be opened. When the query itself
 *          fails, the stream is returned empty.
 *
 * The user names are bound as statement parameters, so quotes in a name
 * cannot change the query.
 */
FILE *select_message(Msg *pmsg)
{
    static const char query[] =
        "select fromname,toname,message,timestamp from storagemessage "
        "where (toname = ?1 and fromname = ?2) or (fromname = ?1 and toname = ?2) "
        "order by timestamp desc limit 8;";
    sqlite3_stmt *stmt = NULL;
    struct callback call;

    FILE *fp = fopen("message.text","w+");
    if(fp == NULL)
    {
        perror("fopen error!");
        return NULL;
    }

    call.fp = fp;
    /* The precision bounds the read, so a missing terminator in the request is harmless. */
    snprintf(call.name, sizeof(call.name), "%.*s", (int)sizeof(pmsg->name), pmsg->name);
    snprintf(call.toname, sizeof(call.toname), "%.*s", (int)sizeof(pmsg->toname), pmsg->toname);
    printf("开始回调\n");

    if(sqlite3_prepare_v2(db, query, -1, &stmt, NULL) != SQLITE_OK)
    {
        fprintf(stderr, "select_message: %s\n", sqlite3_errmsg(db));
        return fp;
    }

    /* ?1 is the chat partner, ?2 the requesting user. */
    sqlite3_bind_text(stmt, 1, call.toname, -1, SQLITE_STATIC);
    sqlite3_bind_text(stmt, 2, call.name, -1, SQLITE_STATIC);

    while(sqlite3_step(stmt) == SQLITE_ROW)
    {
        char *row[4];
        for(int col = 0; col < 4; col++)
        {
            row[col] = (char *)sqlite3_column_text(stmt, col);
        }
        my_getmessage_callback(&call, 4, row, NULL);
    }
    sqlite3_finalize(stmt);

    return fp;
}

/*
 * Create the server's listening TCP socket.
 *
 * The socket is bound to 127.0.0.1:5556 with SO_REUSEADDR set and put into
 * listening mode with a backlog of 2000. Every failure is reported with
 * perror() and terminates the process with exit(1).
 *
 * Returns: the listening socket descriptor.
 */
int Create_sock()
{
    int ret;
    int sockfd;
    struct sockaddr_in s_addr;

    sockfd = socket(AF_INET,SOCK_STREAM,0);
    if(sockfd < 0)
    {
        perror("socket error!");
        exit(1);
    }

    /* Allow the port to be reused right after a restart. */
    int opt = 1;
    if(setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0)
    {
        perror("setsockopt error!");
    }

    /* Clear the whole structure first so the padding bytes are not stack garbage. */
    memset(&s_addr, 0, sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(5556);
    s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = bind(sockfd,(struct sockaddr *)&s_addr,sizeof(struct sockaddr_in));
    if(ret < 0)
    {
        perror("bind error!");
        exit(1);
    }

    ret = listen(sockfd,2000);
    if(ret < 0)
    {
        /* This used to print "bind error!", which pointed at the wrong call. */
        perror("listen error!");
        exit(1);
    }

    return sockfd;
}

/*
 * Create a file (or open an existing one) and extend it by writing a single
 * space at a computed offset.
 *
 * filename: path of the file.
 * filesize: the byte this points to, minus one, is used as the offset.
 *           NOTE(review): only one char is read, so offsets above the range
 *           of a char cannot be expressed -- confirm what callers pass here.
 *
 * The file gets permissions rw-r--r--. open() with O_CREAT needs the mode as
 * a third argument; without it the permissions of a new file were taken from
 * an indeterminate value. Failures are reported with perror().
 */
void createfile(char *filename, char *filesize)
{
    const mode_t mode = S_IRGRP | S_IRUSR | S_IWUSR | S_IROTH;

    int fd = open(filename, O_RDWR | O_CREAT, mode);
    if(fd < 0)
    {
        perror("open error!");
        return;
    }
    /* The creation mode is filtered by the umask; fchmod sets the exact bits. */
    fchmod(fd, mode);
    lseek(fd, (*filesize)-1, SEEK_SET);
    if(write(fd, " ", 1) != 1)
    {
        perror("write error!");
    }
    close(fd);
}

/* Write the whole buffer, retrying after short writes; returns 0 on success, -1 on error. */
static int send_file_write_all(int fd, const char *buf, size_t len)
{
    while(len > 0)
    {
        ssize_t done = write(fd, buf, len);
        if(done < 0 && errno == EINTR)
        {
            continue;
        }
        if(done <= 0)
        {
            return -1;
        }
        buf += done;
        len -= (size_t)done;
    }
    return 0;
}

/*
 * Relay one block of file data from the sending client to the receiver.
 *
 * pmsg: block header; bs is the number of bytes to relay.
 * cofd: socket the data is read from.
 * tofd: socket the data is written to.
 *
 * The data is copied through a buffer of RECVBUF_SIZE bytes. Relaying stops
 * early when the sender closes the connection or either side reports an
 * error. Both sockets are closed before returning.
 *
 * The earlier loop never ended once recv() returned 0 and passed a negative
 * length to write() when recv() failed.
 */
void send_file(Msg*pmsg,int cofd,int tofd)
{
    int remain_size = pmsg->bs; /* bytes of the block still to relay */
    char *fp = (char *)malloc(RECVBUF_SIZE);

    if(fp == NULL)
    {
        perror("malloc error!");
        close(tofd);
        close(cofd);
        return;
    }

    while (remain_size > 0)
    {
        size_t want = RECVBUF_SIZE;
        if((size_t)remain_size < want)
        {
            want = (size_t)remain_size;
        }

        /* MSG_WAITALL waits for the full amount unless the peer closes or an error occurs. */
        ssize_t size = recv(cofd, fp, want, MSG_WAITALL);
        if(size < 0 && errno == EINTR)
        {
            continue;
        }
        if(size <= 0)
        {
            break;
        }
        if(send_file_write_all(tofd, fp, (size_t)size) < 0)
        {
            perror("write error!");
            break;
        }
        remain_size -= (int)size;
    }
    free(fp);
    printf("转发中...\n");

    close(tofd);
    close(cofd);
}
/*
 * Send a message to every user on the online list.
 * Each node's socket receives the complete Msg structure; results of the
 * individual writes are not examined.
 */
void to_all(Msg *pmsg)
{
    struct online *receiver;

    for(receiver = head; receiver != NULL; receiver = receiver->next)
    {
        write(receiver->cfd, pmsg, sizeof(Msg));
    }
}

/*
 * Find a user on the online list.
 *
 * name: user name to search for (exact, case-sensitive match).
 *
 * Returns: the first list node with that name, or NULL when there is none.
 *          The node remains owned by the list.
 */
struct online *get_online(char *name)
{
    struct online *node = head;

    while(node != NULL && strcmp(node->name, name) != 0)
    {
        node = node->next;
    }
    return node;
}

void *thread_read(void *arg)
{
    pthread_t id = pthread_self();
    pthread_detach(id);
    long cfd = (long)arg;

    Msg *pmsg = (Msg *)malloc(sizeof(Msg));
    
    while(1)
    {
        memset(pmsg,0,sizeof(Msg));
        int r_n = recv(cfd,pmsg,sizeof(Msg), 0);
        if(r_n == 0)
        {
            printf("server exit!\n");
            pthread_exit(NULL);
        }
        switch(pmsg->action)
        {
            case 0:
            {
                //判断用户名是否注册
                if(1 == queryname(pmsg))
                {
                    pmsg->action = 255;
                    write(cfd, pmsg, sizeof(Msg));
                    break;
                }
                char sql[100];
                memset(sql, 0, sizeof(sql));
                
                int ret = sqlite3_open("test.db", &db);
                if(ret < 0)
                {
                    perror("sqlite3_open error!");
                }
                sprintf(sql,"insert into person(name,password)values('%s',%s);",pmsg->name,pmsg->password);
                ret = sqlite3_exec(db, sql, NULL, NULL, &errmsg);

                break;
            }
            case 1:
            {
                if(0 == querypassword(pmsg))
                {
                    pmsg->action = -1;
                    write(cfd,pmsg,sizeof(Msg));
                    break;
                }

                struct online *new_user = (struct online *)malloc(sizeof(struct online));
                strcpy(new_user->name, pmsg->name);
                new_user->cfd = cfd;
                new_user->say = 0;
                insert_link(new_user);
                
                write(cfd,pmsg,sizeof(Msg));
                break;
            }
            case 2:
            {
                insert_data_message(pmsg);
                int to_fd = find_fd(pmsg->toname);
                if(to_fd == -1)
                {
                    pmsg->action = -2;
                    write(cfd, pmsg, sizeof(Msg));
                }
                else
                {
                    write(to_fd, pmsg, sizeof(Msg));
                }
                break;
            }
            case 3:
            {
                int to_fd = find_fd(pmsg->toname);
                if(to_fd == -1)
                {
                    pmsg->action = -2;
                    write(cfd, pmsg, sizeof(Msg));
                }
                else
                {
                    int i = 0;
                    while(addr[i] == 1)
                    {
                        i++;
                    }
                    addr[i] = 1;
                    pmsg->sfile_id = i;
                    write(cfd, pmsg, sizeof(Msg));
                    pmsg->action = -3;
                    write(to_fd, pmsg, sizeof(Msg));
                }
                break;
            }
            case 4:
            {
                insert_data_message(pmsg);
                struct online *pNode = head;
                while(pNode != NULL)
                {
                    if(strcmp(pNode->name, pmsg->name) == 0)
                    {
                        if(pNode->say != 1)
                        {
                            to_all(pmsg);
                            break;
                        }
                        else if(pNode->say == 1)
                        {
                            pmsg->action = 7;
                            write(cfd, pmsg, sizeof(Msg));
                            break;
                        }
                    }

                }
                break;
            }
            case 5:
            {
                //要传输对象的套接字
                int sofd = 0;
                //找到要传输的对象
                int to_fd = find_fd(pmsg->toname);
                if(to_fd == -1)
                {
                    pmsg->action = -2;
                    write(cfd, pmsg, sizeof(Msg));
                }  
                else
                {
                    //通知接收客户端做好准备;
                    write(to_fd, pmsg, sizeof(Msg));
                    //循环找cfd,用循环队列或者循环链表储存cfd。
                    for(int i=0; i<10; i = (++i)%10)
                    {
                        pthread_mutex_lock(&mutex);
                        if(find_cfd[i].use == 1)
                        {
                            if(find_cfd[i].offset == pmsg->offset)
                            {
                                sofd = find_cfd[i].cfd;
                                find_cfd[i].use = 0;
                                pthread_mutex_unlock(&mutex);
                                break;
                            }
                            else
                            {
                            }
                        }
                        pthread_mutex_unlock(&mutex);
                    }                    

                    send_file(pmsg,cfd,sofd);
                    pthread_cancel(id);
                }
                break;
            }
            case -5:
            {
                for(int i=0; i<10; i = (++i)%10)
                {
                    pthread_mutex_lock(&mutex);
                    if(find_cfd[i].use == 0)
                    {
                        strcpy(find_cfd[i].filename, pmsg->filename);
                        find_cfd[i].cfd = cfd;
                        find_cfd[i].use = 1;
                        find_cfd[i].offset = pmsg->offset;

                        
                        pthread_mutex_unlock(&mutex);
                        break;
                    }
                    pthread_mutex_unlock(&mutex);

                }
                break;
            }
            case 6:
            {
                char buf[1024];
                int ret = -1;
                memset(buf, 0, sizeof(buf));
                FILE *fp = fopen("user.txt", "w+");
                struct online*pNode = head;
                while(1)
                {
                    if(pNode->next == NULL)
                    {
                        fprintf(fp, "用户名:%s", pNode->name);
                        break;
                    }
                    fprintf(fp, "用户名:%s\n", pNode->name);
                    memset(pmsg->message, 0, sizeof(pmsg->message));
                    pNode = pNode->next;
                }
                fseek(fp, 0, SEEK_SET);
                ret = fread(buf, sizeof(buf), 1, fp);
                {
                    memcpy(pmsg->message, buf, strlen(buf));
                    pNode = head;
                    while(pNode != NULL)
                    {
                        write(pNode->cfd, pmsg, sizeof(Msg));
                        pNode = pNode->next;
                    }
                    memset(pmsg->message, 0, sizeof(pmsg->message));
                    memset(buf, 0, sizeof(buf));
                }
                printf("ret = %d\n",ret);
                fclose(fp);
                break;
            }
            case 7:
            {
                if(0 == query_manager_password(pmsg))
                {
                    pmsg->action = -1;
                }

                struct online *new_user = (struct online *)malloc(sizeof(struct online));
                strcpy(new_user->name, pmsg->name);
                new_user->cfd = cfd;
                new_user->say = 0;
                insert_link(new_user);
                pmsg->action = 1;
                write(cfd,pmsg,sizeof(Msg));
                break;
            }
            case 8:
            {
                struct online *pNode = get_online(pmsg->toname);
                if(pNode == NULL)
                {
                    pmsg->action = -2;
                    write(cfd, pmsg, sizeof(Msg));
                    break;
                }
                pNode->say = 1;
                write(cfd, pmsg, sizeof(Msg));
                break;
            }
            case -8:
            {
                struct online *pNode = get_online(pmsg->name);
                if(pNode == NULL)
                {
                    pmsg->action = -2;
                    write(cfd, pmsg, sizeof(Msg));
                    break;
                }
                pNode->say = 0;
                write(cfd, pmsg, sizeof(Msg));
                break;
            }
            case 9:
            {
                char buf[1024];
                int ret = -1;
                memset(buf, 0, sizeof(buf));
                FILE *fp = select_message(pmsg);

                fseek(fp, 0, SEEK_SET);
                while(feof(fp) == 0)
                {
                    fread(buf, sizeof(buf), 1, fp);
                    memcpy(pmsg->message, buf, strlen(buf));
                    
                    write(cfd, pmsg, sizeof(Msg));
                    memset(pmsg->message, 0, sizeof(pmsg->message));
                    memset(buf, 0, sizeof(buf));
                }
                fclose(fp);
                break; 
            }
            case 10:
            {
                struct online *pNode = head;
                struct online *temp = NULL;
                while(pNode != NULL)
                {
                    if(strcmp(pNode->name, pmsg->name) == 0)
                    {
                        if(temp != NULL)
                        {
                            temp->next = pNode->next;
                        }
                        else
                        {
                            head = pNode->next;
                        }
                        if(pNode != NULL)
                        {
                            free(pNode);
                            pNode = NULL;
                        }
                        break;
                    }
                    temp = pNode;
                    pNode = pNode->next;
                }  
                break;              
            }
        }

    } 
}

/* Length of a fixed-size text field, also safe when the terminator is missing. */
static size_t query_manager_password_len(const char *field, size_t cap)
{
    const char *nul = memchr(field, '\0', cap);
    return nul != NULL ? (size_t)(nul - field) : cap;
}

/*
 * Check an administrator's credentials against the manager table.
 *
 * msg: request holding the administrator name and the password to verify.
 *
 * Returns: 1 when a row for msg->name exists and its stored password equals
 *          msg->password exactly, 0 otherwise (wrong password, unknown name
 *          or database error).
 *
 * The name is bound as a statement parameter rather than pasted into the SQL
 * text, so quotes in the name cannot change the query. When several rows
 * share the name, the first one returned is used.
 */
int query_manager_password(Msg *msg)
{
    static const char query[] = "select password from manager where name = ?;";
    sqlite3_stmt *stmt = NULL;
    int matched = 0;

    if(sqlite3_prepare_v2(db, query, -1, &stmt, NULL) != SQLITE_OK)
    {
        fprintf(stderr, "query_manager_password: %s\n", sqlite3_errmsg(db));
        return 0;
    }

    size_t name_len = query_manager_password_len(msg->name, sizeof(msg->name));
    size_t pass_len = query_manager_password_len(msg->password, sizeof(msg->password));

    if(sqlite3_bind_text(stmt, 1, msg->name, (int)name_len, SQLITE_STATIC) == SQLITE_OK
        && sqlite3_step(stmt) == SQLITE_ROW)
    {
        const unsigned char *stored = sqlite3_column_text(stmt, 0);
        int stored_len = sqlite3_column_bytes(stmt, 0);

        /* Both length and contents must match; a shared prefix is not enough. */
        if(stored != NULL && (size_t)stored_len == pass_len
            && memcmp(stored, msg->password, pass_len) == 0)
        {
            matched = 1;
        }
    }

    sqlite3_finalize(stmt);
    return matched;
}

//先写在函数内,调试好以后用execl调用
/*
 * Accept loop of a separate file-transfer listener (still a stub).
 *
 * Listens on 127.0.0.1:5858 (the port is hard-coded for now), prints the
 * address of every client that connects and closes the connection again,
 * because nothing uses it yet. Without the close, each connection leaked a
 * descriptor until accept() itself failed.
 *
 * The function never returns; setup and accept failures are reported with
 * perror() and terminate the process.
 */
void server_file()
{
    int ret;
    int sockfd;
    struct sockaddr_in s_addr;

    sockfd = socket(AF_INET,SOCK_STREAM,0);
    if(sockfd < 0)
    {
        perror("socket error!");
        exit(1);
    }

    /* Allow the port to be reused right after a restart. */
    int opt = 1;
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

    memset(&s_addr, 0, sizeof(s_addr));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(5858);
    s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = bind(sockfd,(struct sockaddr *)&s_addr,sizeof(struct sockaddr_in));
    if(ret < 0)
    {
        perror("bind error!");
        exit(1);
    }

    ret = listen(sockfd,20);
    if(ret < 0)
    {
        /* This used to print "bind error!", which pointed at the wrong call. */
        perror("listen error!");
        exit(1);
    }

    while(1)
    {
        /* accept() overwrites the length, so it is reset for every call. */
        struct sockaddr_in c_addr;
        socklen_t c_size = sizeof(c_addr);
        int cfd = accept(sockfd,(struct sockaddr*)&c_addr,&c_size);
        if(cfd < 0)
        {
            perror("accept error!");
            exit(1);
        }
        printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));
        close(cfd);
    }
}

work.h

#ifndef _MSG_H_
#define _MSG_H_

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <errno.h>
#include <signal.h>

#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>

#include <sys/epoll.h>


/* Capacity of the epoll event array and the size hint given to epoll_create(). */
#define EVENTS_SIEZ 1024
// Previous value, 64 KiB: #define RECVBUF_SIZE    65536
#define RECVBUF_SIZE    100       // bytes per recv() buffer while relaying file data
/* Not referenced in the code shown here. */
#define INT_SIZE 4

/* Context handed to my_getmessage_callback() while chat history is written. */
struct callback
{
    FILE *fp;           /* stream the formatted history lines are written to */
    char name[20];      /* user who asked for the history */
    char toname[20];    /* the other chat partner */
};

/* One logged-in user; the nodes form a singly linked list starting at head. */
struct online
{
    int cfd;                /* socket descriptor of the user's connection */
    int say;                /* 1 while the user is muted, 0 otherwise */
    char name[20];          /* user name, NUL-terminated */
    struct online *next;    /* next node, NULL at the end of the list */
};

/* Slot describing a receiver connection that waits for one block of file data. */
typedef struct oooo
{
    int cfd;            /* socket of the waiting receiver connection */
    int use;            /* 1 while the slot is occupied, 0 when it is free */
    int offset;         /* offset of the block this connection waits for */
    char filename[20];  /* file name copied from the request */
}stro_sockfd;

/*
 * Message record exchanged between client and server. The server reads and
 * writes it as raw bytes of sizeof(Msg), so both programs must be built with
 * the same layout.
 */
typedef struct messgae
{
    int action;         /* request or reply code; thread_read() dispatches on it */
    int sfile_id;       /* file-transfer id handed out by the server */
    int filesize;       /* total size of the file being sent, in bytes */
    int count;          /* number of blocks the file is split into */
    int bs;             /* size of the block described by this message, in bytes */
    int offset;         /* start of the block inside the file */
    long cfd;           /* not referenced in the code shown here */
    char name[20];      /* sender name */
    char password[20];  /* password for login and registration */
    char toname[20];    /* recipient name, or "all" for a broadcast */
    char filename[20];  /* name of the file being transferred */
    char message[1024]; /* message text */
    char now_time[32];  /* not referenced in the code shown here */
}Msg;
/*
 * Slot table for receiver connections, guarded by mutex in thread_read().
 * NOTE(review): this defines the array in a header, so every source file
 * that includes it gets its own definition, which can fail at link time;
 * an extern declaration here plus one definition in a .c file would avoid
 * that. thread_read() only scans the first 10 of the 20 slots.
 */
stro_sockfd find_cfd[20];


/* Create the listening socket on 127.0.0.1:5556; exits the process on failure. */
int Create_sock();
/* Open test.db and create the person, manager and storagemessage tables. */
void createlist();
/* Per-connection worker; arg carries the accepted socket descriptor. */
void *thread_read(void *arg);
/* Accept loop of the file-transfer listener; the port is hard-coded to 5858 for now. */
void server_file();
/* Return 1 when the administrator name and password in msg match the manager table, else 0. */
int query_manager_password(Msg *msg);




#endif // !_MSG_H_

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐