kafka部署在虚拟机上,真集群分布式,三台机器。版本为kafka_2.8.0-0.8.0.tar.gz;我想在windows10上的myeclipse10中用java代码远程连接虚拟机上的kafka,结果却报错了:

Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:172.16.4.214,port:9092)] failed

ERROR Producer connection to slave1:9092 unsuccessful (kafka.producer.SyncProducer)

java.net.ConnectException: Connection refused


  网上有人说出现上述错误是因为没有配置advertised.host.name,但是我打开server.properties里面根本没有这个属性,只有host.name,而host.name我早已经配置为主机名,既然他没有那个属性,我硬加了一个advertised.host.name属性,值也为主机名。然后重启发现根本没用。错误照样;还有网友说kafka自带的zookeeper   jar包与我们自己安装zookeeper版本不一致,但是我更换后一致后还是不行,所以我自己研究尝试找到以下方法:

 解决方法:

1.kafka安装在3台虚拟机上,那么3台机器都要执行开启命令(#kafka-server-start.sh ./kafka/config/server.properties &),它跟zookeeper一样都需要分别启动(我这里说的zookeeper并非hbase自带),我刚开始在hadoop主节点上安装的kafka,然后将配置分发到其他从节点,我以为只要在主节点上启动kafka就行,结果证明是错的,所以报了上述错误

2.我忘了配置producer.properties,用#vi producer.properties 打开后,按i键进入编辑模式,找到metadata.broker.list这一行,去掉前面的注释,改为metadata.broker.list=master:9092,slave1:9092,slave3:9092  然后保存退出,因为真集群分布,这里必须改,另外我这里写master和slave这些名字,是因为我在三台虚拟上都配置了hosts文件(ip和主机映射),windows10上面的hosts我也配置了,如果你没配置还是写ip吧。

3.consumer.properties里面的zookeeper.connect也要改为zookeeper.connect=master:2181,slave1:2181,slave3:2181

至此运行成功

下面贴上我的代码:

在classpath(src下面)下新添两个属性文件:producer.properties和consumer.properties

producer.properties内容如下

---------------------------------------------------------------------------------------------------------------------------------------

metadata.broker.list=master:9092,slave1:9092,slave3:9092
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder

---------------------------------------------------------------------------------------------------------------------------------------

consumer.properties内容如下

---------------------------------------------------------------------------------------------------------------------------------------

zookeeper.connect=master:2181,slave1:2181,slave3:2181
zookeeper.connectiontimeout.ms=1000000
group.id=test-group
auto.offset.reset=smallest
auto.commit.enable=true

---------------------------------------------------------------------------------------------------------------------------------------

生产者代码

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {

    /**
     * @param args
     */
    public static void main(String[] args) {
        Producer<String,String> inner=null;
        try {
            Properties properties=new Properties();
            properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/producer.properties")));
            ProducerConfig config=new ProducerConfig(properties);
            inner=new Producer<String, String>(config);
            int i=0;
            while(true){
                KeyedMessage<String,String> km=new KeyedMessage<String, String>("test-topic",
                        "this is a sample"+i);
                inner.send(km);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(inner!=null){
                inner.close();
            }
        }

    }

}

消费者代码

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ConsumerConfig config=null;
        ConsumerConnector connector=null;
        ExecutorService threadPool=null;
        try {
            Properties properties=new Properties();
            properties.load(new FileInputStream(new File("E:/javaws/bigdata/src/consumer.properties")));
            config=new ConsumerConfig(properties);
            connector=Consumer.createJavaConsumerConnector(config);
            Map<String,Integer> topics=new HashMap<String,Integer>();
            topics.put("test-topic",2);//第二个参数是分区数partitionsNum
            Map<String,List<KafkaStream<byte[],byte[]>>> streams=connector.createMessageStreams(topics);
            List<KafkaStream<byte[],byte[]>> partitions=streams.get("test-topic");
            threadPool=Executors.newFixedThreadPool(2);
            for(final KafkaStream<byte[],byte[]> partition:partitions){
                threadPool.execute(new MessageRunner(partition));
            }
            System.in.read();
            threadPool.shutdownNow();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            connector.shutdown();
        }

    }

}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐