一、项目背景

       业务提了个需求,需要实时拿总部系统数据,结合本部数据做成标签数据,用来分析。本来是有两种方案的,第一种总部把实时数据推给我们,第二种是他们推到自己的实时集群kafka上,我们去消费。领导们讨论出来的方案是用第二种,为啥,咱也不好问。。然后我们这边决定用flink实时消费他们的kafka数据,写到我们自己集群的kafka上。。。

二、问题描述

        因为总部kafka在他们的内网,不会直接开墙让我们连,他们那边做了一层网络nat,把nat后的一段ip开了墙让我们访问。上线之前是验证过的,网络是通的,但是真正起应用的时候,一直连接不上,具体错误如下:

org.apache.kafka.clients.consumer.internals.AbstractCoordinator [consumer clientId=...,groupid=...] connection to node 192.168...:9092  could not be established. Broker may not be available.

        这时就发现你配置的 bootstrap.servers地址是nat后的,但是报错的却是nat之前的原地址。因为没开墙,导致真正读数据的时候一直连不上。

        简单的说下网络图:

总部kafka原地址(有墙)          NAT后地址(开墙了)               本部flink

ip1 -------------------------------------> n_ip1 -------------------------------------> yarn

ip2 -------------------------------------> n_ip2 -------------------------------------> yarn

ip3 -------------------------------------> n_ip3 -------------------------------------> yarn

三、kafka读写原理

        首先了解kafka读写流程,参考这篇文章:kafka存储机制与读写流程

        先建立连接,即我们配置的那个bootstrap.servers,连接建立后,会返回相关信息,如broker地址、topic、offset、partition等信息,consumer拿到后会存到元数据metadata中。metadata参考这篇文章:Kafka集群Metadata管理

四、解决方案

因为连接是成功的,只是连接后他们返回的broker是原地址,只能想办法把原地址再做一次转换。

方案一:总部配置kafka映射

        网上有这种方案的,这边直接pass,根本叫不动他们改的;

方案二:本部集群添加iptable,做网络路由

        在集群各节点做一次路由,将他们返回的原地址再路由成nat后地址。测试环境我们验证已经通过,是可行的。询问网络同事,他们不建议这么做,但是呢也没好的法子,只能答应了。但是运维那边卡住了,不给搞,因为集群好多系统在用,说风险太高,而且维护成本高,也只能放弃。

方案三:优雅的改kafka源码

        不用真正的动kafka源码,而是在程序代码根目录中添加一个package,把报错的那个源码类搞进来改下就行。即添加一段ip映射,将broker强制做一次转换。

五、实施

        需要修改如下两个地方:

​​​​​​https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/Metadata.java
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

 放置路径:

src.main.java

                    --com.aaa...

                    --org.apache.kafka.clients

         下载下来,修改Metadata.handleMetadataResponse和AbstractCoordinator下的FindCoordinatorResponseHandler.onSuccess。

        添加路由映射

修改Metadata:

AbstractCoordinator.java
// 添加全局变量
private Map<String,String> eventIpNatMap = new HashMap<String,String>(){{
    put(ip1,n_ip1);
    put(ip2,n_ip2);
    put(ip3,n_ip3);
}};


    private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
                                                 Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
        Set<String> internalTopics = new HashSet<>();
        List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
        Map<Integer, Node> brokersById = metadataResponse.brokersById();


// 添加如下代码
Set<Integer> setNode=brokersById.keySet();
Iterator<Integer> iterableNode=setNode.iterator();
while(iterableNode.hasNext()){
    Integer idx=iterableNode.next();
    Node resNode=brokersById.get(idx);
    if(eventIpNatMap.containsKey(resNode.host())){
        String natIp=eventIpNatMap.get(resNode.host());
        Node newNode=new Node(resNode.id(),natIp,resNode.port(),resNode.rack());
        brokersById.put(idx.newNode);
    }
}


        for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {

修改AbstractCoordinator:

AbstractCoordinator.java
// 添加全局变量
private Map<String,String> eventIpNatMap = new HashMap<String,String>(){{
    put(ip1,n_ip1);
    put(ip2,n_ip2);
    put(ip3,n_ip3);
}};


synchronized (AbstractCoordinator.this) {
                    // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                    // for the coordinator in the underlying network client layer
                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();

// 添加下面这段代码
String host = findCoordinatorResponse.data().host();
if (eventIpNatMap.containsKey(host)){
    host=eventIpNatMap.get(host);
}

// 将下面的findCoordinatorResponse.data().host()  改成上面替换的host

                    AbstractCoordinator.this.coordinator = new Node(
                            coordinatorConnectionId,
                            host,
                            findCoordinatorResponse.data().port());
                    log.info("Discovered group coordinator {}", coordinator);
                    client.tryConnect(coordinator);
                    heartbeat.resetSessionTimeout();
                }

六、总结

参考:

Kafka与zk的关系及连接参数bootstrap.server的正确理解_zollty的专栏-CSDN博客

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐