分布式网络通信rpc框架

项目是分布式网络通信rpc框架(项目源代码链接
博文中提到单机服务器的缺点:

  1. 硬件资源的限制影响并发:受限于硬件资源,聊天服务器承受的用户的并发有限
  2. 模块的编译部署难:任何模块小的修改,都导致整个项目代码重新编译、部署
  3. 模块对硬件资源的需求不同:各模块是CPU或IO密集型,各模块对资源需求不同

尽管集群服务器可以扩展硬件资源,提高用户的并发,但缺点2和3仍存在,此时就引出分布式服务器,分布式系统中系统由“微服务”组成,常用RPC(remote procedure call)解决分布式系统中微服务之间的调用问题(当然基于HTTP的restful形式的广义远程调用也可,这暂时不提),简言之就是开发者能像调用本地方法一样调用远程服务

RPC概述

Cip5yF_vL2GAftSLAAOCKnZEdrY576

RPC调用过程

完整的RPC过程如下图:

image-20220814221421663

远程调用需传递服务对象、函数方法、函数参数,经序列化成字节流后传给提供服务的服务器,服务器接收到数据后反序列化成服务对象、函数方法、函数参数,并发起本地调用,将响应结果序列化成字节流,发送给调用方,调用方接收到后反序列化得到结果,并传给本地调用。

序列化和反序列化

  • 序列化:对象转为字节序列称为对象的序列化
  • 反序列化:字节序列转为对象称为对象的反序列化

image-20220814235216125

常见序列化和反序列化协议有XML、JSON、PB,相比于其他PB更有优势:跨平台语言支持,序列化和反序列化效率高速度快,且序列化后体积比XML和JSON都小很多,适合网络传输。

XMLJSONPB
保存方式文本文本二进制
可读性较好较好不可读
解析效率一般
语言支持所有语言所有语言C++/Java/Python及第三方支持
适用范围文件存储、数据交互文件存储、数据交互文件存储、数据交互

注意:序列化和反序列化可能对系统的消耗较大,因此原则是:远程调用函数传入参数和返回值对象要尽量简单,具体来说应避免:

  • 远程调用函数传入参数和返回值对象体积较大,如传入参数是List或Map,序列化后字节长度较长,对网络负担较大
  • 远程调用函数传入参数和返回值对象有复杂关系,传入参数和返回值对象有复杂的嵌套、包含、聚合关系等,性能开销大
  • 远程调用函数传入参数和返回值对象继承关系复杂,性能开销大

数据传输格式

考虑到可靠性和长连接,因此使用TCP协议,而TCP是字节流协议,因此需自己处理拆包粘包问题,即自定义数据传输格式!如图:

image-20220815120520672

定义protobuf类型的结构体消息RpcHeader,包含服务对象、函数方法、函数参数,因为参数可变,参数长不定,因此不能和RpcHeader一起定义,否则多少函数就有多少RpcHeader,因此需为每个函数定义不同protobuf结构体消息,然后对该结构体消息序列化(字符串形式存储),就得到两个序列化后的二进制字符串,拼接起来就是要发送的消息,同时消息前需记录序列化后的RpcHeader数据的长度,这样才能分开RpcHeader和函数参数的二进制数据,从而反序列化得到函数参数

//消息头
message RpcHeader
{
    bytes service_name = 1;
    bytes method_name = 2;
    uint32 args_size = 3;
}

框架

RPC通信过程中的代码调用流程图如图:

image-20220815133628010

业务层实现

数据结构定义

以user.proto中的Login函数请求参数为例说明参数结构体定义,其他函数类似:

//Login函数的参数
message LoginRequest
{
    bytes name = 1;
    bytes pwd = 2;
}

使用protoc编译proto文件:

protoc user.proto -I ./ -cpp_out=./user

得到user.pb.cc和user.pb.h,每个message结构体都生成个类,如上面说的LoginRequest生成继承自google::protobuf::Message的LoginRequest类,主要包含定义的私有成员变量及读取设置变量的成员函数,如name对应的name()set_name()两个读取和设置函数;每个service结构体都生成两个关键类,以user.proto中定义的UserServiceRpc为例,生成继承自google::protobuf::Service的UserServiceRpc和继承自UserServiceRpc的UserServiceRpc_Stub类,前者给服务方callee使用,后者给调用方caller使用,并且都生成LoginRegister虚函数,UserServiceRpc类中还包括重要的CallMethod方法

service UserServiceRpc
{
    rpc Login(LoginRequest) returns(LoginResponse);
    rpc Register(RegisterRequest) returns(RegisterResponse);
}

Login方法为例,Caller调用远程方法Login,Callee中的Login接收LoginRequest消息体,执行完Login后将结果写入LoginResponse消息体,再返回给Caller

caller

调用方将继承自google::protobuf::RpcChannel的MprpcChannel的对象,传入UserServiceRpc_Stub构造函数生成对象stub,设置远程调用Login的请求参数request,并由stub调用成员函数Login一直阻塞等待远程调用的响应,而Login函数实际被传入的channel对象调用CallMethod,在CallMethod中设置controller对象和response对象,前者在函数出错时设置rpc调用过程的状态,后者是远程调用的响应

int main(int argc, char **argv)
{
    // 整个程序启动以后,想使用mprpc框架来享受rpc服务调用,一定需要先调用框架的初始化函数(只初始化一次)
    MprpcApplication::Init(argc, argv);

    // 演示调用远程发布的rpc方法Login
    UserServiceRpc_Stub stub(new MprpcChannel());
    // rpc方法的请求参数
    LoginRequest request;
    request.set_name("zhang san");
    request.set_pwd("123456");
    // rpc方法的响应
    LoginResponse response;
    // 发起rpc方法的调用  同步的rpc调用过程  MprpcChannel::callmethod
    MprpcController controller;
    stub.Login(&controller, &request, &response, nullptr); // RpcChannel->RpcChannel::callMethod 集中来做所有rpc方法调用的参数序列化和网络发送
    // 一次rpc调用完成,读调用的结果
    if (controller.Failed())
    {
        std::cout << controller.ErrorText() << std::endl;
    }
    else
    {
        // 一次rpc调用完成,读调用的结果
        if (0 == response.result().errcode())
        {
            std::cout << "rpc login response success:" << response.sucess() << std::endl;
        }
        else
        {
            std::cout << "rpc login response error : " << response.result().errmsg() << std::endl;
        }
    }
}

所有通过stub代理对象调用的rpc方法,通过C++多态最终都会通过调用CallMethod实现,该函数首先序列化并拼接发送的 send_rpc_str字符串,其次从zk服务器中拿到注册的rpc服务端的 ip 和 port,连接到rpc服务器并发送请求,接受服务端返回的字节流,并反序列化响应response

// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据数据序列化和网络发送 
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
                                google::protobuf::RpcController* controller, 
                                const google::protobuf::Message* request,
                                google::protobuf::Message* response,
                                google::protobuf:: Closure* done)
{
    const google::protobuf::ServiceDescriptor* sd = method->service();
    std::string service_name = sd->name(); // service_name
    std::string method_name = method->name(); // method_name

    // 获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if (request->SerializeToString(&args_str))
    {
        args_size = args_str.size();
    }
    else
    {
        controller->SetFailed("serialize request error!");
        return;
    }
    
    // 定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str))
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        controller->SetFailed("serialize rpc header error!");
        return;
    }

    // 组织待发送的rpc请求的字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
    send_rpc_str += rpc_header_str; // rpcheader
    send_rpc_str += args_str; // args

    // 使用tcp编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "create socket error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
    ZkClient zkCli;
    zkCli.Start();
    //  /UserServiceRpc/Login
    std::string method_path = "/" + service_name + "/" + method_name;
    // 127.0.0.1:8000
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_path + " is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + " address is invalid!");
        return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str()); 

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    // 连接rpc服务节点
    if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 发送rpc请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 接收rpc请求的响应值
    char recv_buf[1024] = {0};
    int recv_size = 0;
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "recv error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 反序列化rpc调用的响应数据
    if (!response->ParseFromArray(recv_buf, recv_size))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response_str:%s", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }

    close(clientfd);
}
callee

服务方将继承自UserServiceRpc的UserService类的对象,传入RpcProvider类的构造函数生成对象provider,provider是rpc服务对象,调用NotifyService将UserService对象发布到rpc节点上,调用Run启动rpc服务节点,提供rpc远程调用服务

int main(int argc, char **argv)
{
    // 调用框架的初始化操作
    MprpcApplication::Init(argc, argv);

    // provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
    RpcProvider provider;
    provider.NotifyService(new UserService());
    // 启动一个rpc服务发布节点   Run以后,进程进入阻塞状态,等待远程的rpc调用请求
    provider.Run();

    return 0;
}

NotifyService将传入进来的服务对象service发布到rpc节点上。其实就是将服务对象及其方法的抽象描述,存储在map中

// 这里是框架提供给外部使用的,可以发布rpc方法的函数接口
void RpcProvider::NotifyService(google::protobuf::Service *service)
{
    ServiceInfo service_info;

    // 获取了服务对象的描述信息
    const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();
    // 获取服务的名字
    std::string service_name = pserviceDesc->name();
    // 获取服务对象service的方法的数量
    int methodCnt = pserviceDesc->method_count();

    LOG_INFO("service_name:%s", service_name.c_str());

    for (int i=0; i < methodCnt; ++i)
    {
        // 获取了服务对象指定下标的服务方法的描述(抽象描述) UserService   Login
        const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);
        std::string method_name = pmethodDesc->name();
        service_info.m_methodMap.insert({method_name, pmethodDesc});

        LOG_INFO("method_name:%s", method_name.c_str());
    }
    service_info.m_service = service;
    m_serviceMap.insert({service_name, service_info});
}

Run创建TcpServer对象并绑定连接回调和消息可读回调及线程数,将rpc节点上要发布的服务全注册到zk服务器上,并启动网络服务和事件循环,等待客户端的连接和写入,从而触发对应的回调

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
    // 读取配置文件rpcserver的信息
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    // 创建TcpServer对象
    muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");

    // 绑定连接回调和消息读写回调方法  分离了网络代码和业务代码
    server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, 
            std::placeholders::_2, std::placeholders::_3));

    // 设置muduo库的线程数量
    server.setThreadNum(4);

    // 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
    ZkClient zkCli;
    zkCli.Start();
    // service_name为永久性节点    method_name为临时性节点
    for (auto &sp : m_serviceMap) 
    {
        // /service_name   /UserServiceRpc
        std::string service_path = "/" + sp.first;
        zkCli.Create(service_path.c_str(), nullptr, 0);
        for (auto &mp : sp.second.m_methodMap)
        {
            // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
            std::string method_path = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            // ZOO_EPHEMERAL表示znode是一个临时性节点
            zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
        }
    }

    // rpc服务端准备启动,打印信息
    std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;
 
    // 启动网络服务
    server.start();
    m_eventLoop.loop(); 
}

连接回调教简单,不在赘述,主要解释可读事件的回调OnMessage,接收远程rpc调用请求的字节流并反序列化 ,解析出service_name 和 method_name 和 args_str参数,并查找存储服务对象的map,找到服务对象service和方法对象描述符method,生成rpc方法调用的请求request和响应response,并设置发送响应的回调SendRpcResponse,并调用CallMethod

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, 
                            muduo::net::Buffer *buffer, 
                            muduo::Timestamp)
{
    // 网络上接收的远程rpc调用请求的字符流    Login args
    std::string recv_buf = buffer->retrieveAllAsString();

    // 从字符流中读取前4个字节的内容
    uint32_t header_size = 0;
    recv_buf.copy((char*)&header_size, 4, 0);

    // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
    std::string rpc_header_str = recv_buf.substr(4, header_size);
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if (rpcHeader.ParseFromString(rpc_header_str))
    {
        // 数据头反序列化成功
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
    }
    else
    {
        // 数据头反序列化失败
        std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
        return;
    }

    // 获取rpc方法参数的字符流数据
    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    // 获取service对象和method对象
    auto it = m_serviceMap.find(service_name);
    if (it == m_serviceMap.end())
    {
        std::cout << service_name << " is not exist!" << std::endl;
        return;
    }

    auto mit = it->second.m_methodMap.find(method_name);
    if (mit == it->second.m_methodMap.end())
    {
        std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
        return;
    }

    google::protobuf::Service *service = it->second.m_service; // 获取service对象  new UserService
    const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象  Login

    // 生成rpc方法调用的请求request和响应response参数
    google::protobuf::Message *request = service->GetRequestPrototype(method).New();
    if (!request->ParseFromString(args_str))
    {
        std::cout << "request parse error, content:" << args_str << std::endl;
        return;
    }
    google::protobuf::Message *response = service->GetResponsePrototype(method).New();

    // 给下面的method方法的调用,绑定一个Closure的回调函数
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider, 
                                                                    const muduo::net::TcpConnectionPtr&, 
                                                                    google::protobuf::Message*>
                                                                    (this, 
                                                                    &RpcProvider::SendRpcResponse, 
                                                                    conn, response);

    // 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    // new UserService().Login(controller, request, response, done)
    service->CallMethod(method, nullptr, request, response, done);
}

通过CallMethod调用UserService中的Login函数,获取参数并调用本地的Login函数,并写入响应,调用前面设置的response回调函数SendRpcResponse

void Login(::google::protobuf::RpcController* controller,
                       const ::fixbug::LoginRequest* request,
                       ::fixbug::LoginResponse* response,
                       ::google::protobuf::Closure* done)
    {
        // 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务
        std::string name = request->name();
        std::string pwd = request->pwd();

        // 做本地业务
        bool login_result = Login(name, pwd); 

        // 把响应写入  包括错误码、错误消息、返回值
        fixbug::ResultCode *code = response->mutable_result();
        code->set_errcode(0);
        code->set_errmsg("");
        response->set_sucess(login_result);

        // 执行回调操作   执行响应对象数据的序列化和网络发送(都是由框架来完成的)
        done->Run();
    }

SendRpcResponse函数用于将响应序列化并发送出去

// Closure的回调操作,用于序列化rpc的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message *response)
{
    std::string response_str;
    if (response->SerializeToString(&response_str)) // response进行序列化
    {
        // 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
        conn->send(response_str);
    }
    else
    {
        std::cout << "serialize response_str error!" << std::endl; 
    }
    conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
}

zookeeper

实现的rpc,发起的rpc请求需知道请求的服务在哪台机器,所以需要分布式服务配置中心,所有提供rpc的节点,都需向配置中心注册服务,ip+port+服务,当然zookeeper不止分布式服务配置,还有其他协调功能,如分布式锁,这里不细述。

callee启动时,将UserService对象发布到rpc节点上,也就是将每个类的方法所对应的分布式节点地址和端口记录在zk服务器上,当调用远程rpc方法时,就去 zk服务器上面查询对应要调用的服务的ip和端口

注册服务:

// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点    method_name为临时性节点
for (auto &sp : m_serviceMap) 
{
    // /service_name   /UserServiceRpc
    std::string service_path = "/" + sp.first;
    zkCli.Create(service_path.c_str(), nullptr, 0);
    for (auto &mp : sp.second.m_methodMap)
    {
        // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
        std::string method_path = service_path + "/" + mp.first;
        char method_path_data[128] = {0};
        sprintf(method_path_data, "%s:%d", ip.c_str(), port);
        // ZOO_EPHEMERAL表示znode是一个临时性节点
        zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
    }
}

查询服务:

// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
//  /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());

日志

借助线程安全的消息队列(日志队列为空则线程进入wait状态,否则唤醒线程),将要写的日志都压入消息队列中,再单独开一个线程负责将消息队列的日志写入到磁盘文件中

Logger::Logger()
{
    // 启动专门的写日志线程
    std::thread writeLogTask([&](){
        for (;;)
        {
            // 获取当前的日期,然后取日志信息,写入相应的日志文件当中 a+
            time_t now = time(nullptr);
            tm *nowtm = localtime(&now);

            char file_name[128];
            sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year+1900, nowtm->tm_mon+1, nowtm->tm_mday);

            FILE *pf = fopen(file_name, "a+");
            if (pf == nullptr)
            {
                std::cout << "logger file : " << file_name << " open error!" << std::endl;
                exit(EXIT_FAILURE);
            }

            std::string msg = m_lckQue.Pop();

            char time_buf[128] = {0};
            sprintf(time_buf, "%d:%d:%d =>[%s] ", 
                    nowtm->tm_hour, 
                    nowtm->tm_min, 
                    nowtm->tm_sec,
                    (m_loglevel == INFO ? "info" : "error"));
            msg.insert(0, time_buf);
            msg.append("\n");

            fputs(msg.c_str(), pf);
            fclose(pf);
        }
    });
    // 设置分离线程,守护线程
    writeLogTask.detach();
}

总结

整个系统时序图如下所示,整个项目源代码的链接

image-20220815184439376

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐