nacos服务间调用
nacos 服务调用1 Ribbon使用样例2 创建spring容器3 创建ILoadBalancer类的作用3.1 负载均衡3.3 替换IPIP1 Ribbon使用样例@Autowiredpublic TestController(RestTemplate restTemplate) {this.restTemplate = restTemplate;}@RequestMapping(value
·
1 Ribbon使用样例
@Autowired
public TestController(RestTemplate restTemplate) {this.restTemplate = restTemplate;}
@RequestMapping(value = "/echo/{str}", method = RequestMethod.GET)
public String echo(@PathVariable String str) {
return restTemplate.getForObject("http://service-provider/echo/" + str, String.class);
}
2 创建spring容器
1 RestTemplate调用getForObject过程中会创建spring容器
2 每组service集群创建一个容器
3 维护ILoadBalancer (ZoneAwareLoadBalancer) spring bean
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object> singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
3 创建ILoadBalancer类的作用
1 获取服务列表实例
2 实现负载均衡
3 server的name用ip替换
3.1 获取服务的list
ILoadBalancer
ZoneAwareLoadBalancer构造方法会去拿server的list
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
``
updateListOfServers方法更新服务列表
```java
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
private List<NacosServer> getServers() {
try {
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, true);
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
通过http请求获取server 列表
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
启动容器拿server服务列表,定时任务也会去获取。
HostReactor 类
构造方法
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) {
executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
// 定时调度接收server请求
this.pushReceiver = new PushReceiver(this);
}
死循环接收udp消息
@Override
public void run() {
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// 处理upd接收到的server信息
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
public ServiceInfo processServiceJSON(String json) {
ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
+ ", new-t: " + serviceInfo.getLastRefTime());
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
if (!newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(newHosts));
}
if (remvHosts.size() > 0) {
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(remvHosts));
}
if (modHosts.size() > 0) {
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
+ serviceInfo.getName() + " -> " + JSON.toJSONString(modHosts));
}
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
} else {
NAMING_LOGGER.info("new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getName() + " -> " + JSON
.toJSONString(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
eventDispatcher.serviceChanged(serviceInfo);
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getName() +
" -> " + JSON.toJSONString(serviceInfo.getHosts()));
return serviceInfo;
}
3.2 负载均衡
默认负载均衡顺序执行
/**
* Referenced from RoundRobinRule
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
// 获取ILoadBalancer 对象
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 根据负载均衡算法 获取server
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
3.3 替换IP
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
} else {
server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
// 用ip替换server的name
return context.reconstructURIWithServer(server, uri);
}
替换ip的全流程
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server.getPort();
String scheme = server.getScheme();
if (host.equals(original.getHost())
&& port == original.getPort()
&& scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)