一、前言

本系列为个人Dubbo学习笔记,内容基于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章,仅用于个人笔记记录。本文分析基于Dubbo2.7.0版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。

系列文章地址:Dubbo源码分析:全集整理


服务发布流程

  1. Dubbo笔记 ③ :服务发布流程 - ServiceConfig#export
  2. Dubbo笔记 ④ :服务发布流程 - doExportUrlsFor1Protocol
  3. Dubbo笔记 ⑤ :服务发布流程 - Protocol#export
  4. Dubbo笔记 ⑥ :服务发布流程 - NettyServer

本文衍生篇:

  1. Dubbo笔记衍生篇④:ChannelHandler

在之前的文章中,我们讲述了提供者发布服务的流程。本文来看一下当消费者发起调用后,服务提供者时如何处理调用请求的。

二、流程概述

Dubbo笔记 ⑥ :服务发布流程 - NettyServer 中 我们分析了 Dubbo线程模型应用后 整个 Handler的结构是一层套一层。因此提供者启动后通过 NettyServer 开启 Netty 服务,之后交由 业务Handler 来处理Netty 事件,整个消息的传递过程如下时序图:

图源:《深入理解Apache Dubbo 技术内幕》
在这里插入图片描述

根据上面的时序图,我们可以将服务方处理请求分为两个过程 :

  1. 服务方处理消费者的连接消息: 消费者在调用服务之前需要先与服务端建立连接(实际上消费者在启动时就会和服务端建立连接),此时服务提供方会接收到 connected 事件,此阶段对应时序图的步骤1-11。
  2. 服务方处理消费者的调用消息:当连接建立成功后,消费者可以调用服务端的服务,此时服务端会受到 received 消息。此阶段对应时序图的步骤12-22。

从上面时序图我们也可以得知,消息是在多个 ChannelHandler 之间层层传递的,其结构大致如下:

在这里插入图片描述

Dubbo笔记衍生篇④:ChannelHandler,我们对上图中的 Handler 都有过介绍,所以这里消息在每个Handler的流转过程就不再具体分析。

下面我们直接从 HeaderExchangeHandler 处理消息 开始分析。如果消费者想要对服务进行消费,第一个阶段是创建与提供者的服务连接,第二个阶段是发送请求。而对于服务提供者来说也是相同。

三、 连接消息处理

而当消费端发起TCP链接并完成后,服务提供方的 NettyServer 的 connected 方法会被激活,根据Dubbo线程模型,默认的线程模型为All,所以I/O线程会将该请求交由业务线程池来处理。这里对应的是AllChannelHandler 类 把 I/O 线程收到的所有消息包装成 ChannelEventRunnable 任务并投递到了业务线程池里。

默认情况下,消费端与服务端的连接是非惰性的,即消费端在启动后就会连接服务端,这一部分消费者文章中会介绍。

1. HeaderExchangeHandler#connected

HeaderExchangeHandler#connected 没有对消息做什么特殊处理,仅仅记录了读写时间。下面为 HeaderExchangeHandler#connected 的具体实现:

	// org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#connected
    @Override
    public void connected(Channel channel) throws RemotingException {
    	// 保存通道中读写时间
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
        	// 直接交由下一层的 handler 处理,即DubboProtocol#requestHandler
            handler.connected(exchangeChannel);
        } finally {
        	// 如果通道关闭则移除
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

2. DubboProtocol#requestHandler.connected

当连接消息经历 HeaderExchangeHandler#connected 后,便会透传到 DubboProtocol#requestHandler.connected 方法中,其实现如下:

		// 消费端连接时触发消息
        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }
        
		// 消费端断开连接时触发消息
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
        	// 这里判断如果有回调方法则进行回调响应
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                	// 调用 onconnect 指定的回调方法
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        	// 如果没有指定了 onconnect 回调方法,直接返回
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            // 根据 method 创建 RpcInvocation  对象
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }

DubboProtocol#requestHandler.connected 对连接回调进行了判断,如果设置了 onconnect 回调,则进行回调,这个回调调用的是自身发布的服务。


可以看到, 消费者在连接过程并没有什么过多处理。下面我们来看一下请求的处理过程。

四、请求消息处理

当消费者与服务端连接后,消费者进行服务调用时会触发 时序图12-22部分。

当消费者发起服务调用后,服务提供者这边Handler#received 方法会被触发。根据上面的时序图可以看到到,received事件被投递到线程池后进行异步处理(如果是线程模型是all)。线程池任务被激活后调用了 HeaderExchangeHandler#received() 方法,

1. HeaderExchangeHandler#received

HeaderExchangeHandler#received 对根据请求类型进行了简单的分发处理,其的实现如下:

	// org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received 
	// 当消费者发送请求时会激活此方法
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
    	// 记录通道读的时间戳
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
        	// 如果是请求类型消息
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                	// 处理事件对象,如心跳事件
                    handlerEvent(channel, request);
                } else {
                	// 如果需要有返回值的请求req-res,twoway,此时为双向通信,需要获取调用结果并返回给服务消费端
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                    	// 不需要有返回值的请求 req, oneway,此时为单向通信,仅向后调用指定服务即可,无需返回调用结果
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {	// 响应消息,服务消费方会执行此处逻辑
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
	            // telnet 相关	Dubbo Qos 相关
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }   
	

对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。

因为提供者和消费者都会通过 HeaderExchangeHandler 来处理消息,对提供者来说接收的就是Request 请求消息,对于消费者则为 Response 响应消息,对于 Dubbo QOS 就是 telnet 消息。所以 HeaderExchangeHandler 会区分各种消息类型。

我们这里只讨论请求消息Request,其处理逻辑如下:

  1. 对于事件请求:使用 handlerEvent(channel, request); 来处理消息,这里处理了只读事件。

    	// 处理事件消息
        void handlerEvent(Channel channel, Request req) throws RemotingException {
        	// 如果是只读事件,设置通道为只读
            if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) {
                channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
            }
        }
    
  2. 对于需要返回值的请求: 会调用 handleRequest(exchangeChannel, request); 来处理消息,并将返回值写回通道

  3. 对于无需返回值的请求 :会调用 handler.received(exchangeChannel, request.getData()); 来处理消息。此时 handler 实例为 DubboProtocol#requestHandler。

1.2 HeaderExchangeHandler#handleRequest

上面我们看到,对于请求消息

  • 如果需要返回值, 则调用 HeaderExchangeHandler#handleRequest 来处理
  • 如果请求不需要返回值,则调用 DubboProtocol#requestHandler 的 received 方法来处理

代码如下:

 	// 如果需要有返回值的请求req-res,twoway,此时为双向通信,需要获取调用结果并返回给服务消费端
     if (request.isTwoWay()) {
         handleRequest(exchangeChannel, request);
     } else {
     	// 不需要有返回值的请求 req, oneway,此时为单向通信,仅向后调用指定服务即可,无需返回调用结果
         handler.received(exchangeChannel, request.getData());
     }

关于 DubboProtocol#requestHandler 我们下面再讲,我们这里先来看 HeaderExchangeHandler#handleRequest 的实现, 如下:

	// 处理请求消息
 	void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
 		
        Response res = new Response(req.getId(), req.getVersion());
        // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
             // 设置 BAD_REQUEST 状态
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);
            return;
        }
        // find handler by message class.
         // 获取 data 字段值,也就是 RpcInvocation 对象
        Object msg = req.getData();
        try {
        	/*************************** 处理请求 ***************************/
            // handle data.
             // 1. 继续向下调用, 这里调用的是 replay 方法,这里的handler 为 DubboProtocol#requestHandler
             
            CompletableFuture<Object> future = handler.reply(channel, msg);
            // 2. 如果请求已经完成则写回结果
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
            // 3. 如果还没完成,则等待结束后写回结果
            future.whenComplete((result, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(result);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
            /******************************************************/
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }

其逻辑如下:

  1. 调用 DubboProtocol#requestHandler 的 reply 方法来处理请求。注意该方法的返回类型为 CompletableFuture,即无论同步还是异步调用,返回类型都是 CompletableFuture。
  2. CompletableFuture#isDone 判断请求处理是否完成,如果完成则返回true。将完成的结果写回通道。
  3. 执行到这一步说明请求尚未完成。这里通过 CompletableFuture#whenComplete 回调的方式,当请求处理完成后会调用 whenComplete 方法。而在 whenComplete 方法中会将请求结果写回通道。当提供者开启异步执行时,可能会出现这种情况。

关于Dubbo 的异步调用和异步执行,如有需要,详参: Dubbo笔记 ⑳ :消费者的异步调用
Dubbo笔记 ㉑ :提供者的异步执行


到这里我们可以发现 HeaderExchangeHandler 处理请求消息的主要逻辑还是透传给了下层处理,这里的下层是 DubboProtocol#requestHandler。不同的是

  • 如果需要返回值, 则调用 DubboProtocol#requestHandler 的 reply 方法 来处理
  • 如果请求不需要返回值,则调用 DubboProtocol#requestHandler 的 received 方法来处理

下面我们来看一下 DubboProtocol#requestHandler 的实现

2. DubboProtocol#requestHandler

DubboProtocol#requestHandler 的部分代码如下:

		// 处理不需要返回值的请求
  		@Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 1. 获取 invoker 实例
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                // 如果当前服务是回调服务,则校验回调方法是否存在
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
						// ... 日志打印
                        return null;
                    }
                }
                // 获取 rpc 上下文
                RpcContext rpcContext = RpcContext.getContext();
                // 设置远端调用地址
                rpcContext.setRemoteAddress(channel.getRemoteAddress());
                // 2. 通过Invoker调用方法
                Result result = invoker.invoke(inv);
				// 3. 写回结果
				// 如果结果为 AsyncRpcResult类型则说明为服务提供方的异步执行
                if (result instanceof AsyncRpcResult) {
                    return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
                } else {
                	// 否则为同步执行,将结果转换为 CompletableFuture
                    return CompletableFuture.completedFuture(result);
                }
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

		//  处理需要返回值的请求
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
        	// 如果消息类型是 Invocation 则调用 reply 方法,一般是 RpcInvocation
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

上面可以看到,无论是有无返回值的请求最终都会调用 reply 方法。我们可以划分出 reply 方法的三个阶段 :

  1. Invoker<?> invoker = getInvoker(channel, inv); : 这里调用的是 DubboProtocol#getInvoker,获取了 Invoker 实例,需要注意的是这里获取的Invoker 并不是DubboInvoker, 而是 被 ProtocolFilterWrapper 和 ProtocolListenerWrapper 包装后的 Invoker。
  2. Result result = invoker.invoke(inv); : 调用 Invoker 方法。
  3. 请求结果写回。对请求结果的写回分为同步和异步的情况。

下面我们来详细看一下这个三个阶段:

2.1 DubboProtocol#getInvoker

第一个阶段 Dubbo会通过 DubboProtocol#getInvoker 获取到 Invoker。 getInvoker 方法的实现为 DubboProtocol#getInvoker,其实现如下:

	Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
		
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        // if it's callback service on client side
        // 本地存根和回调方法处理
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }
        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        // 获取服务key
        // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。
        //  比如: dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
  		// 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
        // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
        // 在服务暴露过程中我们知道,DubboProtocol 将暴露的服务保存到了 exporterMap 中。这里获取到了暴露服务的 exporter 
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }
		// 从exporter 中获取到 invoker 
        return exporter.getInvoker();
    }

之前的文章中我们提到过 Exporter 的结构如下图:

在这里插入图片描述
可以说获取到Exporter 就获取到了引用服务的实例,所以这里从 AbstractProtocol#exporterMap 中根据 serviceKey 获取到对应服务的 DubboExporter。从DubboExporter 中获取到服务的Invoker。

2.2 invoker.invoke(inv);

当我们获取到Invoker实例时,便可以通过 Invoker#invoke 进行服务方法调用。但是这里有个调用链(调用链的生成是在 Protocol 在Invoker 创建时,通过 ProtocolFilterWrapper包装类时,被过滤器装饰的结果,这一部分详参 :Dubbo笔记衍生篇③:ProtocolWrapper), 经过调用链后最终调用了服务提供方启动时AbstractProxyInvoker代理类创建的invoke方法,其调用如时序图(这里只列出来了Filter链中的一部分Filter):

在经过Filter 链后,会调用顺序如下:

AbstractProxyInvoker#invoke => AbstractProxyInvoker#doInvoke => RefWrapper#invokeMethod => 调用ref实例方法

Dubbo笔记④ : 服务发布流程 - doExportUrlsFor1Protocol 2.2.1 生成代理类 章节中 我们曾描述过Wrapper类的实例。Wrapper 根据methodName 来确定调用ref 的哪个方法,随后调用内部ref实例的方法。此时便完成了ref 实例的调用。

2.3 结果写回

当前两步结束后, ref#method 的调用已经结束,下面就要将方法返回值写回通道,即返回给消费者(如果需要返回值)。

AbstractProxyInvoker#invoke 中我们看到其根据同步请求或者异步请求来返回了不同的Result

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
   		///... try
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // 根据invocation判断当前请求是否是异步请求
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
       	/// catch
    }

而在 DubboProtocol#requestHandler.reply 方法中根据 Result 的类型进行同步异步调用的处理。

	// 如果结果为 AsyncRpcResult类型则说明为服务提供方的异步执行
	if (result instanceof AsyncRpcResult) {
         return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
     } else {
     	// 否则为同步执行,将结果转换为 CompletableFuture
         return CompletableFuture.completedFuture(result);
     }

这里返回的类型是 CompletableFuture。而在 HeaderExchangeHandler#received 接收到 DubboProtocol#requestHandler 的返回后做了如下操作:
在这里插入图片描述


关于 Dubbo的异步调用和执行的内容,如有需要详参:

  1. Dubbo笔记 ⑳ :消费者的异步调用
  2. Dubbo笔记 ㉑ :提供者的异步执行
  3. Dubbo笔记衍生篇⑦:异步场景下的问题

五、总结

  1. 消费者在与消费者连接时会触发 connected 事件,该事件中并没有什么特殊处理。
  2. 当消费者调用提供者服务时,服务提供者会触发提供者 received 事件,提供者会根据消息类型分发事件(如事件消息、请求消息、响应消息等)
  3. 对于请求消息,提供者会根据消息内容获取到serviceKey( 协议/服务:版本:端口号) 从 ExporterMap 中获取到 发布的服务信息Exporter
  4. 从 Exporter 中获取到Invoker,通过Invoker.invoke 调用来直接调用 ref 的指定方法。并对返回值进行处理。

以上:内容部分参考
《深度剖析Apache Dubbo 核心技术内幕》
https://dubbo.apache.org/zh/docs/v2.7/dev/source/
https://inetyoung.blog.csdn.net/article/details/108309338
https://blog.csdn.net/yuanshangshenghuo/article/details/108315800
如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐