1、nacos客户端主要功能

nacos客户端核心功能有三点,服务注册,服务心跳和服务发现

服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。 Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。

服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认 5s发送一次心跳。

服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清 单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时(1秒执行1次)拉取服务端最新的注册表信息更新到本地缓存

本文环境是基于spring-cloud(Greenwich.SR3)

2、nacos客户端的启动

在Dalston.SR4版本之前,nacos客户端的启动,需要在spring-cloud主函数上添加**@EnableDiscoveryClient**注解。

比如在Greenwich.SR3版本中@EnableDiscoveryClient的作用是注册AutoServiceRegistrationConfiguration,但是AutoServiceRegistrationConfiguration是一个空实现

@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {

}

所以我们只要引入下面的依赖,在配置文件,nacos客户端就会启动

<dependency>
         <groupId>com.alibaba.cloud</groupId>
         <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

下面开始其启动流程研究之旅

老规矩,从spring.factories文件开始
在这里插入图片描述
这里面比较主要的配置文件是NacosDiscoveryAutoConfiguration

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}
}

在这里插入图片描述

NacosAutoServiceRegistration从名字就知道它作用是Nacos自动服务注册,他是一个监听器,监听WebServerInitializedEvent事件,这个事件在Servlet容器启动后,会发布

public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware,
		ApplicationListener<WebServerInitializedEvent> {

   //忽略...
	@Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

	@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}

    public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}
		if (!this.running.get()) {
			this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
			//调用nacos.client包下的NacosNamingService,进行服务注册
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
 //忽略...
}

客户端通过NacosNamingService来管理和注册服务

public class NacosNamingService implements NamingService {

	public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
	        //实例有2种,临时实例和持久化实例,默认临时实例
	        if (instance.isEphemeral()) {
	            BeatInfo beatInfo = new BeatInfo();
	            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
	            beatInfo.setIp(instance.getIp());
	            beatInfo.setPort(instance.getPort());
	            beatInfo.setCluster(instance.getClusterName());
	            beatInfo.setWeight(instance.getWeight());
	            beatInfo.setMetadata(instance.getMetadata());
	            beatInfo.setScheduled(false);
	            long instanceInterval = instance.getInstanceHeartBeatInterval();
	            beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
	            //发送心跳
	            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
	        }
	       // 服务注册
	        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
	    }
}

临时实例向Nacos注册,Nacos会将它存储到内存中,通过心跳方式保活。默认模式是:客户端心跳上报Nacos实例健康状态,默认间隔5秒,Nacos在15秒内未收到该实例的心跳,则会设置为不健康状态,超过30秒则将实例删除。

持久化实例向Nacos注册,Nacos会对其进行持久化处理。当该实例不存在时,Nacos只会将其健康状态设置为不健康,但并不会对将其从服务端删除。

3、发送心跳

在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认 5s发送一次心跳

public class BeatReactor {

 public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        //创建调度线程,5秒执行一次
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }   
}

BeatTask是发送心跳的任务

public class BeatReactor {

    //忽略代码...
    class BeatTask implements Runnable {

        //忽略代码...
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            //底层是调用put接口/instance/beat
            long result = serverProxy.sendBeat(beatInfo);
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
4、服务注册

Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。 Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中

public class NamingProxy {

   public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("”);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        //调用服务注册接口 /instance
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }
}
5、服务发现

服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清 单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存

nacos是通过NacosNamingService的getAllInstances方法进行服务发现的

public class NacosNamingService implements NamingService {
@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        // 是否订阅了服务
        if (subscribe) { 
            // 从缓存或注册中心获取服务信息
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 直接从注册中心获取服务
            // getServiceInfoDirectlyFromServer方法很简单,底层调的是/instance/list接口获取服务
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
 }

HostReactor.getServiceInfo

public class HostReactor {

    private Map<String, ServiceInfo> serviceInfoMap; //服务缓存

	public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
	
	        String key = ServiceInfo.getKey(serviceName, clusters);
	        // 注册中心与服务提供方失去联系,会把该服务置成Failover状态,下面的代码会直接缓存取服务信息,或者取不到返回空服务
	        if (failoverReactor.isFailoverSwitch()) {
	            return failoverReactor.getService(key);
	        }
	       // 从本地缓存serviceInfoMap中取
	        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
	        // 如果本地缓存serviceInfoMap没有,则请求注册中心,并更新本地缓存。
	        if (null == serviceObj) {
	            serviceObj = new ServiceInfo(serviceName, clusters);
	            //设置键
	            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

	            updatingMap.put(serviceName, new Object());
	             // 请求注册中心,并更新本地缓存。底层调的是/instance/list接口
	            updateServiceNow(serviceName, clusters);
	            updatingMap.remove(serviceName);
	
	        } else if (updatingMap.containsKey(serviceName)) {
	            if (UPDATE_HOLD_INTERVAL > 0) {
	                synchronized (serviceObj) {
	                    try {
	                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
	                    } catch (InterruptedException e) {
	                        NAMING_LOGGER.error(");
	                    }
	                }
	            }
	        }
	        // 生成定时任务,等一段时间再执行更新操作
	        scheduleUpdateIfAbsent(serviceName, clusters);
	        return serviceInfoMap.get(serviceObj.getKey());
	    }
}

如果第一次执行scheduleUpdateIfAbsent方法,那么会在缓存futureMap生成一个key,利用这个key判断定时任务是否第一次执行

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
     if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
         return;
     }

     synchronized (futureMap) {
         if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
             return;
         }

         ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
         futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
     }
 }
//1秒执行1次
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

具体任务详情如下

public class HostReactor {

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        @Override
        public void run() {
            try {
                //从缓存中获取实例
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //获取不到
                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
                //
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // 只调用/instance/list接口,传递数据,不更新数据
                    refreshOnly(serviceName, clusters);
                }
                // 循环回调自身
                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("“);
            }

        }
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐