# Producer

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    private static final String brokerList = "127.0.0.1:9092";
    private static final String topic = "xjx";

    public static Properties initNewConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");

        // To use a custom partitioner (a sketch of one follows this class):
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefinePartitioner.class.getName());
        // To use a custom interceptor:
        //props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
        // acks=0: do not wait for any acknowledgement from the broker
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        return props;
    }


    public static void main(String[] args) throws InterruptedException {
        Properties props = initNewConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //KafkaProducer<String, String> producer = new KafkaProducer<>(props,
        //new StringSerializer(), new StringSerializer());
        //Create a ProducerRecord, specifying the topic and value (no key is set here)
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, String.valueOf(i));
            try {
                // 1. Send the message (send() is asynchronous and returns a Future)
                producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }

}
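The commented-out `PARTITIONER_CLASS_CONFIG` line above refers to a `DefinePartitioner` class that is not shown in this post. Below is a minimal sketch of what such a custom partitioner might look like; the simple round-robin rule is an illustrative assumption, not the original author's implementation:

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom partitioner: spreads records over the topic's partitions
// in round-robin order, ignoring the record key.
public class DefinePartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Math.abs(counter.getAndIncrement() % numPartitions);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

It would be enabled by un-commenting the `PARTITIONER_CLASS_CONFIG` line in `initNewConfig()`.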
package com.iaiai;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * Package: com.iaiai.db.service.impl
 * Author: iaiai
 * Create Time: 16/10/3 12:57 PM
 * QQ: 176291935
 * Url: http://iaiai.iteye.com
 * Email: 176291935@qq.com
 * Description: produces messages
 */
public class KafkaProducer {

    private final org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
    public final static String TOPIC = "TEST-TOPIC";

    private KafkaProducer(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.111:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // acks (request.required.acks in the old Scala client) controls durability:
        //   0  : the producer never waits for an acknowledgement from the broker.
        //        Lowest latency, weakest durability (data is lost when a server fails).
        //   1  : the producer gets an acknowledgement once the leader replica has the
        //        data (only messages written to a now-dead leader but not yet
        //        replicated are lost).
        //   -1 : the producer gets an acknowledgement after all in-sync replicas have
        //        the data; no messages are lost as long as one in-sync replica remains.
        // (a short sketch of setting acks with the Java client follows this class)
        //props.put(ProducerConfig.ACKS_CONFIG, "-1");

        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 2;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            boolean sync = false;   // send synchronously?

            if (sync) {
                try {
                    // get() blocks until the broker has acknowledged the record
                    producer.send(new ProducerRecord<String, String>(TOPIC, key, data)).get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                producer.send(new ProducerRecord<String, String>(TOPIC, key, data));
            }

            // flush() blocks until all buffered records have actually been sent
            producer.flush();

            messageNo ++;
        }
    }

    public static void main( String[] args ) {
        new KafkaProducer().produce();
    }

}
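A note on the commented-out acks setting above: `request.required.acks` is the old Scala-client property name, while the Java client used in this class takes `acks`, exposed as `ProducerConfig.ACKS_CONFIG`. A minimal sketch of how it might be set (the broker address is the same placeholder as above):

```java
// Sketch: configuring acks with the Java client. "all" is equivalent to "-1":
// wait until every in-sync replica has received the record.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.111:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");   // or "1" / "0" for weaker guarantees
```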
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws Exception {

        String topic = "your-topic";   // replace with your own topic
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:port,host2:port");   // replace with your brokers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "-1");
        // Retries can lead to duplicate records on the broker
        props.put("retries", 3);             // retry up to 3 times
        props.put("retry.backoff.ms", 300);  // wait 300 ms between retries
        props.put("buffer.memory", 33554432); // 32 MB buffer for queued ProducerRecords (raises throughput)
        props.put("batch.size", 323840);      // ~0.3 MB: a batch is sent as soon as it is full
        // Default is "none" (no compression). lz4 compresses reasonably well: a compressed
        // batch carries more records and improves throughput, at the cost of extra CPU
        // on the producer side.
        props.put("compression.type", "lz4");
        props.put("linger.ms", 10);      // send after 10 ms even if batch.size is not reached
        props.put("max.block.ms", 3000);

        // Avoid reordering: allow only one in-flight request per connection
        // (with retries enabled, several in-flight requests can arrive out of order).
        props.put("max.in.flight.requests.per.connection", 1);

        // Create the producer only after all properties are set; otherwise the
        // settings above would have no effect.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message1");

        /**
         * If a record is sent without a key, records are distributed across the
         * topic's partitions in round-robin fashion. If a key is given, the client
         * hashes the key and that hash determines which partition the record is
         * sent to. (A keyed-record sketch follows this class.)
         */

        // Kafka offers two ways to send messages.
        // 1. Asynchronous send (used below): send() returns immediately and the
        //    callback is invoked once the broker has responded.
        for (int i1 = 0; i1 < 9; i1++) {
            System.out.println(i1);
            ProducerRecord<String, String> record2 = new ProducerRecord<>(
                    topic, "message" + i1);

            producer.send(record2, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // the record was sent successfully
                        System.out.println("message sent successfully");
                    } else {
                        // sending failed; the record would have to be re-sent
                        exception.printStackTrace();
                    }
                }

            });
        }


        //Thread.sleep(10 * 1000);

        // 2. Synchronous send: get() blocks until the broker has acknowledged the
        //    record, so the call only returns once the whole send has completed.
        //producer.send(record).get();

        producer.close();
    }
}
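The partitioning comment in the example above notes that a keyed record is routed by hashing its key. A minimal sketch of sending keyed records, reusing the `producer` and `topic` variables from that example (the key `"user-42"` is only an illustrative assumption):

```java
// Records with the same key hash to the same partition, so their relative
// order is preserved for consumers of that partition.
ProducerRecord<String, String> keyed =
        new ProducerRecord<>(topic, "user-42", "payload for user-42");
producer.send(keyed, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    } else {
        exception.printStackTrace();
    }
});
```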

# Consumer

package kafka;

import cn.hutool.core.date.DateUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

public class Consumer {
    public final static String groupId = "xjx4-group";
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        //Kafka broker address
        props.put("bootstrap.servers", bootstrapServers);
        //consumer group
        props.put("group.id", groupId);
        //key deserializer
        props.put("key.deserializer", StringDeserializer.class.getName());
        //value deserializer
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //where to start when there is no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //subscribe to the topic named "xjx"
        kafkaConsumer.subscribe(Arrays.asList("xjx"));
        return kafkaConsumer;
    }

    public static void main(String[] args) {
        //consume messages from the topic named "xjx"
        KafkaConsumer<String, String> kafkaConsumer = kafkaConsumer();
        for (;;) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records.records("xjx")) {
                String value = record.value();
                System.out.print("time:"+DateUtil.formatDateTime(new Date(record.timestamp()))+",");
                System.out.print("value:"+value+",");
                System.out.println("offset:"+record.offset());
            }
            System.out.println("--------------------------------");
        }
    }
}

package com.iaiai;  
  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
import kafka.serializer.StringDecoder;  
import kafka.utils.VerifiableProperties;  
  
public class KafkaConsumer {  
  
    private final ConsumerConnector consumer;  
  
    private KafkaConsumer() {  
        Properties props = new Properties();  
        //ZooKeeper connection (the old high-level consumer talks to ZooKeeper)
        props.put("zookeeper.connect", "192.168.193.148:2181");

        //consumer group
        props.put("group.id", "jd-group");

        //ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
  
        ConsumerConfig config = new ConsumerConfig(props);  
  
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
    }  
  
    void consume() {  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));  
  
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
  
        Map<String, List<KafkaStream<String, String>>> consumerMap =   
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);  
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);  
        ConsumerIterator<String, String> it = stream.iterator();  
        while (it.hasNext())  
            System.out.println(it.next().message());  
    }  
  
    public static void main(String[] args) {  
        new KafkaConsumer().consume();  
    }  
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {

        String topicName = "your-topic";       // replace with your own topic
        String groupId   = "any-group-name";   // replace with your own group id
        Properties props = new Properties();
        //props.put("bootstrap.servers", "leidi01:9092,leidi02:9092,leidi03:9092");
        props.put("bootstrap.servers", "host:port");   // replace with your brokers

        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        /**
         * Heartbeat interval: the consumer must keep heart-beating to the group
         * coordinator so the coordinator can tell whether the consumer is still alive.
         * When a consumer fails, the coordinator uses the heartbeat responses to tell
         * the remaining consumers to rebalance. Keep the interval fairly short,
         * e.g. 500-1000 ms.
         */
        props.put("heartbeat.interval.ms", 1000); // keep this reasonably short
        /*
         * Session timeout: if the coordinator receives no heartbeat from a consumer
         * within this window, the consumer is considered dead and a rebalance is
         * triggered, i.e. its partitions are reassigned to the other consumers,
         * which learn about the rebalance through their own heartbeats.
         */
        props.put("session.timeout.ms", 10 * 1000);
        /**
         * If more than this much time passes between two poll() calls, the consumer
         * is considered too slow to keep up, it is removed from the group, and its
         * partitions are assigned to other consumers. Set it according to how long
         * your own processing of a batch takes.
         */
        props.put("max.poll.interval.ms", 30 * 1000); // here: polling only every 30 s would exceed it

        //Maximum bytes returned per fetch request; raise it if you have large messages
        //(the per-partition limit is max.partition.fetch.bytes, default 1 MB)
        props.put("fetch.max.bytes", 10485760);
        //Maximum records returned per poll (default 500); raise it for high-throughput consumers
        props.put("max.poll.records", 500);
        // Do not recycle idle socket connections (with a positive value, a connection that
        // sees no data for that many ms is closed and has to be re-established later)
        props.put("connections.max.idle.ms", -1);

        // Enable auto commit: offsets are committed periodically in the background.
        // After a consumer restart, some already-processed records may be consumed again
        // (a manual-commit sketch follows this class).
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        /**
         * auto.offset.reset controls where consumption starts:
         *
         * 1. earliest
         *    If a partition has a committed offset, consume from it; otherwise consume
         *    from the beginning of the partition.
         *    e.g. topicA -> partition0: 1000
         *                   partition1: 2000
         *
         * 2. latest (typical for real-time scenarios)
         *    If a partition has a committed offset, consume from it; otherwise consume
         *    only records produced after the consumer starts.
         *
         * 3. none
         *    If every partition has a committed offset, consume from those offsets;
         *    if any partition has no committed offset, throw an exception.
         *
         *    props.put("auto.offset.reset", "latest");
         */
        props.put("auto.offset.reset", "earliest");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            "thread_name=" + Thread.currentThread().getName() + " " +
                                    "partition="+  record.partition() + " " +
                                    "offset=" + record.offset() + "  " +
                                    "key=" + record.key() + " " +
                                    "value="+ record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
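The auto-commit comment in the example above notes that with `enable.auto.commit=true` a restarted consumer may reprocess records it had already handled. A minimal sketch of committing offsets manually instead, reusing the `props` and `topicName` from that example with auto commit switched off (an illustrative alternative, not part of the original code):

```java
// Sketch: manual offset commit. Offsets are committed only after a batch has been
// processed, so a crash before commitSync() re-delivers at most that one batch.
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> manualConsumer = new KafkaConsumer<String, String>(props);
manualConsumer.subscribe(Arrays.asList(topicName));
try {
    while (true) {
        ConsumerRecords<String, String> batch = manualConsumer.poll(1000);
        for (ConsumerRecord<String, String> record : batch) {
            System.out.println("offset=" + record.offset() + " value=" + record.value());
        }
        if (!batch.isEmpty()) {
            manualConsumer.commitSync();   // commit the offsets of the records just polled
        }
    }
} finally {
    manualConsumer.close();
}
```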
