前言

本文介绍的是如何将MySQL的数据导入到Kafka中。其实方法有很多中,可以使用kafka的插件,或者Canal采集MySQL的binlog再导入到Kafka中。这里介绍的方式是用简单的代码将MySQL导出JSON格式的文件,再使用kafka自带的命令导入。使用的Kafka版本为V2.3.0。


一、将MySQL数据导出Json格式文件

1.Maven引用

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.17</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version>
</dependency>

2.导出Json数据

import com.google.gson.Gson;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;

public class MysqlToJSON {
    public static void main(String[] args) throws Exception {
    
        String driver = "com.mysql.cj.jdbc.Driver";
        String url = "jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC";
        String user = "username";
        String pwd = "passwd";

        try {
            Class.forName(driver);
            Connection con = DriverManager.getConnection(url, user, pwd);
            Statement stet = con.createStatement();
            String sql = "select * from `test-json`";
            ResultSet rs = stet.executeQuery(sql);
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            int count = 1;
            while (rs.next()) {
                Gson gson = new Gson();
                Map<String, String> map = new HashMap<String, String>();
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnLabel(i);
                    String value = rs.getString(columnName);
                    map.put(columnName, value);
                }
                String result = gson.toJson(map);
                System.out.println(count++);
                write(result);
            }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

    }

	/**
     * 写入文件
     * @param content 写入内容
     */
    public static void write(String content) {
        FileWriter fw = null;
        try {
            //如果文件存在,则追加内容;如果文件不存在,则创建文件
            File f = new File("./topic-test-json.txt");
            fw = new FileWriter(f, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        PrintWriter pw = new PrintWriter(fw);
        pw.println(content);
        pw.flush();
        try {
            fw.flush();
            pw.close();
            fw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.运行代码导出数据

运行完成后,会将文件输出到代码目录下,文件中每一行是一个Json对象。

(1)数据库的结构和数据如下图:

在这里插入图片描述

(2)生成的文件内容如下:

在这里插入图片描述

二、使用Kafka命令将文件导入到topic中

1.创建topic

代码如下(示例):

bin/kafka-topics.sh  --create --topic json-test1 \
--zookeeper 172.11.11.11:2181,172.11.11.12:2181,172.11.11.13:2181 \
--partitions 3 --replication-factor 1

2.导入数据

代码如下(示例):

bin/kafka-console-producer.sh --broker-list 172.11.11.11:9092 \
--topic json-test1 < /usr/local/topic-test-json.txt

3.消费数据进行验证

bin/kafka-console-consumer.sh --bootstrap-server 172.11.11.11:9092 --topic json-test1 --from-beginning

总结

以上是MySQL数据导入到Kafka的过程,这种场景大多数是用来测试使用,真正的线上环境很少使用,如有描述不当,烦请指正。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐