业务场景

使用cloud gateway 作为服务网关,服务上线下线时,gateway可能会产生请求404现象

产生原因

gateway中有个缓存 CachingRouteLocator ,而网关服务使用的是lb模式,服务在上线或者下线之后,未能及时刷新这个缓存

解决方案

思路

查看源码

观察CachingRouteLocator源码,发现其为Spring的ApplicationListener一个子类实现,监听事件为RefreshRoutesEvent,同时在事件处理onApplicationEvent中,重新调用了刷新路由方法

/*
 * Copyright 2013-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.gateway.route;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;

import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.event.RefreshRoutesResultEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;

/**
 * @author Spencer Gibb
 */
public class CachingRouteLocator implements Ordered, RouteLocator,
		ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {

	private static final Log log = LogFactory.getLog(CachingRouteLocator.class);

	private static final String CACHE_KEY = "routes";

	private final RouteLocator delegate;

	private final Flux<Route> routes;

	private final Map<String, List> cache = new ConcurrentHashMap<>();

	private ApplicationEventPublisher applicationEventPublisher;

	public CachingRouteLocator(RouteLocator delegate) {
		this.delegate = delegate;
		routes = CacheFlux.lookup(cache, CACHE_KEY, Route.class)
				.onCacheMissResume(this::fetch);
	}

	private Flux<Route> fetch() {
		return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
	}

	@Override
	public Flux<Route> getRoutes() {
		return this.routes;
	}

	/**
	 * Clears the routes cache.
	 * @return routes flux
	 */
	public Flux<Route> refresh() {
		this.cache.clear();
		return this.routes;
	}

	@Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
					.materialize().collect(Collectors.toList()).subscribe(signals -> {
						applicationEventPublisher
								.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, throwable -> handleRefreshError(throwable)));
		}
		catch (Throwable e) {
			handleRefreshError(e);
		}
	}

	private void handleRefreshError(Throwable throwable) {
		if (log.isErrorEnabled()) {
			log.error("Refresh routes error !!!", throwable);
		}
		applicationEventPublisher
				.publishEvent(new RefreshRoutesResultEvent(this, throwable));
	}

	@Deprecated
	/* for testing */ void handleRefresh() {
		refresh();
	}

	@Override
	public int getOrder() {
		return 0;
	}

	@Override
	public void setApplicationEventPublisher(
			ApplicationEventPublisher applicationEventPublisher) {
		this.applicationEventPublisher = applicationEventPublisher;
	}

}

解决方案

所以实现方案为,编写一个nacos的事件监听器,用于监听服务上下线信息,而后,在nacos的事件监听器中,调用spring的发布事件即可

解决方案代码

服务实例变更事件处理

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.CachingRouteLocator;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.RouteRefreshListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 路由刷新监听器
 * @author chunyang.leng
 * @date 2021-08-02 7:27 下午
 */
@Component
public class InstancesChangeEventListener extends Subscriber<InstancesChangeEvent> implements ApplicationEventPublisherAware {
    private static final Logger logger = LoggerFactory.getLogger(InstancesChangeEventListener.class);

    private ApplicationEventPublisher applicationEventPublisher;
    @Autowired
    private RouteRefreshListener routeRefreshListener;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private RouteLocator routeLocator;
    @PostConstruct
    private void post(){
        NotifyCenter.registerSubscriber(this);
    }


    /**
     * Event callback.
     *
     * @param event {@link Event}
     */
    @Override
    public void onEvent(InstancesChangeEvent event) {
        logger.info("接收到 InstancesChangeEvent 订阅事件:{}", JSON.toJSONString(event));
        publishEvent();
    }

    /**
     * Type of this subscriber's subscription.
     *
     * @return Class which extends {@link Event}
     */
    @Override
    public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {
        return InstancesChangeEvent.class;
    }

    public void publishEvent(){
        CachingRouteLocator cachingRouteLocator = (CachingRouteLocator)routeLocator;
        cachingRouteLocator.refresh();
        applicationEventPublisher.publishEvent(new RefreshRoutesEvent(new Object()));

        routeRefreshListener.onApplicationEvent(new ContextRefreshedEvent(applicationContext));

        cachingRouteLocator.onApplicationEvent(new RefreshRoutesEvent(new Object()));
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }
}

服务列表事件变更处理

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.client.config.impl.ServerlistChangeEvent;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.CachingRouteLocator;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.RouteRefreshListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 路由列表刷新监听器
 * @author chunyang.leng
 * @date 2021-08-03 12:37 下午
 */
@Component
public class ServerListChangeEventListener extends Subscriber<ServerlistChangeEvent> implements ApplicationEventPublisherAware {

    private static final Logger logger = LoggerFactory.getLogger(ServerListChangeEventListener.class);

    private ApplicationEventPublisher applicationEventPublisher;
    @Autowired
    private RouteRefreshListener routeRefreshListener;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private RouteLocator routeLocator;

    @PostConstruct
    private void post(){
        NotifyCenter.registerSubscriber(this);
    }


    /**
     * Event callback.
     *
     * @param event {@link Event}
     */
    @Override
    public void onEvent(ServerlistChangeEvent event) {
        logger.info("接收到 ServerListChangeEvent 订阅事件:{}", JSON.toJSONString(event));
        publishEvent();
    }

    /**
     * Type of this subscriber's subscription.
     *
     * @return Class which extends {@link Event}
     */
    @Override
    public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {
        return ServerlistChangeEvent.class;
    }

    public void publishEvent(){
        CachingRouteLocator cachingRouteLocator = (CachingRouteLocator)routeLocator;
        cachingRouteLocator.refresh();
        applicationEventPublisher.publishEvent(new RefreshRoutesEvent(new Object()));
        routeRefreshListener.onApplicationEvent(new ContextRefreshedEvent(applicationContext));
        cachingRouteLocator.onApplicationEvent(new RefreshRoutesEvent(new Object()));
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }
}

备注:
1、修改nacos的心跳检测时间应该也可以实现相同的效果,但未做测试
2、业务服务应该使用kill停止,不应该使用 kill -9
3、如果为mvc等使用nacos原生chient注册服务,应添加kill 通知机制
示例代码:

 Runtime.getRuntime()
 .addShutdownHook(new Thread(()->{
                try {
                    logger.info("正在尝试向nacos注销服务");
                    namingService.deregisterInstance("你的服务名字", "你的服务group", instance);
                    logger.info("nacos注销服务完毕");
                } catch (NacosException e) {
                   logger.error("nacos服务注销出现异常",e);
                }
            }));
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐