gateway官方文档

gateway是spring cloud中一个用于替换zuul网关作用的子项目,基于webflux实现了异步非阻塞处理。gateway工程首先需在启动时注入Route,在请求进来时,会根据Route的Predicate匹配路由规则,然后经过GatewayFilter以及GlobalFilter的逐层处理定向至真正的后台服务。下面讲解下两种动态路由的实现方式。

1.动态刷新Route

这种方式是通过向spring注入一个动态的路由构造器,并在后台服务发生变化时发送RefreshRoutesEvent事件通知spring重新加载路由表,以实现动态路由的效果。

首先创建自定义的动态路由构造器,如下示例的是一个匹配全部请求的路由构造器,路由地址需通过setUri动态注入。

@Component
public class TestRouteLocator implements RouteLocator {
    private String uri = "";

    public Flux<Route> getRoutes() {
        Route route = Route.async()
                .id("test-route")
                .order(Integer.MAX_VALUE)
                .asyncPredicate(exchange -> Mono.just(true))
                .uri(uri)
                .build();
        return Flux.just(route);
    }

    public void setUri(String uri) {
        this.uri = uri;
    }
}

然后需要创建一个监听后台地址变化的处理器,我这里使用zookeeper作为服务注册中心,后台服务会在启动时将自身地址写入/test路径

@Component
public class TestHostHandle implements NodeCacheListener {
    @Resource
    private CuratorFramework curatorFramework;

    @Resource
    private TestRouteLocator testRouteLocator;

    @Resource
    private ApplicationEventPublisher publisher;

    private NodeCache nodeCache;

    @PostConstruct
    public void init() throws Exception {
        // 等到zookeeper连接成功
        curatorFramework.blockUntilConnected();
        // 同步/test节点
        nodeCache = new NodeCache(curatorFramework, "/test");
        // 注册节点变化监听器
        nodeCache.getListenable().addListener(this);
        nodeCache.start();
    }

    /**
     * zookeeper发生变化是触发该方法
     */
    public void nodeChanged() {
        // 读取后台服务节点地址
        String host = new String(nodeCache.getCurrentData().getData(), StandardCharsets.UTF_8);
        // 将地址注入到路由构造器
        testRouteLocator.setUri(host);
        // 触发RefreshRoutesEvent事件
        publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}

如上在触发RefreshRoutesEvent事件后,spring会重新调用的路由构造器的getRoutes方法,这样新的地址就被注入到了spring的路由列表中了。

2.自定义Filter动态解析服务地址

这种方式是参照gateway集成ribbon负载均衡的方式编写的,其动态地址转换由ReactiveLoadBalancerClientFilter负责。

首先参照spring负载均衡前缀lb定义一个自己的前缀,这里我定义的是gateway,然后以gateway为前缀编写配置文件。

spring:
  cloud:
    gateway:
      routes:
        - id: test-route
          uri: gateway://test
          predicates:
            - Path=/test/**

如上所示,我自定义了一个地址gateway://test,然后针对这个地址我编写了一个GlobalFilter。

/**
 * /test服务真实路径解析及负载均衡
 */
@Component
public class TestBalancerFilter implements GlobalFilter, Ordered, PathChildrenCacheListener {
    @Resource
    private CuratorFramework curatorFramework;

    private RandomList pathList = new RandomList();

    private PathChildrenCache pathChildrenCache;

    @PostConstruct
    public void init() throws Exception {
        // 阻塞等待zookeeper客户端启动
        curatorFramework.blockUntilConnected();
        // 创建节点同步客户端
        pathChildrenCache = new PathChildrenCache(curatorFramework, "/test", true);
        // 添加节点变化事件监听
        pathChildrenCache.getListenable().addListener(this);
        // 启动客户端
        pathChildrenCache.start();
    }

    /**
     * 实现Gateway过滤器方法 如果地址符合gateway://address格式则解析后台服务真实地址
     */
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 获取路由后的地址
        URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        // 当格式符合gateway时开始当前滤器逻辑 否则进入下游过滤器
        if (uri == null || "gateway".endsWith(uri.getScheme())) {
            // 获取随机地址
            URI gateway = pathList.getRandom();

            if (gateway == null) {
                exchange.getAttributes().put("RESPONSE_NO_SERVER", true);
                return chain.filter(exchange);
            }
            // 替换路由地址为真实地址 并通过GATEWAY_REQUEST_URL_ATTR参数向下游传递
            gateway = UriComponentsBuilder.fromUri(uri).scheme(gateway.getScheme()).host(gateway.getHost()).port(gateway.getPort()).build().toUri();

            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, gateway);
        }

        return chain.filter(exchange);
    }

    public int getOrder() {
        return 10150;
    }

    /**
     * 监听zookeeper节点变化事件
     */
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        RandomList pathList = new RandomList();

        pathChildrenCache.getCurrentData().stream().map(ChildData::getData).map(String::new).forEach(host -> {
            URI uri = UriComponentsBuilder.fromHttpUrl("http://" + host).build().toUri();
            pathList.add(uri);
        });

        this.pathList = pathList;
    }

    /**
     * 提供随机获取方法的自定义集合 RandomList继承ArrayList 不支持并发读写
     */
    static class RandomList extends ArrayList<URI> {
        public URI getRandom() {
            if (this.size() == 0) {
                return null;
            }
            int index = (int) (System.nanoTime() % this.size());
            return this.get(index);
        }
    }
}

这个类首先实现了zookeeper的PathChildrenCacheListener监听节点变化,然后实现GlobalFilter,针对以gateway为前缀的地址转换为从zookeeper中随机获取的地址,并将新地址放入GATEWAY_REQUEST_URL_ATTR参数中,这样后续spring内置的filter就可以将请求转发到我们动态注入的服务地址了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐