This article analyzes Nacos based on the Nacos 2.0 source code.

The service registry in Nacos defaults to AP mode. To switch to CP mode, the client sets spring.cloud.nacos.discovery.ephemeral=false (the default is true, which means AP mode).

Next, let's look at how Nacos implements AP and CP modes.

First, a note on ports: Nacos defaults to HTTP port 8848. Nacos 2.0 adds gRPC in two places: communication with clients, and communication between cluster nodes. Both gRPC ports are derived from the HTTP port by a fixed offset: the client port is offset by 1000 (8848 + 1000 = 9848), and the cluster port by 1001 (8848 + 1001 = 9849). If your network security rules require ports to be opened explicitly, all three ports need to be open: 8848, 9848, and 9849.
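The port derivation above is simple arithmetic; as a quick sanity check (class and method names here are illustrative, not Nacos APIs):

```java
// Illustrative sketch: derive the Nacos 2.0 gRPC ports from the main HTTP port.
public class NacosPorts {
    static final int CLIENT_GRPC_OFFSET = 1000;   // client <-> server gRPC
    static final int CLUSTER_GRPC_OFFSET = 1001;  // server <-> server gRPC

    static int clientGrpcPort(int httpPort) {
        return httpPort + CLIENT_GRPC_OFFSET;  // 8848 -> 9848
    }

    static int clusterGrpcPort(int httpPort) {
        return httpPort + CLUSTER_GRPC_OFFSET; // 8848 -> 9849
    }
}
```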

AP Mode

In Nacos service registration (the naming service), AP mode communicates over gRPC, using a home-grown protocol called Distro. Let's see how the client registers a service; registration goes through NacosNamingService:

    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setWeight(1.0);
        instance.setClusterName(clusterName);
        registerInstance(serviceName, groupName, instance);
    }
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        clientProxy.registerService(serviceName, groupName, instance);
    }

Registration is ultimately delegated to clientProxy, implemented by NamingClientProxyDelegate:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }

private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }

NamingClientProxyDelegate checks whether the instance being registered is ephemeral: ephemeral instances are registered through the gRPC client, others through the HTTP client.
Since instance.isEphemeral() defaults to true (ephemeral), registration normally goes through the gRPC client.
The gRPC client here is implemented by NamingGrpcClientProxy:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                instance);
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
        namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);
    }

Finally, the client sends an InstanceRequest to the server over gRPC.
That completes the client side of registration; next let's see how the server handles it.
The server's gRPC implementations all extend BaseGrpcServer, with subclasses differing mainly in their choice of thread pool: GrpcSdkServer handles communication with clients, and GrpcClusterServer handles communication between cluster nodes.
Incoming requests are dispatched in GrpcRequestAcceptor, which looks up the RequestHandler registered for the request type in RequestHandlerRegistry.

For registration the client sends an InstanceRequest, which is handled by InstanceRequestHandler:

public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

InstanceRequestHandler handles both instance registration and deregistration. The actual work is delegated to the AP-mode implementation, EphemeralClientOperationServiceImpl:

public void registerInstance(Service service, Instance instance, String clientId) {
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        Client client = clientManager.getClient(clientId);
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

In AP mode, registration first resolves the connection's Client through clientManager and writes the instance into that Client directly, as an InstancePublishInfo. The services published by each client can later be read back through the ClientManager.

Note the call client.addServiceInstance(singleton, instanceInfo): in AP mode, Nacos stores the service information in both the Client and the ServiceManager. The Client's addServiceInstance is implemented as follows:

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }

This publishes a ClientEvent.ClientChangedEvent, which asynchronously syncs the registered service information to the other cluster nodes. The event is handled in DistroClientDataProcessor:

public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }

This eventually takes the syncToAllServer path:

// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }
// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

The concrete sync logic lives in DistroProtocol, an implementation of the Distro protocol. The sync is ultimately wrapped in a DistroSyncChangeTask, which implements Runnable:

public void run() {
        String type = getDistroKey().getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        if (transportAgent.supportCallbackTransport()) {
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    
    private void executeDistroTask() {
        try {
            boolean result = doExecute();
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            handleFailedTask();
        }
    }

Let's look at doExecute:

protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }

It fetches the data to be synced via getDistroData, which ultimately calls DistroClientDataProcessor.getDistroData:

public DistroData getDistroData(DistroKey distroKey) {
        Client client = clientManager.getClient(distroKey.getResourceKey());
        if (null == client) {
            return null;
        }
        byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
        return new DistroData(distroKey, data);
    }

This retrieves, through clientManager, the instance information registered by that client.
Separately, publishing ClientOperationEvent.ClientRegisterServiceEvent invokes ClientServiceIndexesManager.onEvent, which records the client in ClientServiceIndexesManager.publisherIndexes, a Map<Service, Set<String>> that tracks all providers of each service, and then publishes a ServiceEvent.ServiceChangedEvent. That event is finally handled in NamingSubscriberServiceV2Impl.onEvent, which pushes the newly registered service information to consumers over gRPC.

Concretely, ClientServiceIndexesManager handles the ClientRegisterServiceEvent published during registration like this:

private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

The handling is simple: the published service and the publishing client's id are stored in a ConcurrentMap<Service, Set<String>>.
Later, when all instances of a service are needed, ClientServiceIndexesManager resolves the Service to the set of clientIds, and ClientManager resolves those clientIds to the full instance information.
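The two-level lookup described above can be sketched as follows; this is a simplified model, not the Nacos source (services and instance info are reduced to plain strings):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the two-level index: service -> clientIds -> instance info.
// In Nacos, the first map lives in ClientServiceIndexesManager (publisherIndexes)
// and the second lookup goes through ClientManager.
public class ServiceIndexSketch {
    static final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    static final ConcurrentMap<String, String> clientInstances = new ConcurrentHashMap<>();

    static void register(String service, String clientId, String instanceInfo) {
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
        clientInstances.put(clientId, instanceInfo);
    }

    // Resolve every instance of a service: first the index, then the client map.
    static List<String> instancesOf(String service) {
        List<String> result = new ArrayList<>();
        for (String clientId : publisherIndexes.getOrDefault(service, Set.of())) {
            result.add(clientInstances.get(clientId));
        }
        return result;
    }
}
```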

CP Mode

For CP mode, Nacos 2 uses the Raft protocol, implemented via Alibaba's open-source SOFA-JRaft. For background on JRaft itself (leader election, data writes, log replication), see the separate article: Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制

The client sends the registration request through NamingHttpClientProxy:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, groupedServiceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); 
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    }

On the server side, the request is handled by InstanceController:

@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        final Instance instance = parseInstance(request);
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

Persistence ultimately happens in PersistentServiceProcessor.put:

public void put(String key, Record value) throws NacosException {
        final BatchWriteRequest req = new BatchWriteRequest();
        Datum datum = Datum.createDatum(key, value);
        req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
        final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
                .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
        try {
            protocol.write(request);
        } catch (Exception e) {
            throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
        }
    }

The protocol here is implemented by JRaftProtocol:

// JRaftProtocol.java
public Response write(WriteRequest request) throws Exception {
        CompletableFuture<Response> future = writeAsync(request);
        return future.get(10_000L, TimeUnit.MILLISECONDS);
    }
// JRaftServer.java
public CompletableFuture<Response> commit(final String group, final Message data,
            final CompletableFuture<Response> future) {
        LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);
        final RaftGroupTuple tuple = findTupleByGroup(group);
        if (tuple == null) {
            future.completeExceptionally(new IllegalArgumentException("No corresponding Raft Group found : " + group));
            return future;
        }

        FailoverClosureImpl closure = new FailoverClosureImpl(future);

        final Node node = tuple.node;
        if (node.isLeader()) {
            applyOperation(node, data, closure);
        } else {
            invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
        }
        return future;
    }

If the current node is not the leader, it looks up the leader and forwards the write request to it.
Nacos's Raft state machine is implemented by NacosStateMachine.
Once the registration has been successfully written to a majority of the Raft cluster, NacosStateMachine.onApply is triggered; its core logic is:

if (message instanceof WriteRequest) {
    Response response = processor.onApply((WriteRequest) message);
    postProcessor(response, closure);
}

if (message instanceof ReadRequest) {
    Response response = processor.onRequest((ReadRequest) message);
    postProcessor(response, closure);
}

This ultimately calls PersistentClientOperationServiceImpl.onApply, which for a registration dispatches to onInstanceRegister:

private void onInstanceRegister(Service service, Instance instance, String clientId) {
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        Client client = clientManager.computeIfAbsent(clientId, () -> new IpPortBasedClient(clientId, false));
        InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instancePublishInfo);
        client.setLastUpdatedTime();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    }

This code is almost identical to the AP path: client.addServiceInstance publishes the events that notify service consumers of the change. As in AP mode, PushExecutorDelegate chooses how the change is pushed to consuming clients; the selection checks whether the connection's clientId contains "#", and if so uses the UDP implementation, PushExecutorUdpImpl. In CP mode the client registers over HTTP, and its clientId is built as address + "#" + ephemeral.
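The channel-selection rule just described reduces to a one-liner; the class and method names below are illustrative, the real logic sits in PushExecutorDelegate:

```java
// Sketch of the push-channel selection: gRPC connection ids carry no '#',
// while HTTP-registered clients get ids of the form "address#ephemeral",
// so the presence of '#' routes the push to the UDP implementation.
public class PushSelector {
    static String channelFor(String clientId) {
        return clientId.contains("#") ? "udp" : "grpc";
    }
}
```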
So in CP mode, consumers are notified over UDP. On the consumer client, a PushReceiver receives what the server pushes; it implements Runnable, and its run method is:

public void run() {
        while (!closed) {
            try {
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    serviceInfoHolder.processServiceInfo(pushPacket.data);
                    
                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
            }
        }
    }

When a node or service changes, serviceInfoHolder.processServiceInfo(pushPacket.data) processes the update. The UDP port is chosen as follows: if the system property push.receiver.udp.port is set, its value is used; otherwise a port is picked at random. PushReceiver then keeps listening for and processing UDP messages.
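The port-selection logic just described could look roughly like this (a sketch, not the Nacos source; Nacos itself retries with new random ports if binding fails):

```java
import java.net.DatagramSocket;
import java.net.SocketException;

// Sketch: honor the push.receiver.udp.port system property when present,
// otherwise let the OS assign a free ephemeral port (bind to port 0).
public class UdpPortChooser {
    static DatagramSocket openPushSocket() throws SocketException {
        String configured = System.getProperty("push.receiver.udp.port");
        if (configured != null) {
            return new DatagramSocket(Integer.parseInt(configured));
        }
        return new DatagramSocket(0);
    }
}
```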

Also, in CP mode, when the client registers a service it starts a heartbeat task for that service, which sends the instance's heartbeat every 5 seconds by default.
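A heartbeat loop of this shape can be sketched with a scheduler; this is a simplified stand-in for BeatReactor, with the 5-second default made a parameter:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Minimal sketch of a client-side heartbeat: one periodic task per registered
// service, re-run at a fixed delay (Nacos defaults to 5000 ms).
public class BeatSketch {
    static final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    static ScheduledFuture<?> addBeatInfo(String serviceName, long periodMillis, Runnable sendBeat) {
        // sendBeat would build and send the BeatInfo for serviceName.
        return scheduler.scheduleWithFixedDelay(sendBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```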

To summarize: in Nacos 2 service discovery, whether a service uses CP or AP mode is decided by the provider's client. If the instance is registered as ephemeral, the AP path is taken; otherwise the CP path.

For service consumers, subscription always goes through gRPC. When a consumer client starts, it brings up both gRPC and HTTP communication, and the HTTP side initializes the PushReceiver, i.e. the UDP listener. If a service is in AP mode, provider changes are pushed to consumers over gRPC; in CP mode, over UDP.
For managing registration data, the CP path is essentially the same as the AP path, except that AP mode writes immediately while CP mode writes only after a majority of the Raft cluster has accepted the entry.

Finally, in both CP and AP mode a successful registration writes into ClientServiceIndexesManager and ClientManager. When all instances of a service are needed, ClientServiceIndexesManager resolves the Service to its set of clientIds, and ClientManager resolves those clientIds to the full instance information.
