Keywords: load balancing, Spring Cloud LoadBalancer


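Spring Cloud offers client-side load balancing not only through Ribbon but also through Spring Cloud LoadBalancer. Unlike Ribbon, Spring Cloud LoadBalancer supports both RestTemplate and WebClient. WebClient is part of Spring WebFlux and makes reactive, asynchronous requests, so it is worth getting familiar with Spring WebFlux before studying Spring Cloud LoadBalancer.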


█ Usage

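Common dependencies

The Spring Cloud BOM, version Hoxton.RELEASE. Because it uses the import scope, it has to be declared inside the dependencyManagement section of pom.xml: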

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>Hoxton.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

Setting up the service registry: Eureka

For how to use Eureka, see the article "Service Registration and Discovery: Spring Cloud Netflix Eureka".

Setting up the server

(1) Add the dependencies:

<dependency>
    <!-- Web starter: exposes the service through Spring MVC -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <!-- Eureka client: registers this service with the Eureka server -->
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

(2) Create a Controller that exposes the endpoint:

@RestController
@RequestMapping("/base")
public class BaseController {

    @Value("${server.port}")
    private Integer port;

    @GetMapping("/string")
    public String getString() {
        return "my server get string ---->"+port;
    }

}

(3) Write the application.yml configuration:

# Server port
server:
  port: 8081

# Service name
spring:
  application:
    name: myServer

# Eureka
eureka:
  client:
    # Do not fetch the service list from the Eureka registry
    fetch-registry: false
    # Register this service with the Eureka registry
    register-with-eureka: true
    # Eureka server address
    service-url:
      defaultZone: http://localhost:8899/eureka

(4) Start the server project twice on two different ports, for example 8081 and 8082.

Setting up the client

(1) Add the dependencies:

<dependency>
    <!-- Spring Cloud LoadBalancer: client-side load balancing -->
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
    <!-- Eureka client: fetches the list of registered services from the Eureka server -->
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

(2) Write the application.yml configuration:

server:
  port: 80

eureka:
  client:
    fetch-registry: true
    register-with-eureka: false
    service-url:
      defaultZone: http://localhost:8899/eureka

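The article "Service Load Balancing: Spring Cloud Netflix Ribbon" showed how the @LoadBalanced annotation, combined with RestTemplate, lets Spring Cloud Ribbon balance load on the client side. Spring Cloud LoadBalancer is used the same way. It also integrates with Spring WebFlux: if the client calls the server through Spring WebFlux, a very similar configuration is all that is needed for client-side load balancing.

If Spring Cloud Ribbon classes are on the project's classpath, Ribbon has to be switched off through configuration, because Spring Cloud prefers Ribbon by default: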

spring:
  cloud:
    loadbalancer:
      ribbon:
        enabled: false

(3) Call the server endpoint from the client

The client can call the server in two ways: with RestTemplate or with WebClient.

  • RestTemplate

① Add the dependency:

<dependency>
    <!-- Web starter: exposes the service through Spring MVC -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

② Write the configuration and create a RestTemplate bean:

The bean method must be annotated with @LoadBalanced.

@Configuration
public class LoadbalanceConfiguration {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

③ Write a Controller in which the client calls the server and exposes its own endpoint:

@Autowired injects the RestTemplate instance created in step ②.

@RestController
@RequestMapping("/rt/client")
public class RestClient {

    // myServer is the server's service name, as set in spring.application.name
    private static final String BASE_URL = "http://myServer";

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/getString")
    public String getString() {
        // Call the endpoint exposed by the server
        return restTemplate.getForObject(BASE_URL+"/base/string", String.class);
    }

}

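④ Start the client (with the registry and the server instances already running) and call the endpoint: localhost:80/rt/client/getString

When the client endpoint is called, the client uses load balancing to pick one available server instance and calls it.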

  • WebClient

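WebClient is provided by Spring WebFlux; it calls server endpoints asynchronously in a reactive style.

WebClient.Builder is an interface nested in WebClient and the entry point for creating a client.

① Add the dependency: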

<dependency>
    <!-- Spring WebFlux: reactive web programming -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

② Write the configuration and create a WebClient.Builder bean:

Again, the bean method must be annotated with @LoadBalanced.

@Configuration
public class LoadbalanceConfiguration {

    @Bean
    @LoadBalanced
    public WebClient.Builder builder() {
        return WebClient.builder();
    }
    
}

③ Write a Controller in which the client calls the server and exposes its own endpoint:

@RestController
@RequestMapping("/rx/client")
public class ReactiveClient {

    private static final String BASE_URL = "http://myServer";

    @Autowired
    private WebClient.Builder clientBuilder;

    @GetMapping("/getString")
    public Mono<String> getServerString() {
        return clientBuilder.baseUrl(BASE_URL).build()
                .get()
                .uri("/base/string")
                .retrieve()
                .bodyToMono(String.class);
    }

}

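█ How it works: RestTemplate

This section and the next look at how load balancing is implemented for RestTemplate and for WebClient. The idea is the same in both cases: an interceptor is added to the client, and the load balancing happens inside it.

As with Spring Cloud Ribbon, Spring Cloud LoadBalancer works by adding an interceptor to RestTemplate. RestTemplate has a setInterceptors method for this; an interceptor only needs to implement the ClientHttpRequestInterceptor interface, and its intercept method runs before the actual remote request is sent. These interceptors play much the same role as a Filter in the Servlet API.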

// RestTemplate#setInterceptors
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
   // Take getInterceptors() List as-is when passed in here
   if (this.interceptors != interceptors) {
      this.interceptors.clear();
      this.interceptors.addAll(interceptors);
      AnnotationAwareOrderComparator.sort(this.interceptors);
   }
}
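To make the Filter analogy concrete, here is a minimal JDK-only sketch of an interceptor chain. The Interceptor and Execution types are hypothetical stand-ins for ClientHttpRequestInterceptor and ClientHttpRequestExecution, and requests are plain strings; it shows the pattern, not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class InterceptorChainSketch {

    /** Hypothetical stand-in for ClientHttpRequestExecution: passes the request on. */
    interface Execution {
        String execute(String request);
    }

    /** Hypothetical stand-in for ClientHttpRequestInterceptor. */
    interface Interceptor {
        String intercept(String request, Execution next);
    }

    /** Runs every interceptor in order, then the real call at the end of the chain. */
    static String run(List<Interceptor> interceptors, String request, Execution realCall) {
        Iterator<Interceptor> it = interceptors.iterator();
        Execution chain = new Execution() {
            @Override
            public String execute(String req) {
                if (it.hasNext()) {
                    return it.next().intercept(req, this);
                }
                return realCall.execute(req);
            }
        };
        return chain.execute(request);
    }

    public static void main(String[] args) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Rewrites the logical service name to a concrete address before the call is made.
        interceptors.add((req, next) -> next.execute(req.replace("myServer", "localhost:8081")));
        // Decorates the response after the call returns.
        interceptors.add((req, next) -> "[logged] " + next.execute(req));

        String result = run(interceptors, "http://myServer/base/string", req -> "GET " + req);
        System.out.println(result);
    }
}
```

Each interceptor decides whether and how to pass the request on, which is why a load-balancing interceptor can swap the target address before the request leaves the client.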

(1) LoadBalancerInterceptor

LoadBalancerInterceptor implements the ClientHttpRequestInterceptor interface; its intercept method carries out the load-balancing interception:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
   // Load balancer client
   private LoadBalancerClient loadBalancer;

   private LoadBalancerRequestFactory requestFactory;

   public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
         LoadBalancerRequestFactory requestFactory) {
      this.loadBalancer = loadBalancer;
      this.requestFactory = requestFactory;
   }

   public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
      // for backwards compatibility
      this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
   }

   @Override
   public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
         final ClientHttpRequestExecution execution) throws IOException {
      final URI originalUri = request.getURI();
      String serviceName = originalUri.getHost();
      Assert.state(serviceName != null,
            "Request URI does not contain a valid hostname: " + originalUri);
      return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
   }

}
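The interceptor treats the host part of the URL as the service name, and the chosen instance's address later replaces it. A small JDK-only sketch of those two steps follows; the class and method names are hypothetical, and the real reconstruction is done by Spring Cloud, not by this code.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class ServiceUriSketch {

    /** Returns the logical service name, i.e. the host part of the original URI. */
    static String serviceName(URI original) {
        String host = original.getHost();
        if (host == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + original);
        }
        return host;
    }

    /** Swaps host and port for those of the chosen instance; path and query are kept. */
    static URI withInstance(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://myServer/base/string");
        System.out.println(serviceName(original));
        System.out.println(withInstance(original, "localhost", 8081));
    }
}
```

One practical consequence: java.net.URI returns null from getHost() for names that are not valid hostnames (for example names containing an underscore), which is the case the Assert.state check above guards against.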

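(2) LoadBalancerClient

The load balancer client carries out the load-balancing logic: it selects one service address from the service list and calls it. With Spring Cloud Ribbon, the LoadBalancerClient held by LoadBalancerInterceptor is a RibbonLoadBalancerClient; with Spring Cloud LoadBalancer it is a BlockingLoadBalancerClient. The interceptor calls the execute method of BlockingLoadBalancerClient: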

public class BlockingLoadBalancerClient implements LoadBalancerClient {
   
   // Factory used to obtain the concrete load balancer
   private final LoadBalancerClientFactory loadBalancerClientFactory;

   @Override
   public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
         throws IOException {
      ServiceInstance serviceInstance = choose(serviceId);
      if (serviceInstance == null) {
         throw new IllegalStateException("No instances available for " + serviceId);
      }
      return execute(serviceId, serviceInstance, request);
   }
   ......
}

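(3) LoadBalancerClientFactory

The LoadBalancerClientFactory held by BlockingLoadBalancerClient returns the concrete load balancer through its getInstance method. Load balancers implement different algorithms, such as round-robin or random. In Spring Cloud Ribbon, RibbonLoadBalancerClient holds a SpringClientFactory instead. LoadBalancerClientFactory and SpringClientFactory both extend the abstract class NamedContextFactory: each creates one ApplicationContext per service name taken from the request URL and makes it a child of the main Spring ApplicationContext: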

public class LoadBalancerClientFactory
      extends NamedContextFactory<LoadBalancerClientSpecification>
      implements ReactiveLoadBalancer.Factory<ServiceInstance> {

   /**
    * Property source name for load balancer.
    */
   public static final String NAMESPACE = "loadbalancer";

   /**
    * Property for client name within the load balancer namespace.
    */
   public static final String PROPERTY_NAME = NAMESPACE + ".client.name";

   public LoadBalancerClientFactory() {
      super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
   }

   public String getName(Environment environment) {
      return environment.getProperty(PROPERTY_NAME);
   }

   @Override
   public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
      return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
   }

}
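The "one context per service name" idea can be pictured as a cache keyed by service id that creates an entry on first use. The sketch below is a simplified JDK-only model with hypothetical names; the real NamedContextFactory builds Spring ApplicationContext objects and wires parent contexts, which is omitted here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class NamedContextSketch<C> {

    // One context per service name, created lazily and then reused.
    private final Map<String, C> contexts = new ConcurrentHashMap<>();
    private final Function<String, C> contextCreator;

    public NamedContextSketch(Function<String, C> contextCreator) {
        this.contextCreator = contextCreator;
    }

    /** Returns the context for the given service, creating it on first use. */
    public C getContext(String serviceId) {
        return contexts.computeIfAbsent(serviceId, contextCreator);
    }

    public int size() {
        return contexts.size();
    }

    public static void main(String[] args) {
        NamedContextSketch<StringBuilder> factory =
                new NamedContextSketch<>(name -> new StringBuilder("context-for-" + name));
        StringBuilder first = factory.getContext("myServer");
        StringBuilder second = factory.getContext("myServer");
        System.out.println(first == second);
        System.out.println(factory.size());
    }
}
```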

(4) ReactiveLoadBalancer

The load balancer, which chooses the service instance to call.

public interface ReactiveLoadBalancer<T> {

   Request REQUEST = new DefaultRequest();

   Publisher<Response<T>> choose(Request request);

   default Publisher<Response<T>> choose() { // conflicting name
      return choose(REQUEST);
   }

   @FunctionalInterface
   interface Factory<T> {

      ReactiveLoadBalancer<T> getInstance(String serviceId);

   }

}

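Spring Cloud LoadBalancer defines ReactorLoadBalancer as a sub-interface of ReactiveLoadBalancer, and ReactorServiceInstanceLoadBalancer as a sub-interface of ReactorLoadBalancer. It ships one default implementation, RoundRobinLoadBalancer, which implements round-robin selection. In other words, in Hoxton.RELEASE Spring Cloud LoadBalancer provides only one load-balancing algorithm out of the box, and it is used for RestTemplate and WebClient alike.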
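For readers who want to see the round-robin idea in isolation, here is a minimal JDK-only sketch. It is not the source of RoundRobinLoadBalancer: instances are plain strings, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    // Starts at -1 so that the first call selects index 0.
    private final AtomicInteger position = new AtomicInteger(-1);

    /** Picks the next instance in rotation, or empty when the list has no entries. */
    public Optional<String> choose(List<String> instances) {
        if (instances.isEmpty()) {
            return Optional.empty();
        }
        // Masking the sign bit keeps the index non-negative after the counter overflows.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return Optional.of(instances.get(pos % instances.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch balancer = new RoundRobinSketch();
        List<String> instances = Arrays.asList("localhost:8081", "localhost:8082");
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.choose(instances).orElse("none"));
        }
    }
}
```

With the two server instances from the setup above, successive calls alternate between port 8081 and port 8082.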

(5) LoadBalancerRequestFactory

A factory for LoadBalancerRequest objects, which are created through its createRequest method. It holds a LoadBalancerClient, which is a RibbonLoadBalancerClient with Spring Cloud Ribbon and a BlockingLoadBalancerClient with Spring Cloud LoadBalancer.

public class LoadBalancerRequestFactory {
   // Load balancer client
   private LoadBalancerClient loadBalancer;

   private List<LoadBalancerRequestTransformer> transformers;
   ......
   public LoadBalancerRequest<ClientHttpResponse> createRequest(
         final HttpRequest request, final byte[] body,
         final ClientHttpRequestExecution execution) {
      // Returns a LoadBalancerRequest, written as a lambda expression instead of an anonymous inner class.
      return instance -> {
         HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
               this.loadBalancer);
         if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
               serviceRequest = transformer.transformRequest(serviceRequest,
                     instance);
            }
         }
         return execution.execute(serviceRequest, body);
      };
   }

}
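createRequest can return a lambda because the request type has a single abstract method that receives the chosen instance later. The sketch below shows the anonymous-class form and the equivalent lambda form side by side; Request is a hypothetical interface loosely shaped like LoadBalancerRequest, with the instance reduced to a string.

```java
public class LambdaRequestSketch {

    /** Hypothetical single-method interface, loosely shaped like LoadBalancerRequest. */
    @FunctionalInterface
    interface Request<T> {
        T apply(String instance);
    }

    /** Anonymous-class form. */
    static Request<String> anonymous(String path) {
        return new Request<String>() {
            @Override
            public String apply(String instance) {
                return "http://" + instance + path;
            }
        };
    }

    /** Equivalent lambda form; the chosen instance is supplied later by the caller. */
    static Request<String> lambda(String path) {
        return instance -> "http://" + instance + path;
    }

    public static void main(String[] args) {
        System.out.println(anonymous("/base/string").apply("localhost:8081"));
        System.out.println(lambda("/base/string").apply("localhost:8082"));
    }
}
```

The deferred shape matters: the request is described first, and only after the load balancer has chosen an instance is it applied to that instance.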

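(6) LoadBalancerClientConfiguration

LoadBalancerClientConfiguration is the default global configuration class; its main job is to create the ReactorLoadBalancer. It is loaded when the ApplicationContext for the service name in the request URL is created.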

@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
      Environment environment,
      LoadBalancerClientFactory loadBalancerClientFactory) {
   String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
   return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name,
         ServiceInstanceListSupplier.class), name);
}

Auto-configuration:

(1) In META-INF/spring.factories of the org.springframework.cloud:spring-cloud-loadbalancer jar:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration

LoadBalancerAutoConfiguration

This configuration class is shared by Spring Cloud Ribbon, Spring Cloud LoadBalancer for RestTemplate, and Spring Cloud LoadBalancer for WebClient.

Its job is to create the LoadBalancerClientFactory bean. The configurations field holds the LoadBalancerClientSpecification objects built from the values of @LoadBalancerClient or @LoadBalancerClients; they are injected through the constructor of LoadBalancerAutoConfiguration and then passed on to LoadBalancerClientFactory:

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
      LoadBalancerBeanPostProcessorAutoConfiguration.class,
      ReactiveLoadBalancerAutoConfiguration.class })
public class LoadBalancerAutoConfiguration {

   private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;

   public LoadBalancerAutoConfiguration(
         ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
      this.configurations = configurations;
   }

   @Bean
   public LoadBalancerClientFactory loadBalancerClientFactory() {
      LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
      clientFactory.setConfigurations(
            this.configurations.getIfAvailable(Collections::emptyList));
      return clientFactory;
   }

}

BlockingLoadBalancerClientAutoConfiguration

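Its main job is to create the BlockingLoadBalancerClient. The preconditions are that RestTemplate is on the classpath and that OnNoRibbonDefaultCondition matches, which means that either spring.cloud.loadbalancer.ribbon.enabled=false is set or the class org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient is absent (no Spring Cloud Ribbon dependency). So when the classes of both Spring Cloud Ribbon and Spring Cloud LoadBalancer are present and the property is not set, Ribbon takes precedence.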

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@AutoConfigureBefore({
      org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class,
      AsyncLoadBalancerAutoConfiguration.class })
public class BlockingLoadBalancerClientAutoConfiguration {
   ......
   @Configuration(proxyBeanMethods = false)
   @ConditionalOnClass(RestTemplate.class)
   @Conditional(OnNoRibbonDefaultCondition.class)
   protected static class BlockingLoadbalancerClientConfig {

       @Bean
       @ConditionalOnBean(LoadBalancerClientFactory.class)
       @Primary
       public BlockingLoadBalancerClient blockingLoadBalancerClient(
            LoadBalancerClientFactory loadBalancerClientFactory) {
          return new BlockingLoadBalancerClient(loadBalancerClientFactory);
       }

   }
   ......
}

(2) In META-INF/spring.factories of the org.springframework.cloud:spring-cloud-commons jar:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration

LoadBalancerAutoConfiguration

This LoadBalancerAutoConfiguration installs the interceptor on RestTemplate. Spring Cloud Ribbon relies on the same configuration class for the same purpose.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
   
   // With @LoadBalanced and @Autowired together, only the RestTemplate beans annotated with @LoadBalanced are injected
   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();

   @Autowired(required = false)
   private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

   // Spring calls SmartInitializingSingleton#afterSingletonsInstantiated once all non-lazy singleton beans have been instantiated
   @Bean
   public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
         final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
      // Implementation of afterSingletonsInstantiated:
      return () -> restTemplateCustomizers.ifAvailable(customizers -> {
         for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
            for (RestTemplateCustomizer customizer : customizers) {
               // Apply the customizer to the RestTemplate
               customizer.customize(restTemplate);
            }
         }
      });
   }

   @Bean
   @ConditionalOnMissingBean
   public LoadBalancerRequestFactory loadBalancerRequestFactory(
         LoadBalancerClient loadBalancerClient) {
      return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
   static class LoadBalancerInterceptorConfig {
      
      // Builds the LoadBalancerInterceptor bean; the LoadBalancerClient and LoadBalancerRequestFactory
      // beans found in the current Spring ApplicationContext are injected as parameters
      @Bean
      public LoadBalancerInterceptor ribbonInterceptor(
            LoadBalancerClient loadBalancerClient,
            LoadBalancerRequestFactory requestFactory) {
         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
      }

      @Bean
      @ConditionalOnMissingBean
      public RestTemplateCustomizer restTemplateCustomizer(
            final LoadBalancerInterceptor loadBalancerInterceptor) {
         return restTemplate -> {
            List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                  restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            // Install the interceptors through setInterceptors
            restTemplate.setInterceptors(list);
         };
      }

   }
}   

█ How it works: WebClient

Unlike Spring Cloud Ribbon, Spring Cloud LoadBalancer also supports load balancing for reactive clients, that is, client-side load-balanced calls made with Spring WebFlux.

First, here is how WebClient is used to call a server endpoint:

Mono<String> mono = WebClient.builder().baseUrl("http://localhost:8080").build().get().uri("/base/string").retrieve().bodyToMono(String.class);

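The code reads as one continuous chain; this fluent style is typical of reactive programming. The statement above only builds the request and does not send it. The asynchronous request is triggered when the subscribe method of the Mono is called.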

// Obtain a Builder from WebClient
WebClient.builder().
    // Server address (the scheme is required)
    baseUrl("http://localhost:8080").build().
    // GET request
    get().
    // Endpoint path
    uri("/base/string").retrieve().
    // The response body is a String
    bodyToMono(String.class);

Spring Cloud LoadBalancer also load-balances WebClient through an interceptor, which is registered with the filter method of WebClient.Builder:

public Builder filter(ExchangeFilterFunction filter) {
    Assert.notNull(filter, "ExchangeFilterFunction must not be null");
    this.initFilters().add(filter);
    return this;
}

(1) ReactorLoadBalancerExchangeFilterFunction

ReactorLoadBalancerExchangeFilterFunction implements the ExchangeFilterFunction interface and performs the load balancing in its filter method:

public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {

   private static final Log LOG = LogFactory
         .getLog(ReactorLoadBalancerExchangeFilterFunction.class);

   private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory;

   public ReactorLoadBalancerExchangeFilterFunction(
         ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
      this.loadBalancerFactory = loadBalancerFactory;
   }

   @Override
   public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
      URI originalUrl = request.url();
      String serviceId = originalUrl.getHost();
      if (serviceId == null) {
         String message = String.format(
               "Request URI does not contain a valid hostname: %s",
               originalUrl.toString());
         if (LOG.isWarnEnabled()) {
            LOG.warn(message);
         }
         return Mono.just(
               ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build());
      }
      return choose(serviceId).flatMap(response -> {
         ServiceInstance instance = response.getServer();
         if (instance == null) {
            String message = serviceInstanceUnavailableMessage(serviceId);
            if (LOG.isWarnEnabled()) {
               LOG.warn(message);
            }
            return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE)
                  .body(serviceInstanceUnavailableMessage(serviceId)).build());
         }

         if (LOG.isDebugEnabled()) {
            LOG.debug(String.format(
                  "Load balancer has retrieved the instance for service %s: %s",
                  serviceId, instance.getUri()));
         }
         ClientRequest newRequest = buildClientRequest(request,
               LoadBalancerUriTools.reconstructURI(instance, originalUrl));
         return next.exchange(newRequest);
      });
   }

   private Mono<Response<ServiceInstance>> choose(String serviceId) {
      ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerFactory
            .getInstance(serviceId);
      if (loadBalancer == null) {
         return Mono.just(new EmptyResponse());
      }
      return Mono.from(loadBalancer.choose());
   }

   private String serviceInstanceUnavailableMessage(String serviceId) {
      return "Load balancer does not contain an instance for the service " + serviceId;
   }

   private ClientRequest buildClientRequest(ClientRequest request, URI uri) {
      return ClientRequest.create(request.method(), uri)
            .headers(headers -> headers.addAll(request.headers()))
            .cookies(cookies -> cookies.addAll(request.cookies()))
            .attributes(attributes -> attributes.putAll(request.attributes()))
            .body(request.body()).build();
   }

}
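The filter method above has three outcomes: a 400 response when the URL has no host, a 503 response when no instance is available, and otherwise a forwarded request with the rewritten URL. The JDK-only sketch below traces that decision flow synchronously; the registry map and the string results are hypothetical simplifications, and the reactive types are left out entirely.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FilterFlowSketch {

    /** Returns the status the filter would answer with, or the URL it would forward to. */
    static String filter(URI originalUrl, Map<String, List<String>> registry) {
        String serviceId = originalUrl.getHost();
        if (serviceId == null) {
            return "400 BAD_REQUEST";
        }
        List<String> instances = registry.getOrDefault(serviceId, Collections.emptyList());
        if (instances.isEmpty()) {
            return "503 SERVICE_UNAVAILABLE";
        }
        // Always takes the first instance here; a real balancer would rotate or randomize.
        return "forward to " + originalUrl.getScheme() + "://" + instances.get(0)
                + originalUrl.getRawPath();
    }

    public static void main(String[] args) {
        Map<String, List<String>> registry = Collections.singletonMap(
                "myServer", Arrays.asList("localhost:8081", "localhost:8082"));
        System.out.println(filter(URI.create("/base/string"), registry));
        System.out.println(filter(URI.create("http://unknown/base/string"), registry));
        System.out.println(filter(URI.create("http://myServer/base/string"), registry));
    }
}
```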

(2) ReactiveLoadBalancer.Factory

ReactiveLoadBalancer.Factory is a factory interface nested in ReactiveLoadBalancer and is used to obtain a ReactiveLoadBalancer:

@FunctionalInterface
interface Factory<T> {

   ReactiveLoadBalancer<T> getInstance(String serviceId);

}

(3) ReactiveLoadBalancer

The same as for RestTemplate.

Auto-configuration:

In META-INF/spring.factories of the org.springframework.cloud:spring-cloud-commons jar:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerBeanPostProcessorAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerClientAutoConfiguration

(1) ReactorLoadBalancerClientAutoConfiguration

Creates the ReactorLoadBalancerExchangeFilterFunction bean.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(WebClient.class)
@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
public class ReactorLoadBalancerClientAutoConfiguration {
    ......
    @Configuration(proxyBeanMethods = false)
    @Conditional(OnNoRibbonDefaultCondition.class)
    protected static class ReactorLoadBalancerExchangeFilterFunctionConfig {

       @Bean
       public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
             ReactiveLoadBalancer.Factory loadBalancerFactory) {
          return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);
       }

    }
}

(2) LoadBalancerBeanPostProcessorAutoConfiguration

Registers the ReactorLoadBalancerExchangeFilterFunction created above as a filter on the WebClient.Builder object.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(WebClient.class)
@Conditional(LoadBalancerBeanPostProcessorAutoConfiguration.OnAnyLoadBalancerImplementationPresentCondition.class)
public class LoadBalancerBeanPostProcessorAutoConfiguration {
   
   // Creates the LoadBalancerWebClientBuilderBeanPostProcessor from the DeferringLoadBalancerExchangeFilterFunction defined below
   @Bean
   public LoadBalancerWebClientBuilderBeanPostProcessor loadBalancerWebClientBuilderBeanPostProcessor(
         DeferringLoadBalancerExchangeFilterFunction deferringExchangeFilterFunction,
         ApplicationContext context) {
      return new LoadBalancerWebClientBuilderBeanPostProcessor(
            deferringExchangeFilterFunction, context);
   }

   @Configuration(proxyBeanMethods = false)
   @Conditional(ReactorLoadBalancerClientAutoConfiguration.OnNoRibbonDefaultCondition.class)
   @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
   protected static class ReactorDeferringLoadBalancerFilterConfig {
      
      // Creates the DeferringLoadBalancerExchangeFilterFunction from the ReactorLoadBalancerExchangeFilterFunction in the container
      @Bean
      @Primary
      DeferringLoadBalancerExchangeFilterFunction<ReactorLoadBalancerExchangeFilterFunction> reactorDeferringLoadBalancerExchangeFilterFunction(
            ObjectProvider<ReactorLoadBalancerExchangeFilterFunction> exchangeFilterFunctionProvider) {
         return new DeferringLoadBalancerExchangeFilterFunction<>(
               exchangeFilterFunctionProvider);
      }

   }
}   

LoadBalancerWebClientBuilderBeanPostProcessor

A bean post-processor that installs the filter.

public class LoadBalancerWebClientBuilderBeanPostProcessor implements BeanPostProcessor {
   ......
   @Override
   public Object postProcessBeforeInitialization(Object bean, String beanName)
         throws BeansException {
      if (bean instanceof WebClient.Builder) {
         if (context.findAnnotationOnBean(beanName, LoadBalanced.class) == null) {
            return bean;
         }
         // Register the filter through the filter method
         ((WebClient.Builder) bean).filter(exchangeFilterFunction);
      }
      return bean;
   }

}
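The post-processor only touches builders whose bean definition carries @LoadBalanced and leaves every other builder alone. The following JDK-only sketch imitates that selection with reflection; the Balanced annotation, Builder, and Config are hypothetical stand-ins, and no Spring container is involved.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AnnotatedBuilderSketch {

    /** Hypothetical marker annotation, playing the role of @LoadBalanced. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Balanced {
    }

    /** Hypothetical builder that just records the filters added to it. */
    static class Builder {
        final List<String> filters = new ArrayList<>();
    }

    /** Hypothetical configuration with one marked and one unmarked factory method. */
    static class Config {
        @Balanced
        public Builder balancedBuilder() {
            return new Builder();
        }

        public Builder plainBuilder() {
            return new Builder();
        }
    }

    /** Creates each Builder and adds the filter only when its factory method is marked. */
    static Map<String, Builder> process(Object config, String filter) {
        Map<String, Builder> beans = new TreeMap<>();
        for (Method method : config.getClass().getDeclaredMethods()) {
            if (method.getReturnType() != Builder.class) {
                continue;
            }
            try {
                Builder builder = (Builder) method.invoke(config);
                if (method.isAnnotationPresent(Balanced.class)) {
                    builder.filters.add(filter);
                }
                beans.put(method.getName(), builder);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        return beans;
    }

    public static void main(String[] args) {
        Map<String, Builder> beans = process(new Config(), "loadBalancerFilter");
        System.out.println(beans.get("balancedBuilder").filters);
        System.out.println(beans.get("plainBuilder").filters);
    }
}
```

This is why a WebClient.Builder bean without @LoadBalanced keeps calling literal host names, while the annotated one resolves service names.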

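█ Custom configuration

A custom load balancer implementing a custom algorithm is registered with the @LoadBalancerClient or @LoadBalancerClients annotation. In Spring Cloud Ribbon the same is done with @RibbonClient or @RibbonClients. As noted in "Service Load Balancing: Spring Cloud Netflix Ribbon", a custom configuration class that lies within the @ComponentScan range becomes a global configuration, because it is registered in the main Spring ApplicationContext. If it lies outside the @ComponentScan range, it is registered only in the ApplicationContext created for the service name in the request URL and applies to that service alone. In that case the name or value attribute of @LoadBalancerClient must match the service name in the request URL, otherwise the matching context configuration is not found.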
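The name-matching rule can be modelled as a lookup with a fallback. The sketch below is a deliberately simplified JDK-only model with hypothetical names: configurations are plain strings keyed by service name, and an exact, case-sensitive match is assumed. The real resolution happens through child ApplicationContexts and bean conditions.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigLookupSketch {

    private final Map<String, String> perService = new HashMap<>();
    private final String globalDefault;

    public ConfigLookupSketch(String globalDefault) {
        this.globalDefault = globalDefault;
    }

    /** Registers a balancer for one service name, as a per-service configuration would. */
    public void register(String serviceName, String balancer) {
        perService.put(serviceName, balancer);
    }

    /** A service-specific entry wins; otherwise the global default applies. */
    public String resolve(String serviceName) {
        return perService.getOrDefault(serviceName, globalDefault);
    }

    public static void main(String[] args) {
        ConfigLookupSketch lookup = new ConfigLookupSketch("round-robin");
        lookup.register("myServer", "random");
        System.out.println(lookup.resolve("myServer"));
        // A name that does not match exactly falls back to the default.
        System.out.println(lookup.resolve("myserver"));
    }
}
```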

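Custom load balancer:

Implementing only the ReactiveLoadBalancer interface is not enough; a custom load balancer has to implement its sub-interface ReactorServiceInstanceLoadBalancer, because the load balancer instance is obtained by looking up a bean of type ReactorServiceInstanceLoadBalancer in the container: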

// LoadBalancerClientFactory#getInstance
@Override
public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
   return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
}

Customizing the load balancer for the myServer service:

(1) Create a load balancer with a simple random algorithm:

public class CustomRandomLoadBalancerClient implements ReactorServiceInstanceLoadBalancer {

    // Provider of the supplier that yields the service instance list
    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    // Reused across calls instead of creating a new Random for every request
    private final Random random = new Random();

    public CustomRandomLoadBalancerClient(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable();
        if (supplier == null) {
            // No supplier registered: report that no instance is available
            return Mono.just(new EmptyResponse());
        }
        return supplier.get().next().map(this::getInstanceResponse);
    }

    /**
     * Picks a service instance at random.
     * @param instances the instances currently known for the service
     * @return the chosen instance, or an empty response if the list is empty
     */
    private Response<ServiceInstance> getInstanceResponse(
            List<ServiceInstance> instances) {
        System.out.println("Custom load balancer invoked");
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }

        System.out.println("Picking a service instance at random");
        // Random selection
        ServiceInstance instance = instances.get(random.nextInt(instances.size()));

        return new DefaultResponse(instance);
    }

}

(2) Create the configuration class:

@Configuration
public class CustomLoadBalancerClientConfiguration {

    // The serviceInstanceListSupplierProvider parameter is injected automatically
    @Bean
    public ReactorServiceInstanceLoadBalancer customLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
        return new CustomRandomLoadBalancerClient(serviceInstanceListSupplierProvider);
    }

}

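(3) Add the @LoadBalancerClient annotation to the application's main class:

The name must be the service name configured on the server (spring.application.name); configuration points to the custom configuration class.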

@SpringBootApplication
@LoadBalancerClient(name = "myServer", configuration = CustomLoadBalancerClientConfiguration.class)
public class BlockClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(BlockClientApplication.class, args);
    }

}
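The random selection from step (1) can be exercised without Spring or Eureka. Below is a minimal JDK-only sketch, assuming instances are represented as plain strings and the Random is passed in so a test can control it; RandomPickSketch is a hypothetical name, not part of any library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;

public class RandomPickSketch {

    private final Random random;

    /** The Random is injected so that tests can pass a seeded instance. */
    public RandomPickSketch(Random random) {
        this.random = random;
    }

    /** Picks one instance uniformly at random, or empty when there are none. */
    public Optional<String> pick(List<String> instances) {
        if (instances.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(instances.get(random.nextInt(instances.size())));
    }

    public static void main(String[] args) {
        RandomPickSketch picker = new RandomPickSketch(new Random());
        List<String> instances = Arrays.asList("localhost:8081", "localhost:8082");
        for (int i = 0; i < 5; i++) {
            System.out.println(picker.pick(instances).orElse("none"));
        }
    }
}
```

Over many calls both instances should be chosen, though not in a fixed order as with round-robin.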

Comparing Spring Cloud LoadBalancer and Spring Cloud Ribbon for load balancing with RestTemplate:

(1) Both use LoadBalancerInterceptor as the RestTemplate interceptor.

(2) LoadBalancerInterceptor holds a LoadBalancerClient: BlockingLoadBalancerClient in Spring Cloud LoadBalancer, RibbonLoadBalancerClient in Spring Cloud Ribbon.

(3) The LoadBalancerClient holds a NamedContextFactory: LoadBalancerClientFactory in Spring Cloud LoadBalancer, SpringClientFactory in Spring Cloud Ribbon.

(4) A custom load balancer implements the ReactorServiceInstanceLoadBalancer interface in Spring Cloud LoadBalancer and the ILoadBalancer interface in Spring Cloud Ribbon.

(5) Spring Cloud LoadBalancer is customized with the @LoadBalancerClient or @LoadBalancerClients annotation. Spring Cloud Ribbon can use these two annotations as well, in addition to @RibbonClient or @RibbonClients.

(6) Spring Cloud LoadBalancer supports reactive load balancing, that is, use together with Spring WebFlux; Spring Cloud Ribbon does not.
