1. Writing to Kafka

Linking Dependency

  • Add the pom dependencies:
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>

Code

  • The code is as follows:
package cn.wsj.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

public class Demo01_Producer {
    public void writeMsg(String msg) throws IOException {
        // pass configuration in via Properties
        Properties prop = new Properties();
        // connect to the kafka cluster
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.237.160:9092");
        // how many acknowledgements the broker needs before a produce request is complete; acks=all/-1 -> the leader and all in-sync followers must acknowledge
        prop.put(ProducerConfig.ACKS_CONFIG,"all");
        // how many times to retry a failed request (0 = no retries)
        prop.put(ProducerConfig.RETRIES_CONFIG,"0");
        // key serializer
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
        // value serializer
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getTypeName());
        // instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        // create a record without a key
        ProducerRecord<String, String> record = new ProducerRecord<>("demo", msg);
        // send it to the topic
        producer.send(record);
        producer.close();
    }

    public static void main(String[] args) throws IOException {
        new Demo01_Producer().writeMsg("hello world");
    }
}
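
The send() above is fire-and-forget. To see whether a write actually succeeded, send() can also take a callback; below is a minimal sketch that reuses the producer and msg from the code above and the same topic "demo" (a drop-in replacement for the single send() line, not a separate program):

producer.send(new ProducerRecord<String, String>("demo", msg), (metadata, exception) -> {
    if (exception != null) {
        // the write failed
        exception.printStackTrace();
    } else {
        // partition and offset the record landed on
        System.out.println("sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
    }
});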


2. Reading from Kafka

Code

  • Read back the Kafka topic we just wrote to above:
package cn.wsj.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class Demo02_Consumer {
    public void readMsg(){
        // pass configuration in via Properties
        Properties prop = new Properties();
        // connect to the kafka cluster
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.237.160:9092");
        // set the consumer group id
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"curry");
        // key deserializer
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName());
        // value deserializer
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getTypeName());
        // enable automatic offset commits
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // auto-commit interval in milliseconds
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        // where to start reading when the group has no committed offset
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        // instantiate the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // pick the topic and partition to read from
        TopicPartition tp = new TopicPartition("demo", 0);
        // assign() expects a Collection, so wrap the TopicPartition in a list
        ArrayList<TopicPartition> list = new ArrayList<TopicPartition>();
        list.add(tp);
        // either assign() or subscribe() can be used to consume a topic (a subscribe() sketch follows the sample output below)
        consumer.assign(list);
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record->
                    System.out.println(record.topic()+":"+record.partition()+":"+record.key()+":"+record.value()));
        }
    }

    public static void main(String[] args) {
        new Demo02_Consumer().readMsg();
    }
}

Example result

  • The message was written to partition 0 of the topic, with a null key and the value hello world:
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" "...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
demo:0:null:hello world

Process finished with exit code -1
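
The consumer above uses assign() to read one specific partition directly. As the comment notes, subscribe() works as well and lets Kafka distribute the topic's partitions across the members of the group automatically. A minimal sketch of that variant, reusing the same prop (group id curry, deserializers, auto-commit) in place of the assign() block:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
// subscribe to the whole topic instead of assigning a single partition (Collections is java.util.Collections)
consumer.subscribe(Collections.singletonList("demo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record ->
            System.out.println(record.topic() + ":" + record.partition() + ":" + record.key() + ":" + record.value()));
}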

3. Writing to multiple Kafka partitions concurrently with multiple threads

Implementation plan

Let's walk through the approach:

1. Data source: use a local log file;
2. Writing to multiple partitions: start one thread per partition;
3. Determining the number of topic partitions: fetch the partition count through AdminClient and its KafkaFuture result;
4. Splitting the file contents: divide evenly, total lines / number of partitions, with any remainder going to the last partition;
5. Getting the total line count: read through the file once and call LineNumberReader.getLineNumber();
6. Writing to kafka: write line by line; to jump straight to the first line of the next partition's slice, use RandomAccessFile.seek(long pos).


Code

  • For the connection settings, consider defining an interface first, so that other configuration types created later (e.g. jdbc) can reuse it:
package org.wsj.common;

import java.util.Map;

public interface Dbinfo {
    // host/ip to connect to
    public String getIp();
    // port to connect to
    public int getPort();
    // name of the target object (database / topic)
    public String getDbName();
    // use this method to pass any additional parameters
    public Map<String,String> getOther();
}
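
As a hypothetical illustration of the reuse mentioned above (this class is not part of the original project), a jdbc flavour of the same interface could carry its extra parameters through getOther():

package org.wsj.common;

import java.util.HashMap;
import java.util.Map;

// hypothetical example only: a JDBC implementation of Dbinfo, to show how the interface can be reused
public class JdbcConfiguration implements Dbinfo {
    private String ip;
    private int port;
    private String dbname;
    private Map<String, String> other = new HashMap<>();

    public void setIp(String ip) { this.ip = ip; }
    public void setPort(int port) { this.port = port; }
    public void setDbname(String dbname) { this.dbname = dbname; }
    public void addOther(String key, String value) { other.put(key, value); }

    @Override public String getIp() { return ip; }
    @Override public int getPort() { return port; }
    @Override public String getDbName() { return dbname; }
    // e.g. user / password / driver class for a JDBC connection
    @Override public Map<String, String> getOther() { return other; }
}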


  • To connect to kafka, create a kafka configuration class that implements the interface:
package org.wsj.common;

import java.util.Map;

public class KafkaConfiguration implements Dbinfo {
    private String ip;
    private int port;
    private String dbname;

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setDbname(String dbname) {
        this.dbname = dbname;
    }

    @Override
    public String getIp() {
        return this.ip;
    }

    @Override
    public int getPort() {
        return this.port;
    }

    @Override
    public String getDbName() {
        return this.dbname;
    }

    @Override
    public Map<String, String> getOther() {
        return null;
    }
}


  • The main implementation class:
package org.wsj.common;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaConnector {
    private Dbinfo info;
    private int totalRow=0;
    private List<Long> rowSize = new ArrayList<>();
    Properties prop = new Properties();

    /**
     * info contains the ip, port and topic name
     * @param info
     */
    public KafkaConnector(Dbinfo info) {
        this.info=info;
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,info.getIp()+":"+info.getPort());
        prop.put(ProducerConfig.ACKS_CONFIG,"all");
        prop.put(ProducerConfig.RETRIES_CONFIG,"0");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
    }

    /**
     * Read the file at the given path and store each line's byte offset in a list
     * for later seeking; also record the total number of lines.
     * @param path
     * @throws FileNotFoundException
     */
    private void getFileInfo(String path) throws FileNotFoundException {
        LineNumberReader reader = new LineNumberReader(new FileReader(path));
        try {
            String str = null;
            int total = 0;
            while((str=reader.readLine())!=null){
                total+=str.getBytes().length+1;
                rowSize.add((long)total);
            }
            totalRow=reader.getLineNumber();
            rowSize.add(0,0L);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Get the number of partitions of the configured topic.
     * @return
     */
    private int getPartitionNum(){
        AdminClient client = AdminClient.create(prop);
        DescribeTopicsResult result = client.describeTopics(Arrays.asList(info.getDbName()));
        KafkaFuture<Map<String, TopicDescription>> kf = result.all();
        int num =0;
        try {
            num = kf.get().get(info.getDbName()).partitions().size();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return num;
    }

    /**
     * Look up the byte offset stored for the given line number.
     * The stored offsets do not account for every line-break byte, so one extra
     * byte per preceding line is added to get the real offset.
     * @param row
     * @return
     */
    private Long getPos(int row){
        return rowSize.get(row-1)+(row-1);
    }

    /**
     * For the given number of partitions, compute each partition's starting offset
     * in the file and the number of lines that partition should write.
     * @param partitionNum
     * @return
     */
    private Map<Long,Integer> calcPosAndRow(int partitionNum){
        HashMap<Long,Integer> result = new HashMap<>();
        int row = totalRow/partitionNum;
        for (int i = 0; i < partitionNum; i++) {
            if(i==(partitionNum-1)){
                result.put(getPos(row*i+1),row+totalRow%partitionNum);
            }else{
                result.put(getPos(row*i+1),row);
            }
        }
        return result;
    }

    /**
     * Load the file info for the given path and start one producer thread per partition
     * to write concurrently.
     * @param path
     * @throws FileNotFoundException
     */
    public void sendMessage(String path) throws FileNotFoundException {
        getFileInfo(path);
        int partitionNum = getPartitionNum();
        Map<Long, Integer> threadParams = calcPosAndRow(partitionNum);
        int thn=0;
        for (Long key : threadParams.keySet()) {
            new CustomerKafkaProducer(prop,path,key,threadParams.get(key),info.getDbName(),thn+"").start();
            thn++;
        }
    }
}
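
To make the splitting logic concrete: suppose the file has totalRow = 10 lines and the topic has 3 partitions. Then row = 10 / 3 = 3, and calcPosAndRow() produces three entries: the thread starting at line 1 writes 3 lines, the thread starting at line 4 writes 3 lines, and the last thread, starting at line 7, writes 3 + 10 % 3 = 4 lines. getPos() converts each starting line number into the byte offset collected by getFileInfo(), which is what RandomAccessFile.seek() later jumps to.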


  • Add a custom Partitioner that simply returns the record key as the partition number:
package org.wsj.common;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class SimplePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return Integer.parseInt(key.toString());
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
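
Because partition() simply parses the record key as a partition number, and each producer thread below sends its records with its own thread name ("0", "1", "2", ...) as the key, thread n writes only to partition n. This works because sendMessage() starts exactly one thread per partition, so the parsed key never exceeds the partition count.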


  • Since we are writing with multiple threads, create a Thread subclass and override its run() method:
package org.wsj.common;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class CustomerKafkaProducer extends Thread{
    private Properties prop;
    private String path;
    private Long pos;
    private int rows;
    private String topic;

    public CustomerKafkaProducer(Properties prop, String path, Long pos, int rows, String topic,String threadName) {
        this.prop = prop;
        this.path = path;
        this.pos = pos;
        this.rows = rows;
        this.topic = topic;
        this.setName(threadName);
    }

    @Override
    public void run() {
        prop.put("partitioner.class",SimplePartitioner.class.getTypeName());
        KafkaProducer producer = new KafkaProducer(prop);
        try {
            RandomAccessFile raf = new RandomAccessFile(new File(path),"r");
            raf.seek(pos);
            for (int i = 0; i <rows ; i++) {
                // re-decode the line to avoid garbled Chinese characters (readLine() decodes bytes as ISO-8859-1)
                String line = new String(raf.readLine().getBytes("iso-8859-1"), StandardCharsets.UTF_8);
                ProducerRecord record = new ProducerRecord(topic, Thread.currentThread().getName(), line);
                producer.send(record);
            }
            producer.close();
            raf.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
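
One detail worth noting: every thread receives the same Properties object built by KafkaConnector and puts partitioner.class into it inside run(). Since Properties is a synchronized Hashtable and all threads write the identical value, this is harmless here, but copying the properties per thread (for example with (Properties) prop.clone()) would be the safer pattern.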


  • Test
// create a topic named mydemo with three partitions
[root@sole ~]# kafka-topics.sh --create --topic mydemo  --zookeeper 192.168.237.160:2181 --replication-factor 1 --partitions 3

package org.wsj.common;

import java.io.IOException;

public class App {
    public static void main(String[] args) throws IOException {
        KafkaConfiguration kc = new KafkaConfiguration();
        kc.setIp("192.168.237.160");
        kc.setPort(9092);
        kc.setDbname("mydemo");
        new KafkaConnector(kc).sendMessage("e://logs//log_20200110.log");
    }
}

  • Check the topic's partition info in Kafka:

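One way to do this from the command line (assuming kafka-topics.sh is on the PATH and the same ZooKeeper address used when creating the topic) is --describe, which lists each partition's leader and replicas:

[root@sole ~]# kafka-topics.sh --describe --topic mydemo --zookeeper 192.168.237.160:2181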

Tips: in a real production environment this can be packaged as a jar and invoked periodically by a script, pulling the relevant offline data into kafka for downstream consumption.


PS: if anything here is wrong or poorly written, please leave your valuable comments or suggestions in the comment section! And if this post helped you, a like would be greatly appreciated. Thank you!

Original author: wsjslient

Author's homepage: https://blog.csdn.net/wsjslient

