在Spark Streaming连接Kafka的时候出现了WARN NetworkClient: [Consumer clientId=consumer-g1-1, groupId=g1] Error connecting to node,记录一下,也给大家排坑。

 

首先介绍一下kafka的状况,kafka正常运行,开启了zk和kafka,可以正常创建topic,开生产者和消费者可以正常消费。

streaming代码在另一台机器上正常运行,但是换了电脑,环境明明一样,但是却无法消费,报错如下:

WARN NetworkClient: [Consumer clientId=consumer-g1-1, groupId=g1] Error connecting to node iZuf6fp7azbufaq6loxytsZ:9092 (id: 2147483647 rack: null)
java.net.UnknownHostException: iZuf6fp7azbufaq6loxytsZ
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
	at java.net.InetAddress.getAllByName(InetAddress.java:1193)
	at java.net.InetAddress.getAllByName(InetAddress.java:1127)
	at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
	at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:958)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:575)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:816)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:796)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:237)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:485)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1232)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:171)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:259)
	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
	at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
	at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
	at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)

可以看到第一行重点是Error connecting to node iZuf6fp7azbufaq6loxytsZ:9092 ,其中乱码部分是我的服务器名称。

 

接下来是解决方法:

windows打开C:\Windows\System32\drivers\etc下的hosts文件(需要管理员权限),添加 : IP iZuf6fp7azbufaq6loxytsZ

注意中间有空格,这是将服务器IP与名字映射。

linux下打开/etc/hosts,同样的配置。

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐