linux下的网络聊天室
linux下的网络聊天室一版:多reactor模式实现高并发服务器。sever.c#include "msg.h"#include "pthreadpool.h"#define PORT 5554int sockfd;char *errmsg;sqlite3 *db;struct online *head = NULL;void createlist(){char sql[1024] = {0};
·
linux下的网络聊天室
一版:多reactor模式实现高并发服务器。
sever.c
#include "msg.h"
#include "pthreadpool.h"
#define PORT 5554
int sockfd;
char *errmsg;
sqlite3 *db;
struct online *head = NULL;
void createlist()
{
char sql[1024] = {0};
int ret = sqlite3_open("test.db",&db);
memset(sql,0,sizeof(sql));
strcpy(sql,"create table if not exists person(id integer primary key,name text,password text);");
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
return;
}
void createmessage()
{
char sql[1024] = {0};
int ret = sqlite3_open("test.db",&db);
memset(sql,0,sizeof(sql));
strcpy(sql,"create table if not exists message(id integer primary key,name text,toname text,message text);");
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
return;
}
// void insert_message(Msg *pmsg)
// {
// char sql[1024] = {0};
// sprintf(sql,"insert into person(name,toname,message)values('%s','%s','%s');",pmsg->name,pmsg->toname,pmsg->message);
// sqlite3_exec(db,sql,NULL,NULL,&errmsg);
// return ;
// }
int my_strcmp(char *dest,char *src)
{
printf("dest:%s\n",dest);
printf("src:%s\n",src);
int i = 0;
while(dest[i] == src[i])
{
i++;
if(dest[i] == '\0')
{
return 1;
}
}
return 0;
}
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
strcpy((char *)para,columvalue[0]);
return 0;
}
int querypassword(Msg *msg)
{
char sql[1024] = {0};
char password[20];
sprintf(sql,"select password from person where name = '%s';",msg->name);
sqlite3_exec(db,sql,my_sqlite_callback,(void*)password,&errmsg);
if(my_strcmp(password,msg->password) == 1)
{
return 1;
}
return 0;
}
void insert_link(struct online *new_user)
{
new_user->next = head;
head = new_user;
}
void server_exit(int sig)
{
shutdown(sockfd,SHUT_RDWR);
exit(1);
}
int find_fd(char *toname)
{
struct online*temp = head;
while(temp != NULL)
{
if(strcmp(temp->name,toname) == 0)
{
return temp->cfd;
}
temp = temp->next;
}
return -1;
}
void *thread_read(void *arg)
{
pthread_t id = pthread_self();
pthread_detach(id);
long cfd = (long)arg;
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
// while(1)
// {
memset(pmsg,0,sizeof(Msg));
int r_n = read(cfd,pmsg,sizeof(Msg));
if(r_n == 0)
{
printf("server exit!\n");
pthread_exit(NULL);
}
switch(pmsg->action)
{
case 1:
{
if(!querypassword(pmsg))
{
pmsg->action = -1;
}
struct online *new_user = (struct online*)malloc(sizeof(struct online));
strcpy(new_user->name,pmsg->name);
new_user->cfd = cfd;
insert_link(new_user);
write(cfd,pmsg,sizeof(Msg));
break;
}
case 2:
{
int to_fd = find_fd(pmsg->toname);
if(to_fd == -1)
{
// insert_message(pmsg);
pmsg->action = -2;
write(cfd,pmsg,sizeof(Msg));
}
else
{
write(to_fd,pmsg,sizeof(Msg));
}
break;
}
case 0:
{
char sql[1024] = {0};
int ret = sqlite3_open("test.db",&db);
memset(sql,0,sizeof(sql));
sprintf(sql,"insert into person(name,password)values('%s',%s);",pmsg->name,pmsg->password);
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
break;
}
case 3:
{
struct online*temp = head;
while(temp != NULL)
{
write(temp->cfd,pmsg,sizeof(Msg));
temp = temp->next;
}
break;
}
case 4:
{
struct online*pNode = NULL;
struct online*temp = head;
while(temp != NULL)
{
if(strcmp(temp->name,pmsg->name) == 0)
{
if(pNode == NULL)
{
head = temp->next;
free(temp);
temp = NULL;
pmsg->action = -4;
break;
}
pNode->next = temp->next;
free(temp);
temp = NULL;
pmsg->action = -4;
break;
}
pNode = temp;
temp = temp->next;
}
break;
}
case 5:
{
// char buff[1024];
memset(pmsg->message,0,sizeof(pmsg->message));
struct online * temp = head;
while(temp != NULL)
{
temp->name[strlen(temp->name)+1] = '\n';
strcat(pmsg->message,temp->name);
temp = temp->next;
}
write(cfd,pmsg,sizeof(Msg));
break;
}
case 6:
{
int to_fd = find_fd(pmsg->toname);
if(to_fd == -1)
{
pmsg->action = -2;
write(cfd,pmsg,sizeof(Msg));
}
else
{
write(to_fd,pmsg,sizeof(Msg));
}
break;
}
case -6:
{
int to_fd = find_fd(pmsg->toname);
write(to_fd,pmsg,sizeof(Msg));
break;
}
}
// }
free(pmsg);
pmsg = NULL;
return NULL;
}
#include <sys/epoll.h>
#define EPOLL_SIZE 1024
int main(int argc,char *argv[])
{
createlist();
createmessage();
signal(SIGINT,server_exit);
signal(SIGPIPE,SIG_IGN);
long cfd;
char message[1024];
Msg pmsg;
pthread_t id;
socklen_t c_size;
struct sockaddr_in s_addr;
struct sockaddr_in c_addr;
int epfd;
epfd = epoll_create(2000);
tpool_create(20);
struct epoll_event events[EPOLL_SIZE];
struct epoll_event event;
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
{
perror("socket error!");
exit(1);
}
int opt = 1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//设置套接字可以重复使用端口号
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(PORT);
s_addr.sin_addr.s_addr = htons(INADDR_ANY);
int ret = bind(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr_in));
if(ret != 0)
{
perror("bind error!");
exit(1);
}
ret = listen(sockfd,20);
if(ret != 0)
{
printf("listen error!");
exit(1);
}
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event);
c_size = sizeof(struct sockaddr_in);
while(1)
{
printf("start\n");
int num = epoll_wait(epfd,events,EPOLL_SIZE,-1);
for(int i=0; i<num; i++)
{
if(events[i].events == EPOLLIN)
{
if(events[i].data.fd == sockfd)
{
cfd = accept(sockfd,(struct sockaddr *)&c_addr,&c_size);
if(cfd < 0)
{
perror("accpet error!");
exit(1);
}
printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));
event.events = EPOLLIN;
event.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event);
if((--num) == 0)
{
break;
}
}
else
{
printf("int\n");
long fd = events[i].data.fd;
// printf("%ld\n",fd);
// pthread_create(&id,NULL,thread_read,(void *)fd);
tpool_add_work(thread_read,(void*)fd);
if(--num <= 0)
{
break;
}
}
}
}
}
shutdown(cfd,SHUT_RDWR);
return 0;
}
client.c
#include "msg.h"
char *errmsg;
sqlite3 *db;
int fd;
int my_strcmp(char *dest,char *src)
{
printf("dest:%s\n",dest);
printf("src:%s\n",src);
int i = 0;
while(dest[i] == src[i])
{
i++;
if(dest[i] == '\0')
{
return 1;
}
}
return 0;
}
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
strcpy((char *)para,columvalue[0]);
return 0;
}
int querypassword(Msg *msg)
{
char sql[1024] = {0};
char password[20];
sqlite3_open("test.db",&db);
sprintf(sql,"select name from person where name = '%s';",msg->name);
sqlite3_exec(db,sql,my_sqlite_callback,(void*)password,&errmsg);
if(my_strcmp(password,msg->name) == 1)
{
return 1;
}
return 0;
}
void cliten_exit(int sig)
{
exit(1);
}
void *thread_read(void *arg)
{
pthread_t id = pthread_self();
pthread_detach(id);
long cfd = (long)arg;
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
while(1)
{
memset(pmsg,0,sizeof(Msg));
int r_n = read(cfd,pmsg,sizeof(Msg));
if(r_n == 0)
{
printf("server exit!\n");
pthread_exit(NULL);
}
switch(pmsg->action)
{
case 1:
{
printf("log succeessfully\n");
break;
}
case -1:
{
printf("log failty!\n");
break;
}
case 2:
{
printf("当前时间!(未解决)\n");
printf("%s chat you:%s\n",pmsg->name, pmsg->message);
break;
}
case -2:
{
printf("对方不在线!\n");
break;
}
case 3:
{
printf("当前时间!(未解决)\n");
printf("%s chat all:%s\n",pmsg->name,pmsg->message);
break;
}
case 4:
{
printf("用户没找到!\n");
break;
}
case -4:
{
printf("用户成功退出!\n");
break;
}
case 5:
{
printf("%s\n",pmsg->message);
break;
}
case 6:
{
int ret=0;
fd = open(pmsg->filename,O_WRONLY | O_CREAT | O_APPEND,0655);
if(write(fd,pmsg->message,strlen(pmsg->message)) < 0)
{
perror("write error!");
exit(1);
}
break;
}
case -6:
{
printf("文件传输完成!\n");
close(fd);
break;
}
}
}
}
int main(int argc,char *argv[])
{
if(argc < 3)
{
printf("ip,pork\n");
exit(1);
}
signal(SIGINT,cliten_exit);
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
long sockfd;
char message[1024];
pthread_t id;
struct sockaddr_in s_addr;
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
{
perror("socket error!");
exit(1);
}
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(atoi(argv[2]));
s_addr.sin_addr.s_addr = inet_addr(argv[1]);
if(connect(sockfd,(struct sockaddr *)&s_addr,sizeof(struct sockaddr)) != 0)
{
perror("connect error!");
exit(1);
}
pthread_create(&id,NULL,thread_read,(void *)sockfd);
printf("cmd log reg chat all view exit\n");
char cmd[1024];
memset(pmsg,0,sizeof(Msg));
while(1)
{
memset(cmd,0,sizeof(cmd));
printf("input cmd:\n");
scanf("%s",cmd);
if(strcmp(cmd,"log") == 0)
{
memset(pmsg,0,sizeof(Msg));
pmsg->action = 1;
printf("input name\n");
scanf("%s",pmsg->name);
printf("input password\n");
scanf("%s",pmsg->password);
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
if((strcmp(cmd,"exit") == 0) && (pmsg->action > 0))
{
pmsg->action = 4;
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
if(strcmp(cmd,"reg") == 0)
{
memset(pmsg,0,sizeof(Msg));
pmsg->action = 0;
printf("input name\n");
scanf("%s",pmsg->name);
while(querypassword(pmsg) == 1)
{
printf("已存在重新input name\n");
scanf("%s",pmsg->name);
}
printf("input password\n");
scanf("%s",pmsg->password);
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
if((strcmp(cmd,"chat") == 0) && (pmsg->action > 0))
{
pmsg->action = 2;
memset(pmsg->toname,0,sizeof(pmsg->toname));
printf("input toname\n");
scanf("%s",pmsg->toname);
memset(pmsg->message,0,sizeof(pmsg->message));
printf("input message\n");
scanf("%s",pmsg->message);
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
else if(pmsg->action < 0)
{
printf("请登录!\n");
}
if((strcmp(cmd,"all") == 0) && (pmsg->action > 0))
{
pmsg->action = 3;
memset(pmsg->message,0,sizeof(pmsg->message));
printf("input message\n");
scanf("%s",pmsg->message);
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
else if(pmsg->action < 0)
{
printf("请登录!\n");
}
if((strcmp(cmd,"view") == 0) && (pmsg->action > 0))
{
pmsg->action = 5;
printf("1\n");
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
if((strcmp(cmd,"sfile") == 0) && (pmsg->action > 0))
{
pmsg->action = 6;
memset(pmsg->toname,0,sizeof(pmsg->toname));
printf("input toname\n");
scanf("%s",pmsg->toname);
memset(pmsg->filename,0,sizeof(pmsg->filename));
printf("input filename\n");
scanf("%s",pmsg->filename);
int ret=0;
int fd = open(pmsg->filename,O_RDONLY);
while(1)
{
memset(pmsg->message,0,sizeof(pmsg->message));
ret = read(fd,pmsg->message,sizeof(pmsg->message));
if(ret == 0)
{
pmsg->action = -6;
write(sockfd,pmsg,sizeof(Msg));
printf("发送成功!\n");
break;
}
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
close(fd);
}
else if(pmsg->action < 0)
{
printf("请登录!\n");
}
}
return 0;
}
msg.h:
#ifndef _MSG_H_
#define _MSG_H_
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>
struct online
{
int cfd;
char name[20];
struct online *next;
// struct online *pre;
};
typedef struct messgae
{
int flag;
int action;
char name[20];
char password[20];
char toname[20];
char filename[20];
char message[1024];
struct online* head;
}Msg;
#endif // !_MSG_H_
线程池:
pthreadpool.c
#include "pthreadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
static tpool_t*tpool = NULL;
//工作者线程,从任务链表中取出任务并执行
static void *thread_function(void *arg)
{
tpool_work_t *work;
while(1)
{
//任务队列为空,且线程池未关闭,线程阻塞等待任务
pthread_mutex_lock(&tpool->queue_lock);
while(!tpool->queue_head && !tpool->shutdown)
{
pthread_cond_wait(&tpool->queue_ready,&tpool->queue_lock);
}
//是否销毁线程
if(tpool->shutdown)
{
pthread_mutex_unlock(&tpool->queue_lock);
pthread_exit(NULL);
}
//取出任务,执行任务
work = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
pthread_mutex_unlock(&tpool->queue_lock);
work->function(work->arg);
//线程完成后,释放任务
// free(work->arg);
free(work);
}
return NULL;
}
//创建线程池
int tpool_create(int maxNum)
{
int i;
//创建线程池结构体
tpool = calloc(1,sizeof(tpool_t));
if(!tpool)
{
perror("create error!");
exit(1);
}
//初始化任务链表,互斥量,条件变量
tpool->maxNum = maxNum;
tpool->shutdown = 0;
tpool->queue_head = NULL;
tpool->queue_tail = NULL;
if(pthread_mutex_init(&tpool->queue_lock,NULL) != 0)
{
perror("mutex error!");
exit(1);
}
if(pthread_cond_init(&tpool->queue_ready,NULL) != 0)
{
perror("cond error!");
exit(1);
}
//创建worker线程
tpool->threadID = calloc(maxNum,sizeof(pthread_t));
if(!tpool->threadID)
{
perror("threadID error!");
exit(1);
}
for(i=0; i<maxNum; i++)
{
pthread_create(&tpool->threadID[i],NULL,thread_function,NULL);
// pthread_detach(threadID[i]);
}
return 0;
}
//向线程池添加任务
int tpool_add_work(void *(*function)(void*),void *arg)
{
//work指向等待加入的任务
tpool_work_t *work;
if(!function)
{
perror("add error!");
return -1;
}
work = malloc(sizeof(tpool_work_t));
work->function = function;
work->arg = arg;
work->next = NULL;
//将任务结点添加到任务链表
pthread_mutex_lock(&tpool->queue_lock);
//链表为空
if(!tpool->queue_head)
{
tpool->queue_head = work;
tpool->queue_tail = work;
}
//链表非空
else
{
tpool->queue_tail->next = work;
tpool->queue_tail = tpool->queue_tail->next;
}
//通知工作者线程,有新任务
pthread_cond_signal(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
return 0;
}
//销毁线程池
void tpool_destroy()
{
int i;
tpool_work_t*member;
if(tpool->shutdown)
{
return;
}
//关闭线程池开关
tpool->shutdown = 1;
//唤醒所有阻塞的线程
pthread_mutex_lock(&tpool->queue_lock);
pthread_cond_broadcast(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
//回收结束线程剩余资源
for(i=0; i<tpool->maxNum; i++)
{
pthread_join(tpool->threadID[i],NULL);
}
//释放threadID数组
free(tpool->threadID);
//释放未完成的任务
while(tpool->queue_head)
{
member = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
// free(member->arg);
free(member);
}
//销毁互斥量,条件变量
pthread_mutex_destroy(&tpool->queue_lock);
pthread_cond_destroy(&tpool->queue_ready);
//释放进程池结构体
free(tpool);
}
pthreadpool.h
#ifndef PTHREADPOOL_H
#define PTHREADPOOL_H
#include <pthread.h>
//任务结点
typedef struct tpool_work
{
void *(*function)(void*); //任务函数
void *arg; //传入参数
struct tpool_work *next;
}tpool_work_t;
//线程池
typedef struct tpool
{
int shutdown; //销毁线程池
int maxNum; //最大线程数
pthread_t *threadID; //线程ID数组首地址
tpool_work_t *queue_head; //任务链表队首
tpool_work_t *queue_tail;
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
}tpool_t;
//创建线程
int tpool_create(int maxNum);
//销毁线程池
void tpool_destroy();
//向线程池中添加任务
int tpool_add_work(void*(*function)(void *),void*arg);
#endif // !
mmap内存映射,实现大文件传输
client客户端
client.c
#include "work.h"
int lognum = 0;
int main(int argc, char *argv[])
{
pthread_t id;
long sockfd = Create_sock();
pthread_create(&id,NULL,thread_read,(void *)sockfd);
int num;
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
while(1)
{
lognum = 0;
viewlogin();
scanf("%d",&num);
if( 0 > num || num > 3)
{
continue;
}
if(num == 0)
{
exit(1);
}
if(num == 2)
{
reg_user(sockfd,pmsg);
}
if(num == 1)
{
log_user(sockfd, pmsg);
while(1)
{
if(3 == lognum)
{
break;
}
if(-1 == lognum)
{
printf("登录失败\n");
sleep(1);
break;
}
else if(1 == lognum)
{
view_user(sockfd, pmsg);
while(1)
{
viewhome(sockfd, pmsg);
scanf("%d",&lognum);
if(1 == lognum)
{
chat_user(sockfd, pmsg);
continue;
}
if(2 == lognum)
{
all_user(sockfd, pmsg);
continue;
}
if(3 == lognum)
{
quite_user(sockfd, pmsg);
break;
}
}
}
}
}
if(num == 3)
{
log_manager(sockfd, pmsg);
while(1)
{
if(-1 == lognum)
{
printf("登录失败\n");
sleep(1);
break;
}
else if(1 == lognum)
{
while(1)
{
viewhome(sockfd, pmsg);
scanf("%d",&lognum);
if(1 == lognum)
{
chat_user(sockfd, pmsg);
continue;
}
if(2 == lognum)
{
all_manager(sockfd, pmsg);
continue;
}
if(-1 == lognum)
{
break;
}
}
}
}
}
}
}
work.h
#ifndef _MSG_H_
#define _MSG_H_
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <semaphore.h>
#define EVENTS_SIEZ 1024
#define INT_SIZE 4
// #define SEND_SIZE 65536 //64K
// #define BLOCKSIZE 134217728 //128M
// #define RECVBUF_SIZE 65536 //64K
#define SEND_SIZE 100 //64K
#define BLOCKSIZE 500 //128M
#define RECVBUF_SIZE 100 //64K
struct conn
{
int info_fd; //信息交换socket:接收文件信息、文件传送通知client
char filename[30]; //文件名
int filesize; //文件大小
int bs; //分块大小
int count; //分块数量
int recvcount; //已接收块数量,recv_count == count表示传输完毕
char *mbegin; // mmap起始地址
int used; //使用标记,1代表使用,0代表可用
};
typedef struct messgae
{
int action;
int sfile_id;
int filesize;
int count;
int bs;
int offset;
long cfd;
char name[20];
char password[20];
char toname[20];
char filename[20];
char message[1024];
char now_time[32];
}Msg;
int Create_sock();
void viewhome();
void viewlogin();
void viewchat();
void viewall();
void reg_user(int sockfd, Msg *pmsg);
void* thread_read(void *arg);
void log_user(int sockfd, Msg *pmsg);
void chat_user(int sockfd, Msg *pmsg);
void send_file(int sockfd, Msg *pmsg);
void all_user(int sockfd, Msg *pmsg);
void client_file(int port);
void all_manager(int sockfd, Msg *pmsg);
void log_manager(int sockfd, Msg *pmsg);
void view_user(int sockfd, Msg *pmsg);
void get_time(char *now_time);
void quite_user(int sockfd, Msg *pmsg);
#endif // !_MSG_H_
work.c
#include "work.h"
#define SEM_SIZE 2
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_send = PTHREAD_MUTEX_INITIALIZER;
sem_t sem[SEM_SIZE];
pthread_cond_t cond_sfile = PTHREAD_COND_INITIALIZER;
extern int lognum;
char *mbegin;
int fork_num;
int sfile_id;
struct conn gconn[20];
void printf_stea(Msg *pmsg)
{
char now_time[32] = {0};
get_time(now_time);
printf("时间%s,%s:%s\n",now_time,pmsg->name,pmsg->message);
return;
}
void init_sem()
{
for(int i=0; i<SEM_SIZE; i++)
{
if(i == 0)
{
sem_init(&sem[i], 0, 1);
}
else
{
sem_init(&sem[i], 0, 0);
}
}
return;
}
//创建大小为size的文件
int createfile(char *filename, int size)
{
int tmpfd = open(filename, O_RDWR | O_CREAT, 0655);
fchmod(tmpfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
ftruncate(tmpfd, size);
close(tmpfd);
return 0;
}
void *recv_file(void *arg)
{
//线程独立
pthread_t id = pthread_self();
pthread_detach(id);
sem_wait(&sem[1]);
Msg omsg;
memcpy(&omsg, (Msg *)arg, sizeof(Msg));
sem_post(&sem[0]);
int sockfd1 = Create_sock();
//发送套接字
omsg.action = -5;
write(sockfd1, &omsg, sizeof(Msg));
//准备接收
char *fp = gconn[omsg.sfile_id].mbegin + omsg.offset;
int ret = 0;
while(1)
{
if((ret = (recv(sockfd1, fp, RECVBUF_SIZE, 0))) == RECVBUF_SIZE)
{
fp += RECVBUF_SIZE;
}
else if(ret < RECVBUF_SIZE)
{
break;
}
}
close(sockfd1);
return NULL;
}
void get_time(char *now_time)
{
time_t now;
struct tm *tm_now;
time(&now);
tm_now = localtime(&now);
memset(now_time, 0, sizeof(now_time));
sprintf(now_time, "%d-%d-%d %d:%d:%d", tm_now->tm_year + 1900, tm_now->tm_mon + 1,
tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
return;
}
void *thread_read(void *arg)
{
init_sem();
pthread_t id = pthread_self();
pthread_detach(id);
long cfd = (long)arg;
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
while(1)
{
memset(pmsg,0,sizeof(Msg));
int r_n = recv(cfd,pmsg,sizeof(Msg), MSG_WAITALL);
if(r_n == 0)
{
printf("server exit!\n");
pthread_exit(NULL);
}
switch(pmsg->action)
{
case 255:
{
printf("注册失败\n");
break;
}
case 1:
{
lognum = 1;
break;
}
case -1:
{
lognum = -1;
break;
}
case 2:
{
printf_stea(pmsg);
break;
}
case -2:
{
printf("未上线\n");
break;
}
case 3:
{
sfile_id = pmsg->sfile_id;
pthread_cond_signal(&cond_sfile);
break;
}
case 4:
{
printf("%s对%s:%s\n",pmsg->name,pmsg->toname,pmsg->message);
break;
}
case -3:
{
createfile("copy", pmsg->filesize);
int fd;
if ((fd = open("copy", O_RDWR)) == -1)
{
printf("open file erro\n");
exit(-1);
}
char *map = (char *)mmap(NULL, pmsg->filesize,
PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
close(fd);
gconn[pmsg->sfile_id].mbegin = map;
gconn[pmsg->sfile_id].recvcount = 0;
gconn[pmsg->sfile_id].count = pmsg->count;
break;
}
case 5:
{
sem_wait(&sem[0]);
Msg omsg;
memcpy(&omsg, pmsg, sizeof(Msg));
sem_post(&sem[1]);
pthread_t id;
if(pthread_create(&id, NULL, recv_file, (void*)&omsg) != 0)
{
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, errno, strerror(errno));
exit(1);
}
break;
}
case 6:
{
char buf[100];
FILE *fp = fopen("user.txt", "w+");
if(fwrite(pmsg->message, strlen(pmsg->message), 1, fp)
!= sizeof(pmsg->message))
{
fseek(fp, 0, SEEK_SET);
while(feof(fp) == 0)
{
fscanf(fp, "%s", buf);
printf("%s\n",buf);
memset(buf, 0, sizeof(buf));
}
fclose(fp);
}
break;
}
case 7:
{
printf("你被禁言了!\n");
break;
}
case 8:
{
printf("禁言成功!\n");
break;
}
case -8:
{
printf("解除禁言!\n");
break;
}
case 9:
{
char buf[100];
char buf2[100];
FILE *fp = fopen("message.txt", "w+");
while(fwrite(pmsg->message, strlen(pmsg->message), 1, fp)
!= sizeof(pmsg->message))
{
fseek(fp, 0, SEEK_SET);
while(feof(fp) == 0)
{
// fscanf(fp, "%s %s", buf, buf2);
int i=0;
char ch;
fread(&ch, 1, 1, fp);
while(ch != '\n')
{
buf[i] = ch;
i++;
fread(&ch, 1, 1, fp);
}
printf("%s\n",buf);
memset(buf, 0, sizeof(buf));
}
fclose(fp);
break;
}
break;
}
}
}
return NULL;
}
int Create_sock()
{
int ret;
int sockfd;
struct sockaddr_in s_addr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
if(sockfd < 0)
{
perror("socket error!");
exit(1);
}
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(5556);
s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
ret = connect(sockfd, (struct sockaddr *)&s_addr, sizeof(struct sockaddr_in));
if(ret < 0)
{
perror("connect error!");
exit(1);
}
return sockfd;
}
int Create_UDP(int port)
{
int sockfd;
struct sockaddr_in s_addr;
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(port);
s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bind(sockfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr_in));
return sockfd;
}
void viewlogin()
{
system("clear");
printf("login\n");
printf("1.登录\n");
printf("2.注册\n");
printf("3.管理员登录\n");
printf("0.退出\n");
}
void view_user(int sockfd, Msg *pmsg)
{
pmsg->action = 6;
write(sockfd, pmsg, sizeof(Msg));
}
void viewhome(int sockfd, Msg *pmsg)
{
system("clear");
printf("home\n");
printf("2.群聊\n");
printf("1.私聊\n");
printf("3.退出\n");
}
void reg_user(int sockfd, Msg *pmsg)
{
memset(pmsg, 0, sizeof(Msg));
pmsg->action = 0;
printf("input name\n");
scanf("%s",pmsg->name);
printf("input password\n");
scanf("%s",pmsg->password);
if(write(sockfd, pmsg, sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
return;
}
void log_user(int sockfd, Msg *pmsg)
{
memset(pmsg, 0, sizeof(Msg));
pmsg->action = 1;
printf("input name\n");
scanf("%s",pmsg->name);
printf("input password\n");
scanf("%s",pmsg->password);
if(write(sockfd, pmsg, sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
return;
}
void viewchat()
{
system("clear");
printf("chat\n");
printf("quite.退出\n");
printf("sfile.发文件\n");
}
void chat_user(int sockfd, Msg *pmsg)
{
printf("input toname\n");
scanf("%s",pmsg->toname);
viewchat();
pmsg->action = 9;
write(sockfd, pmsg, sizeof(Msg));
while(1)
{
memset(pmsg->message, 0, sizeof(pmsg->message));
scanf("%s",pmsg->message);
if((0 == strcmp(pmsg->message, "sfile")))
{
send_file(sockfd, pmsg);
}
else if((0 == strcmp(pmsg->message,"quite")))
{
return;
}
else
{
pmsg->action = 2;
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
}
}
void viewall()
{
system("clear");
printf("all:\n");
printf("1.群聊\n");
printf("2.私聊\n");
printf("0.退出\n");
}
void all_user(int sockfd, Msg *pmsg)
{
strcpy(pmsg->toname, "all");
viewall();
while(1)
{
memset(pmsg->message, 0, sizeof(pmsg->message));
scanf("%s", pmsg->message);
if((0 == strcmp(pmsg->message, "sfile")))
{
}
if((0 == strcmp(pmsg->message,"quite")))
{
return;
}
pmsg->action = 4;
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
}
void quite_user(int sockfd, Msg *pmsg)
{
pmsg->action = 10;
write(sockfd, pmsg, sizeof(Msg));
return;
}
void view_manager()
{
system("clear");
printf("all:\n");
printf("quite.退出\n");
printf("unable.禁言\n");
printf("capable.解除禁言\n");
printf("kick.踢人\n");
}
void all_manager(int sockfd, Msg *pmsg)
{
strcpy(pmsg->toname, "all");
view_manager();
while(1)
{
memset(pmsg->message, 0, sizeof(pmsg->message));
scanf("%s", pmsg->message);
if((0 == strcmp(pmsg->message, "unable")))
{
pmsg->action = 8;
printf("禁言的人是:\n");
memset(pmsg->toname, 0, sizeof(pmsg->toname));
scanf("%s",pmsg->toname);
write(sockfd, pmsg, sizeof(Msg));
}
else if((0 == strcmp(pmsg->message, "capable")))
{
pmsg->action = -8;
printf("解除禁言的人是:\n");
memset(pmsg->toname, 0, sizeof(pmsg->toname));
scanf("%s",pmsg->toname);
write(sockfd, pmsg, sizeof(Msg));
}
else if((0 == strcmp(pmsg->message, "kick")))
{
//释放用户结点
pmsg->action = 10;
}
else if((0 == strcmp(pmsg->message,"quite")))
{
return;
}
else
{
memset(pmsg->toname, 0, sizeof(pmsg->toname));
strcpy(pmsg->toname, "all");
pmsg->action = 4;
if(write(sockfd,pmsg,sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
}
}
}
void client_file(int port)
{
int ret;
int sockfd;
struct sockaddr_in s_addr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
if(sockfd < 0)
{
perror("socket error!");
exit(1);
}
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(port);
s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
ret = connect(sockfd, (struct sockaddr *)&s_addr, sizeof(struct sockaddr_in));
if(ret < 0)
{
perror("connect error!");
exit(1);
}
}
Msg *new_fb_head(Msg *p_msg)
{
Msg*pmsg = (Msg*)malloc(sizeof(Msg));
memset(pmsg, 0, sizeof(Msg));
strcpy(pmsg->filename, p_msg->filename);
strcpy(pmsg->name, p_msg->name);
strcpy(pmsg->toname, p_msg->toname);
pmsg->sfile_id = p_msg->sfile_id;
pmsg->offset = p_msg->offset;
pmsg->bs = BLOCKSIZE;
p_msg->offset += BLOCKSIZE;
return pmsg;
}
void *send_filedata(void *arg)
{
Msg*omsg = (Msg*)arg;
//线程独立
pthread_t id = pthread_self();
pthread_detach(id);
int sockfd2 = Create_sock();
omsg->action = 5;
//让服务器线程准备好
write(sockfd2, omsg, sizeof(Msg));
//连续的映射数据
int i=0;
int num = omsg->bs/SEND_SIZE;
char *fp = mbegin + omsg->offset;
for(i=0; i<num; i++)
{
if((write(sockfd2, fp, SEND_SIZE)) == SEND_SIZE)
{
fp += SEND_SIZE;
omsg->bs = omsg->bs - SEND_SIZE;
}
}
write(sockfd2, fp, omsg->bs);
close(sockfd2);
free(arg);
return NULL;
}
#if 1
void send_file(int sockfd, Msg *pmsg)
{
int last_bs = 0;
//得到文件大小stat
scanf("%s",pmsg->filename);
int fd = open(pmsg->filename, O_RDWR);
if(fd < 0)
{
printf("文件打开错误!\n");
exit(1);
}
struct stat mystat;
fstat(fd, &mystat);
//输入pmsg
pmsg->action = 3;
pmsg->filesize = mystat.st_size;
/*最后一个分块可能不足一个标准分块*/
int count = pmsg->filesize/BLOCKSIZE;
if(pmsg->filesize%BLOCKSIZE == 0){
pmsg->count = count;
}
else{
pmsg->count = count+1;
last_bs = pmsg->filesize - BLOCKSIZE*count;
}
pmsg->bs = BLOCKSIZE;
//发送文件信息
write(sockfd, pmsg, sizeof(Msg));
//接受数据id;
while(1)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond_sfile, &mutex);
pmsg->sfile_id = sfile_id;
/*map,关闭fd*/
mbegin = (char *)mmap(NULL, pmsg->filesize,
PROT_WRITE|PROT_READ, MAP_SHARED, fd , 0);
close(fd);
//创建线程并发送
int i_count = 0;
pmsg->offset = 0;
pthread_t pid[pmsg->count];
memset(pid, 0, sizeof(pthread_t)*pmsg->count);
//文件刚好分块
if(last_bs == 0)
{
for(i_count=0; i_count<pmsg->count; i_count++)
{
Msg * p_msg = new_fb_head(pmsg);
if(pthread_create(&pid[i_count], NULL, send_filedata, (void*)p_msg) != 0)
{
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, errno, strerror(errno));
exit(1);
}
}
}
//不能被标准分块
else
{
for(i_count=0; i_count<pmsg->count-1; i_count++)
{
Msg * p_msg = new_fb_head(pmsg);
if(pthread_create(&pid[i_count], NULL, send_filedata, (void*)p_msg) != 0)
{
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, errno, strerror(errno));
exit(1);
}
}
//最后一块
Msg*p_msg = (Msg*)malloc(sizeof(Msg));
memset(p_msg, 0, sizeof(Msg));
strcpy(p_msg->filename, pmsg->filename);
strcpy(p_msg->name, pmsg->name);
strcpy(p_msg->toname, pmsg->toname);
p_msg->sfile_id = pmsg->sfile_id;
p_msg->offset = pmsg->offset;
p_msg->bs = last_bs;
if(pthread_create(&pid[i_count], NULL, send_filedata, (void*)p_msg) != 0)
{
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, errno, strerror(errno));
exit(1);
}
}
//回收线程
for(i_count=0; i_count<pmsg->count; i_count++)
{
pthread_join(pid[i_count],NULL);
}
break;
}
pthread_mutex_unlock(&mutex);
return;
}
#endif
void log_manager(int sockfd, Msg *pmsg)
{
memset(pmsg, 0, sizeof(Msg));
pmsg->action = 7;
printf("input name\n");
scanf("%s",pmsg->name);
printf("input password\n");
scanf("%s",pmsg->password);
if(write(sockfd, pmsg, sizeof(Msg)) < 0)
{
perror("write error!");
exit(1);
}
return;
}
makefile
obj := work.o client.o
client:$(obj)
gcc $(obj) -o client -lpthread
%.o:%.c
gcc $< -c -o $@
.PHONY:clean
clean:
rm -rf *.o client
server服务器
server.c
#include "work.h"
#include "pthreadpool.h"
int main(int argc, char *argv[])
{
createlist();
tpool_create(20);
signal(SIGPIPE,SIG_IGN);
int sockfd = Create_sock();
int epfd;
struct epoll_event event;
struct epoll_event events[EVENTS_SIEZ];
epfd = epoll_create(EVENTS_SIEZ);
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
socklen_t c_size;
struct sockaddr_in c_addr;
c_size = sizeof(struct sockaddr_in);
while(1)
{
printf("start\n");
int count = epoll_wait(epfd, events, EVENTS_SIEZ, -1);
for(int i=0; i<count; i++)
{
if(events[i].events == EPOLLIN)
{
if(events[i].data.fd == sockfd)
{
long cfd = accept(sockfd,(struct sockaddr*)&c_addr,&c_size);
if(cfd < 0)
{
perror("accept error!");
exit(1);
}
printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));
pthread_t id;
// 向线程池添加任务
tpool_add_work(thread_read,(void*)cfd);
}
}
}
}
}
msg.h
#ifndef _MSG_H_
#define _MSG_H_
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>
struct online
{
int cfd;
char name[20];
struct online *next;
// struct online *pre;
};
typedef struct messgae
{
int flag;
int action;
char name[20];
char password[20];
char toname[20];
char filename[20];
char message[1024];
struct online* head;
}Msg;
#endif // !_MSG_H_
work.c
#include "work.h"
#include "pthreadpool.h"
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
struct online *head = NULL;
char *errmsg;
sqlite3 *db;
int addr[100] = {0};
int my_strcmp(char *dest,char *src)
{
printf("dest:%s\n",dest);
printf("src:%s\n",src);
int i = 0;
while(dest[i] == src[i])
{
i++;
if(dest[i] == '\0')
{
return 1;
}
}
return 0;
}
void createlist()
{
char sql[1024] = {0};
int ret = sqlite3_open("test.db",&db);
memset(sql,0,sizeof(sql));
strcpy(sql,"create table if not exists person(id integer primary key,name text,password text);");
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
memset(sql,0,sizeof(sql));
strcpy(sql,"create table if not exists manager(id integer primary key,name text,password text);");
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
memset(sql,0,sizeof(sql));
strcpy(sql,"create table if not exists storagemessage(fromname text, toname text,message text, timestamp not NULL default(datetime('now','localtime')));");
ret = sqlite3_exec(db,sql,NULL,NULL,&errmsg);
return;
}
void insert_link(struct online *new_user)
{
new_user->next = head;
head = new_user;
}
int find_fd(char *toname)
{
struct online*temp = head;
while(temp != NULL)
{
if(strcmp(temp->name,toname) == 0)
{
return temp->cfd;
}
temp = temp->next;
}
return -1;
}
int my_sqlite_callback(void *para,int contun,char **columvalue,char **columnName)
{
strcpy((char *)para,columvalue[0]);
return 0;
}
int querypassword(Msg *msg)
{
char sql[1024] = {0};
char password[20];
sprintf(sql,"select password from person where name = '%s';",msg->name);
sqlite3_exec(db,sql,my_sqlite_callback,(void*)password,&errmsg);
if(my_strcmp(password,msg->password) == 1)
{
return 1;
}
return 0;
}
int queryname(Msg *msg)
{
char sql[1024] = {0};
char password[20];
sqlite3_open("test.db",&db);
sprintf(sql,"select name from person where name = '%s';",msg->name);
sqlite3_exec(db,sql,my_sqlite_callback,(void*)password,&errmsg);
if(my_strcmp(password,msg->name) == 1)
{
return 1;
}
return 0;
}
void get_time(char *now_time)
{
time_t now;
struct tm *tm_now;
time(&now);
tm_now = localtime(&now);
memset(now_time, 0, sizeof(now_time));
// sprintf(now_time, "%d-%d-%d %d:%d:%d", tm_now->tm_year + 1900, tm_now->tm_mon + 1,
// tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
sprintf(now_time, "%d-%d-%d %d:%d:%d", tm_now->tm_year + 1900, tm_now->tm_mon + 1,
tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
}
void insert_data_message(Msg *pmsg)
{
char sql[2048] = {0};
char now_time[32] = {0};
get_time(now_time);
memset(sql, 0, sizeof(sql));
sprintf(sql, "insert into storagemessage(fromname,toname,message) values('%s', '%s', '%s');",
pmsg->name, pmsg->toname, pmsg->message);
sqlite3_exec(db, sql, NULL, NULL, &errmsg);
return;
}
int my_getmessage_callback(void *para, int contun, char **columvalue, char **columnName)
{
// printf("进入回调\n");
struct callback* call = (struct callback*)para;
// printf("call name = %s,toname = %s",call->name,call->toname);
if(strcmp(call->name, columvalue[0]) == 0)
{
fprintf(call->fp,"%s:我,时间:%s\n",columvalue[2],columvalue[3]);
// printf("%s:我,时间:%s\n",columvalue[2],columvalue[3]);
}
else if(strcmp(call->toname, columvalue[0]) == 0)
{
fprintf(call->fp,"时间:%s,%s:%s\n",columvalue[3],columvalue[0],columvalue[2]);
// printf("时间:%s,%s:%s\n",columvalue[3],columvalue[0],columvalue[2]);
}
return 0;
}
FILE *select_message(Msg *pmsg)
{
char sql[2048] = {0};
FILE *fp = fopen("message.text","w+");
struct callback call;
call.fp = fp;
strcpy(call.name, pmsg->name);
strcpy(call.toname, pmsg->toname);
printf("开始回调\n");
sprintf(sql, "select fromname,toname,message,timestamp from storagemessage where (toname = '%s' and fromname = '%s') or (fromname = '%s' and toname = '%s') order by timestamp desc limit 8;",pmsg->toname,pmsg->name,pmsg->toname,pmsg->name);
sqlite3_exec(db, sql, my_getmessage_callback, (void*)&call, &errmsg);
return fp;
}
int Create_sock()
{
int ret;
int sockfd;
struct sockaddr_in s_addr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
if(sockfd < 0)
{
perror("socket error!");
exit(1);
}
int opt = 1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//设置套接字可以重复使用端口号
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(5556);
s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
ret = bind(sockfd,(struct sockaddr *)&s_addr,sizeof(struct sockaddr_in));
if(ret < 0)
{
perror("bind error!");
exit(1);
}
ret = listen(sockfd,2000);
if(ret < 0)
{
perror("bind error!");
exit(1);
}
return sockfd;
}
void createfile(char *filename, char *filesize)
{
int fd = open(filename, O_RDWR | O_CREAT);
fchmod(fd, S_IRGRP | S_IRUSR | S_IWUSR | S_IROTH);
lseek(fd, (*filesize)-1, SEEK_SET);
write(fd, " ", 1);
close(fd);
}
void send_file(Msg*pmsg,int cofd,int tofd)
{
char *fp = (char *)malloc(RECVBUF_SIZE);
//从cofd读到后,写入tofd
//接受数据,往map内存写
int remain_size = pmsg->bs; //数据块中待接收数据大小
int err = pmsg->bs % RECVBUF_SIZE;
int size = 0; //一次recv接受数据大小
#if 1
while (remain_size > 0)
{
if ((size = recv(cofd, fp, RECVBUF_SIZE, MSG_WAITALL)) > 0)
{
write(tofd, fp, size);
remain_size -= size;
}
if (remain_size <= err)
{
size = recv(cofd, fp, remain_size, 0);
write(tofd, fp, size);
remain_size -= size;
}
}
free(fp);
printf("转发中...\n");
close(tofd);
close(cofd);
#endif
}
void to_all(Msg *pmsg)
{
struct online *pNode = head;
while(pNode != NULL)
{
write(pNode->cfd, pmsg, sizeof(Msg));
pNode = pNode->next;
}
return;
}
struct online *get_online(char *name)
{
struct online *pNode = head;
while(pNode != NULL)
{
if(strcmp(pNode->name, name) == 0)
{
return pNode;
}
pNode = pNode->next;
}
return NULL;
}
void *thread_read(void *arg)
{
pthread_t id = pthread_self();
pthread_detach(id);
long cfd = (long)arg;
Msg *pmsg = (Msg *)malloc(sizeof(Msg));
while(1)
{
memset(pmsg,0,sizeof(Msg));
int r_n = recv(cfd,pmsg,sizeof(Msg), 0);
if(r_n == 0)
{
printf("server exit!\n");
pthread_exit(NULL);
}
switch(pmsg->action)
{
case 0:
{
//判断用户名是否注册
if(1 == queryname(pmsg))
{
pmsg->action = 255;
write(cfd, pmsg, sizeof(Msg));
break;
}
char sql[100];
memset(sql, 0, sizeof(sql));
int ret = sqlite3_open("test.db", &db);
if(ret < 0)
{
perror("sqlite3_open error!");
}
sprintf(sql,"insert into person(name,password)values('%s',%s);",pmsg->name,pmsg->password);
ret = sqlite3_exec(db, sql, NULL, NULL, &errmsg);
break;
}
case 1:
{
if(0 == querypassword(pmsg))
{
pmsg->action = -1;
write(cfd,pmsg,sizeof(Msg));
break;
}
struct online *new_user = (struct online *)malloc(sizeof(struct online));
strcpy(new_user->name, pmsg->name);
new_user->cfd = cfd;
new_user->say = 0;
insert_link(new_user);
write(cfd,pmsg,sizeof(Msg));
break;
}
case 2:
{
insert_data_message(pmsg);
int to_fd = find_fd(pmsg->toname);
if(to_fd == -1)
{
pmsg->action = -2;
write(cfd, pmsg, sizeof(Msg));
}
else
{
write(to_fd, pmsg, sizeof(Msg));
}
break;
}
case 3:
{
int to_fd = find_fd(pmsg->toname);
if(to_fd == -1)
{
pmsg->action = -2;
write(cfd, pmsg, sizeof(Msg));
}
else
{
int i = 0;
while(addr[i] == 1)
{
i++;
}
addr[i] = 1;
pmsg->sfile_id = i;
write(cfd, pmsg, sizeof(Msg));
pmsg->action = -3;
write(to_fd, pmsg, sizeof(Msg));
}
break;
}
case 4:
{
insert_data_message(pmsg);
struct online *pNode = head;
while(pNode != NULL)
{
if(strcmp(pNode->name, pmsg->name) == 0)
{
if(pNode->say != 1)
{
to_all(pmsg);
break;
}
else if(pNode->say == 1)
{
pmsg->action = 7;
write(cfd, pmsg, sizeof(Msg));
break;
}
}
}
break;
}
case 5:
{
//要传输对象的套接字
int sofd = 0;
//找到要传输的对象
int to_fd = find_fd(pmsg->toname);
if(to_fd == -1)
{
pmsg->action = -2;
write(cfd, pmsg, sizeof(Msg));
}
else
{
//通知接收客户端做好准备;
write(to_fd, pmsg, sizeof(Msg));
//循环找cfd,用循环队列或者循环链表储存cfd。
for(int i=0; i<10; i = (++i)%10)
{
pthread_mutex_lock(&mutex);
if(find_cfd[i].use == 1)
{
if(find_cfd[i].offset == pmsg->offset)
{
sofd = find_cfd[i].cfd;
find_cfd[i].use = 0;
pthread_mutex_unlock(&mutex);
break;
}
else
{
}
}
pthread_mutex_unlock(&mutex);
}
send_file(pmsg,cfd,sofd);
pthread_cancel(id);
}
break;
}
case -5:
{
for(int i=0; i<10; i = (++i)%10)
{
pthread_mutex_lock(&mutex);
if(find_cfd[i].use == 0)
{
strcpy(find_cfd[i].filename, pmsg->filename);
find_cfd[i].cfd = cfd;
find_cfd[i].use = 1;
find_cfd[i].offset = pmsg->offset;
pthread_mutex_unlock(&mutex);
break;
}
pthread_mutex_unlock(&mutex);
}
break;
}
case 6:
{
char buf[1024];
int ret = -1;
memset(buf, 0, sizeof(buf));
FILE *fp = fopen("user.txt", "w+");
struct online*pNode = head;
while(1)
{
if(pNode->next == NULL)
{
fprintf(fp, "用户名:%s", pNode->name);
break;
}
fprintf(fp, "用户名:%s\n", pNode->name);
memset(pmsg->message, 0, sizeof(pmsg->message));
pNode = pNode->next;
}
fseek(fp, 0, SEEK_SET);
ret = fread(buf, sizeof(buf), 1, fp);
{
memcpy(pmsg->message, buf, strlen(buf));
pNode = head;
while(pNode != NULL)
{
write(pNode->cfd, pmsg, sizeof(Msg));
pNode = pNode->next;
}
memset(pmsg->message, 0, sizeof(pmsg->message));
memset(buf, 0, sizeof(buf));
}
printf("ret = %d\n",ret);
fclose(fp);
break;
}
case 7:
{
if(0 == query_manager_password(pmsg))
{
pmsg->action = -1;
}
struct online *new_user = (struct online *)malloc(sizeof(struct online));
strcpy(new_user->name, pmsg->name);
new_user->cfd = cfd;
new_user->say = 0;
insert_link(new_user);
pmsg->action = 1;
write(cfd,pmsg,sizeof(Msg));
break;
}
case 8:
{
struct online *pNode = get_online(pmsg->toname);
if(pNode == NULL)
{
pmsg->action = -2;
write(cfd, pmsg, sizeof(Msg));
break;
}
pNode->say = 1;
write(cfd, pmsg, sizeof(Msg));
break;
}
case -8:
{
struct online *pNode = get_online(pmsg->name);
if(pNode == NULL)
{
pmsg->action = -2;
write(cfd, pmsg, sizeof(Msg));
break;
}
pNode->say = 0;
write(cfd, pmsg, sizeof(Msg));
break;
}
case 9:
{
char buf[1024];
int ret = -1;
memset(buf, 0, sizeof(buf));
FILE *fp = select_message(pmsg);
fseek(fp, 0, SEEK_SET);
while(feof(fp) == 0)
{
fread(buf, sizeof(buf), 1, fp);
memcpy(pmsg->message, buf, strlen(buf));
write(cfd, pmsg, sizeof(Msg));
memset(pmsg->message, 0, sizeof(pmsg->message));
memset(buf, 0, sizeof(buf));
}
fclose(fp);
break;
}
case 10:
{
struct online *pNode = head;
struct online *temp = NULL;
while(pNode != NULL)
{
if(strcmp(pNode->name, pmsg->name) == 0)
{
if(temp != NULL)
{
temp->next = pNode->next;
}
else
{
head = pNode->next;
}
if(pNode != NULL)
{
free(pNode);
pNode = NULL;
}
break;
}
temp = pNode;
pNode = pNode->next;
}
break;
}
}
}
}
int query_manager_password(Msg *msg)
{
char sql[1024] = {0};
char password[20];
sprintf(sql,"select password from manager where name = '%s';",msg->name);
sqlite3_exec(db,sql,my_sqlite_callback,(void*)password,&errmsg);
if(my_strcmp(password,msg->password) == 1)
{
return 1;
}
return 0;
}
//先写在函数内,调试好以后用execl调用
void server_file()//端口号先写死5858.
{
int ret;
int sockfd;
struct sockaddr_in s_addr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
if(sockfd < 0)
{
perror("socket error!");
exit(1);
}
int opt = 1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//设置套接字可以重复使用端口号
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(5858);
s_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
ret = bind(sockfd,(struct sockaddr *)&s_addr,sizeof(struct sockaddr_in));
if(ret < 0)
{
perror("bind error!");
exit(1);
}
ret = listen(sockfd,20);
if(ret < 0)
{
perror("bind error!");
exit(1);
}
socklen_t c_size;
struct sockaddr_in c_addr;
c_size = sizeof(struct sockaddr_in);
while(1)
{
int cfd = accept(sockfd,(struct sockaddr*)&c_addr,&c_size);
if(cfd < 0)
{
perror("accept error!");
exit(1);
}
printf("ip = %s port = %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));
}
}
work.h
#ifndef _MSG_H_
#define _MSG_H_
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
#include <sqlite3.h>
#include <unistd.h>
#include <sys/epoll.h>
#define EVENTS_SIEZ 1024
// #define RECVBUF_SIZE 65536 //64K
#define RECVBUF_SIZE 100 //64K
#define INT_SIZE 4
struct callback
{
FILE *fp;
char name[20];
char toname[20];
};
struct online
{
int cfd;
int say;
char name[20];
struct online *next;
};
typedef struct oooo
{
int cfd;
int use;
int offset;
char filename[20];
}stro_sockfd;
typedef struct messgae
{
int action;
int sfile_id;
int filesize;
int count;
int bs;
int offset;
long cfd;
char name[20];
char password[20];
char toname[20];
char filename[20];
char message[1024];
char now_time[32];
}Msg;
stro_sockfd find_cfd[20];
int Create_sock();
void createlist();
void *thread_read(void *arg);
void server_file();//端口号先写死5858.
int query_manager_password(Msg *msg);
#endif // !_MSG_H_
更多推荐
已为社区贡献1条内容
所有评论(0)