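Background

Kubernetes clusters are deployed in two regions, and each cluster runs its own gateway instances. After a request is routed through the Ingress to Spring Cloud Gateway, the gateway should forward it to service instances in the same region (for example, a request arriving in Beijing goes to the Beijing instances) to reduce cross-region network overhead.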

Client changes

spring:
  cloud:
    nacos:
      discovery:
        metadata:
          service-zone: ${service-zone}

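Add this configuration to each service. The value is read from a startup parameter, so the region of an instance is specified when that instance is deployed.

Gateway changes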

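import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)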
@LoadBalancerClients(defaultConfiguration = RJLoadBalancerConfig.class)
public class RJLoadBalancerAutoConfig {
}

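import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

// Note: the Spring Cloud LoadBalancer docs advise that a class referenced from @LoadBalancerClients
// either is not annotated with @Configuration or is kept outside the component-scan path.
@Configuration(proxyBeanMethods = false)
public class RJLoadBalancerConfig {
    private static final String ZONE_KEY = "gateway.lb.zone";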
    @Bean
    public ReactorServiceInstanceLoadBalancer rjLoadBalance(LoadBalancerClientFactory loadBalancerClientFactory, Environment environment) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        String gatewayZone = environment.getProperty(ZONE_KEY);
        return new RJLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name, gatewayZone);
    }
}

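The gateway's own zone is likewise supplied through an environment variable and read here as the gateway.lb.zone property.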
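If gateway.lb.zone is not set, environment.getProperty returns null, so it may help to normalize the value before it reaches the load balancer. The following is a minimal sketch of that idea in plain Java, without Spring. The class GatewayZoneSketch and the method normalizeZone are hypothetical names used only for illustration, and a system property stands in for Spring's Environment.

```java
import java.util.Optional;

public class GatewayZoneSketch {

    // Same property name as ZONE_KEY in the configuration class.
    static final String ZONE_KEY = "gateway.lb.zone";

    // Hypothetical helper: returns the trimmed zone, or empty when the
    // value is missing or blank.
    public static Optional<String> normalizeZone(String raw) {
        return Optional.ofNullable(raw)
                .map(String::trim)
                .filter(zone -> !zone.isEmpty());
    }

    public static void main(String[] args) {
        // Assumption: a JVM system property stands in for Environment.getProperty.
        Optional<String> zone = normalizeZone(System.getProperty(ZONE_KEY));
        System.out.println(zone
                .map(z -> "zone-aware routing enabled for zone " + z)
                .orElse("no zone configured, falling back to plain round robin"));
    }
}
```

With a check like this, a gateway started without a zone could log the fact once at startup and fall back to plain round robin instead of failing on the first request.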

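import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
// JSON below is assumed to be com.alibaba.fastjson.JSON; import the JSON library your project uses.

public class RJLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private static final Logger log = LoggerFactory.getLogger(RJLoadBalancer.class);
    // Metadata key under which each service instance registers its zone
    private static final String CLIENT_ZONE_KEY = "service-zone";
    // Zone in which this gateway runs
    private final String gatewayZone;
    // Name of the service that requests are forwarded to
    private final String serviceId;
    // Round-robin counter over all instances (randomly seeded)
    private final AtomicInteger position;
    // Round-robin counter over same-zone instances (randomly seeded)
    private final AtomicInteger zonePosition;
    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;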

    public RJLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, String gatewayZone) {
        this(serviceInstanceListSupplierProvider, serviceId, gatewayZone, new Random().nextInt(1000), new Random().nextInt(1000));
    }

    public RJLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, String gatewayZone, int seedPosition, int zoneSeedPosition) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.gatewayZone = gatewayZone;
        this.serviceId = serviceId;
        this.position = new AtomicInteger(seedPosition);
        this.zonePosition = new AtomicInteger(zoneSeedPosition);
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("Gateway No servers available:" + serviceId);
            }
            return new EmptyResponse();
        } else {
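            // Keep only the instances registered in the same zone as the gateway.
            // A sequential stream is enough for a list this small, and the null check
            // avoids a NullPointerException when gateway.lb.zone is not configured.
            List<ServiceInstance> zoneHitService = instances.stream()
                    .filter(service -> gatewayZone != null
                            && gatewayZone.equals(service.getMetadata().get(CLIENT_ZONE_KEY)))
                    .collect(Collectors.toList());
            // Logged per request, so debug level is used instead of warn.
            if (log.isDebugEnabled()) {
                log.debug("Gateway load balance zone hit:" + JSON.toJSONString(zoneHitService));
            }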
            if (zoneHitService.isEmpty()) {
                return roundRobin(position, instances);
            } else {
                return roundRobin(zonePosition, zoneHitService);
            }
        }
    }

    private Response<ServiceInstance> roundRobin(AtomicInteger position, List<ServiceInstance> instances) {
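        // Mask the sign bit instead of calling Math.abs: Math.abs(Integer.MIN_VALUE) is still
        // negative, which would produce a negative index once the counter overflows.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;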
        return new DefaultResponse(instances.get(pos % instances.size()));
    }
}
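
The selection rule used by the load balancer (prefer instances in the gateway's zone, otherwise round-robin over every instance) can be tried out without Spring. The following is a minimal sketch under stated assumptions: ZonePreferenceSketch and its nested Instance class are hypothetical stand-ins for RJLoadBalancer and ServiceInstance, and the hosts and zone names are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ZonePreferenceSketch {

    static final String CLIENT_ZONE_KEY = "service-zone";

    // Hypothetical stand-in for ServiceInstance: a host plus registry metadata.
    public static final class Instance {
        final String host;
        final Map<String, String> metadata;

        public Instance(String host, Map<String, String> metadata) {
            this.host = host;
            this.metadata = metadata;
        }
    }

    // Returns the same-zone instances, or all instances when none match
    // (or when the gateway zone is not configured).
    public static List<Instance> candidates(String gatewayZone, List<Instance> all) {
        List<Instance> sameZone = all.stream()
                .filter(i -> gatewayZone != null
                        && gatewayZone.equals(i.metadata.get(CLIENT_ZONE_KEY)))
                .collect(Collectors.toList());
        return sameZone.isEmpty() ? all : sameZone;
    }

    // Round-robin pick; masking the sign bit keeps the index non-negative
    // even after the counter overflows.
    public static Instance roundRobin(AtomicInteger position, List<Instance> instances) {
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        List<Instance> all = List.of(
                new Instance("10.0.0.1", Map.of(CLIENT_ZONE_KEY, "beijing")),
                new Instance("10.0.0.2", Map.of(CLIENT_ZONE_KEY, "beijing")),
                new Instance("10.1.0.1", Map.of(CLIENT_ZONE_KEY, "shanghai")));
        AtomicInteger position = new AtomicInteger(0);
        // A gateway in "beijing" alternates between the two Beijing hosts only.
        for (int i = 0; i < 4; i++) {
            System.out.println(roundRobin(position, candidates("beijing", all)).host);
        }
    }
}
```

A gateway whose zone matches no instance still gets a response, because candidates falls back to the full list. That fallback is what keeps traffic flowing when one region has no healthy instances of a service.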