Flink 读取和写入 MySQL 数据有两种方式:

  • 直接使用 JDBCInputFormat 和 JDBCOutputFormat
  • 继承 RichSourceFunction 和 RichSinkFunction

引入依赖:

		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

读取 MySQL 数据

在这里插入图片描述


直接使用 JDBCInputFormat
package api.source.Source6_MySQL;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class MySQL1_InputFormat {

    public static void main(String[] args) throws Exception {

        // ToDo 0.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // ToDo 1.source
        DataSet<Row> dataInput = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                // 配置数据库连接信息
                .setDrivername("com.mysql.jdbc.Driver")  // JDBC驱动名
                    // com.mysql.jdbc.Driver:是 mysql-connector-java 5 中的
                    // com.mysql.cj.jdbc.Driver:是 mysql-connector-java 6及以上 中的
                .setDBUrl("jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false")  // 数据库URL
                    // jdbc:mysql://主机名:端口号/数据库名
                    // serverTimezone=GMT%2B8:指定时区,设置为北京时间东八区
                    // useSSL=false:MySQL 8.0以上版本 不需要建立SSL连接,需要显示关闭
                .setUsername("root")  // 用户名
                .setPassword("123456")  // 登录密码
                .setQuery("select id,name,age from t_student")  // 需要执行的SQL语句
                .setRowTypeInfo(new RowTypeInfo(  // 设置查询的列的类型
                        BasicTypeInfo.INT_TYPE_INFO,  // id:Int类型
                        BasicTypeInfo.STRING_TYPE_INFO,  // name:String类型
                        BasicTypeInfo.INT_TYPE_INFO))  // age:Int类型
                .finish());

        // ToDo 2.transformation
        DataSet<Student> dataMap = dataInput.map(new MapFunction<Row, Student>() {
            @Override
            public Student map(Row row) throws Exception {  // 转换为Student类型
                return new Student(
                        (int) row.getField(0),
                        (String) row.getField(1),
                        (int) row.getField(2));
            }
        });

        // ToDo 3.sink
        dataMap.print();

        // ToDo 4.execute
        env.execute();
    }

    @Data  // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
    @AllArgsConstructor  // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
    @NoArgsConstructor  // 注解在类上,为类提供无参构造函数
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

在这里插入图片描述


继承 RichSourceFunction

Flink 提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能。

数据源接口功能并行度
SourceFunction非并行数据源并行度只能=1
RichSourceFunction多功能非并行数据源并行度只能=1
ParallelSourceFunction并行数据源并行度能够>=1
RichParallelSourceFunction多功能并行数据源并行度能够>=1
package api.source.Source6_MySQL;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.mysql.jdbc.Connection;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.DriverManager;
import java.sql.ResultSet;

public class MySQL2_SourceFunction {

    public static void main(String[] args) throws Exception {

        // ToDo 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ToDo 1.source
        DataStream<Student> dataInput = env.addSource(new MySQLSource());

        // ToDo 2.transformation

        // ToDo 3.sink
        dataInput.print();

        // ToDo 4.execute
        env.execute();
    }

    @Data  // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
    @AllArgsConstructor  // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
    @NoArgsConstructor  // 注解在类上,为类提供无参构造函数
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichSourceFunction<Student> {

        private volatile boolean isRunning = true;
        private Connection conn = null;
        private PreparedStatement ps = null;

        // 打开数据库连接,只执行一次,之后一直使用这个连接
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");  // 加载数据库驱动
            conn = (Connection) DriverManager.getConnection(  // 获取连接
                    "jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false",  // 数据库URL
                    "root",  // 用户名
                    "123456");  // 登录密码
            ps = (PreparedStatement) conn.prepareStatement(  // 获取执行语句
                    "select id,name,age from t_student");  // 需要执行的SQL语句
        }

        // 执行查询并获取结果
        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while(isRunning) {  // 使用while循环可以不断读取数据
                ResultSet resultSet = ps.executeQuery();
                while(resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String name = resultSet.getString("name");
                    int age = resultSet.getInt("age");
                    ctx.collect(new Student(id,name,age));  // 以流的形式发送结果
                }
                Thread.sleep(5000);  // 每隔5秒查询一次
            }
        }

        // 取消数据生成
        @Override
        public void cancel() {
            isRunning = false;
        }

        // 关闭数据库连接
        @Override
        public void close() throws Exception {
            super.close();
            if(conn != null) conn.close();
            if(ps != null) ps.close();
        }
    }
}

在这里插入图片描述


写入 MySQL 数据


直接使用 JDBCOutputFormat
package api.sink.Sink4_MySQL;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Arrays;

public class MySQL1_OutputFormat {

    public static void main(String[] args) throws Exception {

        // ToDo 0.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // ToDo 1.source
        DataSet<Student> dataSource = env.fromCollection(
                Arrays.asList(
                        new Student(5, "Nancy", 22),
                        new Student(6, "Emma", 20),
                        new Student(7, "Sophia", 19)
                )
        );

        // ToDo 2.transformation
        DataSet<Row> dataMap = dataSource.map(new MapFunction<Student, Row>() {
            @Override
            public Row map(Student student) throws Exception {  // 转换为Row类型
                Row row = new Row(3);  // 有3个参数
                row.setField(0, student.getId());
                row.setField(1, student.getName());
                row.setField(2, student.getAge());
                return row;
            }
        });

        // ToDo 3.sink
        dataMap.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")  // JDBC驱动名
                .setDBUrl("jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false")  // 数据库URL
                .setUsername("root")  // 用户名
                .setPassword("123456")  // 登录密码
                .setQuery("insert into t_student(id,name,age) values(?,?,?)")  // 需要执行的SQL语句
                    // 如果数据不存在则插入,数据存在则更新,可以将SQL语句替换为以下:
                    // insert into t_student(id,name,age) values(?,?,?) on duplicate key update name=values(name),age=values(age)
                .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.INTEGER})  // 设置查询的列的类型
                .finish());

        // ToDo 4.execute
        env.execute();
        System.out.println("MySQL写入成功!");
    }

    @Data  // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
    @AllArgsConstructor  // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
    @NoArgsConstructor  // 注解在类上,为类提供无参构造函数
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

在这里插入图片描述


继承 RichSinkFunction
package api.sink.Sink4_MySQL;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;

public class MySQL2_SinkFunction {

    public static void main(String[] args) throws Exception {

        // ToDo 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ToDo 1.source
        DataStreamSource<Student> dataInput = env.fromCollection(
                Arrays.asList(
                        new Student(3, "Linda", 20),
                        new Student(6, "Allen", 19),
                        new Student(8, "Marcia", 20),
                        new Student(9, "Helen", 18)
                )
        );

        // ToDo 2.transformation

        // ToDo 3.sink
        dataInput.addSink(new MySQLSink());

        // ToDo 4.execute
        env.execute();
        System.out.println("MySQL写入成功!");
    }

    @Data  // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
    @AllArgsConstructor  // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
    @NoArgsConstructor  // 注解在类上,为类提供无参构造函数
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSink extends RichSinkFunction<Student> {

        private Connection conn = null;
        private PreparedStatement insertStmt = null;
        private PreparedStatement updateStmt = null;

        // 打开数据库连接,只执行一次,之后一直使用这个连接
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");  // 加载数据库驱动
            conn = DriverManager.getConnection(  // 获取连接
                    "jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false",  // 数据库URL
                    "root",  // 用户名
                    "123456");  // 登录密码
            insertStmt = conn.prepareStatement(  // 获取执行语句
                    "insert into t_student(id,name,age) values (?,?,?)");  // 插入数据
            updateStmt = conn.prepareStatement(  // 获取执行语句
                    "update t_student set name=?,age=? where id=?");  // 更新数据
        }

        // 执行插入和更新
        @Override
        public void invoke(Student value, Context ctx) throws Exception {
            // 每条数据到来后,直接执行更新语句
            updateStmt.setString(1, value.getName());  // 与占位符(?)对应的参数
            updateStmt.setInt(2, value.getAge());
            updateStmt.setInt(3, value.getId());
            updateStmt.execute();  // 执行更新语句

            // 如果更新数为0,则执行插入语句
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setInt(1, value.getId());
                insertStmt.setString(2, value.getName());
                insertStmt.setInt(3, value.getAge());
                insertStmt.execute();  // 执行插入语句
            }
        }

        // 关闭数据库连接
        @Override
        public void close() throws Exception {
            super.close();
            if(conn != null) conn.close();
            if(insertStmt != null) insertStmt.close();
            if(updateStmt != null) updateStmt.close();
        }
    }
}

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐