服务的发布与订阅

这里的发布订阅机制针对的是注册在nacos中的服务而言,如果服务A被另一个服务B所订阅(subscribe),那么,当服务A的某些参数变化时,订阅服务B会监听到服务A的某些变更。实际使用时,某些业务场景下,这种发布订阅模式比较灵活实用。

| 发布订阅使用

| 工作原理

| 一些注意事项

一、发布订阅使用

1、发布与订阅角色

这里我们已先前构建的服务provider-service作为被订阅服务,也就是发布一方,而服务client-service则作为订阅一方,当我们在nacos后台修改服务provider-service的相关参数后,订阅服务client-service通过调取subscribe订阅方法中的回调事件EventListener可及时获取变更内容。

2、订阅服务发起订阅

我们可在控制器DemoController中,添加一个新方法,当然也可以是其它Bean均可,内容如下:

@PostConstruct
public void testSubscribe() throws NacosException {
    Properties properties =
new Properties();
   
properties.setProperty("serverAddr","127.0.0.1:8848");
   
properties.setProperty("namespace","4d973c7e-0a7e-4488-8e45-83ee5441f9d9");

   
NamingService naming = NamingFactory.createNamingService(properties);
   
naming.subscribe("provider-service","provider_group", new EventListener() {
       
@Override
       
public void onEvent(Event event) {
            System.
out.println("serviceName:" + ((NamingEvent)event).getServiceName());
           
System.out.println("instances:" + ((NamingEvent)event).getInstances());
           
// 获取信息后,可做相关拓展业务
            // TODO
       
}
    })
;
}

如上,我们使用了@PostConstruct注解,当Spring初始化完成后,执行该注解修饰的方法,进而发起服务订阅,整体使用比较简单,需结合实际业务场景需要来实现拓展。

说明:

A、serverAddr为要订阅的服务所在的nacos注册地址;

B、namespace为要订阅的服务在nacos中的命名空间;

C、naming.subscribe(x,y,z):这里的x为要订阅服务名字,y为要订阅服务所在的分组,默认是DEFAULT_GROUP,z则为订阅时注入的事件监听,我们通过其回调方法onEvent(Event event)可及时获取到被订阅服务的变更内容。

3、变更服务,验证订阅

首先,我们先启动被订阅服务provider-servide,在启动订阅者服务client-service时,发现订阅者服务控制台会打印如下从被订阅者服务初始获取的订阅参数:

其次,我们在nacos后台修改provider-service的某些参数,从该服务右侧的“详情”点击进入修改即可。

如上图,编辑服务基本参数后,并不发布更新,只有点击下方的集群区域中服务列表右侧“编辑”弹出的内容时,才发布更新:

订阅服务接收到的变更信息如下,其中权重和元数据已变更:

serviceName:provider_group@@provider-service

instances:[{"clusterName":"DEFAULT","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"xxx.xxx.xxx.xxx#10001#DEFAULT#provider_group@@provider-service","ip":"xxx.xxx.xxx.xxx","ipDeleteTimeout":30000,"metadata":{"preserved.register.source":"SPRING_CLOUD_TEST"},"port":10001,"serviceName":"provider_group@@provider-service","weight":2.0}]

二、工作原理

Nacos服务参数变更的发布与订阅模式的实现,主要是由消息生产者生产消息,将消息放入到指定位置,并发布事件通知事先已注册的订阅者;消息订阅者先将自己的服务订阅进行注册,已备发布者更新发布消息时能够接收到消息,获取变化的内容后,订阅者可做后续的业务处理。

1、如何发布变更

首先,这里介绍的是单实例模式的nacos,所以其提供了如下子类,并通过其下的put(key,value)方法放入变更数据:

StandalonePersistentServiceProcessor

然后,通过其继承的如下父类中的onApply(WriteRequest request)来存储变更的数据:

BasePersistentServiceProcessor

而变更的数据存放在哪里昵?实际上,nacos将这些变更的数据存放在内存和本地文件中,采用了如下的内存集合:

Map<Key, byte[]> storage = new ConcurrentSkipListMap<>()

本地文件存放位置为nacos/naming/${namespace},文件名的规则为:${group}@@${serviceName},文件内容如下:

{"hosts":[{"ip":"xxx.xxx.xxx.xxx","port":10001,"valid":true,"healthy":true,"marked":false,"instanceId":"xxx.xxx.xxx.xxx#10001#DEFAULT#provider_group@@provider-service","metadata":{"preserved.register.source":"SPRING_CLOUD_123"},"enabled":true,"weight":1.0,"clusterName":"DEFAULT","serviceName":"provider_group@@provider-service","ephemeral":true}],"dom":"provider_group@@provider-service","name":"provider_group@@provider-service","cacheMillis":10000,"lastRefTime":1657419078071,"checksum":"ec764828ae0792ef15fd2a1aa8f2a3f5","useSpecifiedURL":false,"clusters":"","env":"","metadata":{"test":"12333"}}

最后,通过调取NotifyCenter的publishEvent(final Event event)方法及时发布变更通知,源码如下:

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
   
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
       
return INSTANCE.sharePublisher.publish(event);
   
}
   
   
final String topic = ClassUtils.getCanonicalName(eventType);
   
   
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if
(publisher != null) {
       
return publisher.publish(event);
   
}
   
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;

}

如上,publisher.publish(event)执行的是DefaultPublisher的publish(event),其源码如下:

public boolean publish(Event event) {
    checkIsStart()
;
    boolean
success = this.queue.offer(event);
    if
(!success) {
       
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
       
receiveEvent(event);
        return true;
   
}
   
return true;
}

其中this.queue.offer(event)代表从内存事件队列BlockingQueue<Event>中消费一个事件,而其中的receiveEvent(event)则是以异步线程方式通过onEvent(event)来回调订阅者注册时提供的EventListener回调事件,这样订阅者就可以及时收到变更的数据了。

2、如何订阅监听

NacosNamingService接口位于client模块,实现了api模块的NamingService接口,其提供了如下订阅方法,通过提供要订阅的服务名字、所属分组,并注册一个监听回调事件EventListener,就可以通过其中的onEvent获取变更内容:

/**
 * Subscribe service to receive events of instances alteration.
 *
 * @param
serviceName name of service
 * @param
groupName   group of service
 * @param
listener    event listener
 * @throws NacosException nacos exception
 */

void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException;

然后,通过HostReactor类的如下订阅方法订阅:

/**
 * subscribe instancesChangeEvent.
 *
 * @param
serviceName   combineServiceName, such as 'xxx@@xxx'
 * @param
clusters      clusters, concat by ','. such as 'xxx,yyy'
 * @param
eventListener custom listener
 */

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
   
notifier.registerListener(serviceName, clusters, eventListener);
   
getServiceInfo(serviceName, clusters);
}

进而再调取InstancesChangeNotifier的如下方法:

/**
 * register listener.
 *
 * @param
serviceName combineServiceName, such as 'xxx@@xxx'
 * @param
clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param
listener    custom listener
 */

public void registerListener(String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(serviceName
, clusters);
   
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if
(eventListeners == null) {
       
synchronized (lock) {
            eventListeners =
listenerMap.get(key);
            if
(eventListeners == null) {
                eventListeners =
new ConcurrentHashSet<EventListener>();
               
listenerMap.put(key, eventListeners);
           
}
        }
    }
    eventListeners.add(listener)
;
}

将订阅监听信息存放再如下内存结构中:

Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>()

到这里,nacos的发布/订阅基本流程已介绍完成,因篇幅有限,若读者想要了解更多实现细节,请自行查看源码。

三、一些注意事项

1、命名空间和分组识别

这里的命名空间和分组更多针对的是订阅者,如果被订阅的服务不在默认的空间和分组时,订阅时就需要提供明确的namespace和group,再加上服务名字就可以精确定位到要订阅的服务,否则订阅不到服务变更。

2、从实际业务场景出发

虽然nacos提供了服务订阅与发布的功能,但也不能乱用,就笔者所了解,大多情况下,服务的订阅与发布使用频率不高,仅针对某些特殊需求,如:某个服务的某些实例参数变化会影响其它功能的或是要记录和分析服务变化日志的才会用到,所以,要从实际业务场景考虑,看是否需要开启该功能。

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐