学习笔记:Flink 读取和写入MySQL数据
实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用 Flink 自定义数据源从MySQL
·
Flink 读取和写入 MySQL 数据有两种方式:
- 直接使用 JDBCInputFormat 和 JDBCOutputFormat
- 继承 RichSourceFunction 和 RichSinkFunction
引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
读取 MySQL 数据
直接使用 JDBCInputFormat
package api.source.Source6_MySQL;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
public class MySQL1_InputFormat {
public static void main(String[] args) throws Exception {
// ToDo 0.env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataSet<Row> dataInput = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
// 配置数据库连接信息
.setDrivername("com.mysql.jdbc.Driver") // JDBC驱动名
// com.mysql.jdbc.Driver:是 mysql-connector-java 5 中的
// com.mysql.cj.jdbc.Driver:是 mysql-connector-java 6及以上 中的
.setDBUrl("jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false") // 数据库URL
// jdbc:mysql://主机名:端口号/数据库名
// serverTimezone=GMT%2B8:指定时区,设置为北京时间东八区
// useSSL=false:MySQL 8.0以上版本 不需要建立SSL连接,需要显示关闭
.setUsername("root") // 用户名
.setPassword("123456") // 登录密码
.setQuery("select id,name,age from t_student") // 需要执行的SQL语句
.setRowTypeInfo(new RowTypeInfo( // 设置查询的列的类型
BasicTypeInfo.INT_TYPE_INFO, // id:Int类型
BasicTypeInfo.STRING_TYPE_INFO, // name:String类型
BasicTypeInfo.INT_TYPE_INFO)) // age:Int类型
.finish());
// ToDo 2.transformation
DataSet<Student> dataMap = dataInput.map(new MapFunction<Row, Student>() {
@Override
public Student map(Row row) throws Exception { // 转换为Student类型
return new Student(
(int) row.getField(0),
(String) row.getField(1),
(int) row.getField(2));
}
});
// ToDo 3.sink
dataMap.print();
// ToDo 4.execute
env.execute();
}
@Data // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
@AllArgsConstructor // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
@NoArgsConstructor // 注解在类上,为类提供无参构造函数
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
继承 RichSourceFunction
Flink 提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能。
数据源接口 | 功能 | 并行度 |
---|---|---|
SourceFunction | 非并行数据源 | 并行度只能=1 |
RichSourceFunction | 多功能非并行数据源 | 并行度只能=1 |
ParallelSourceFunction | 并行数据源 | 并行度能够>=1 |
RichParallelSourceFunction | 多功能并行数据源 | 并行度能够>=1 |
package api.source.Source6_MySQL;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.mysql.jdbc.Connection;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.DriverManager;
import java.sql.ResultSet;
public class MySQL2_SourceFunction {
public static void main(String[] args) throws Exception {
// ToDo 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataStream<Student> dataInput = env.addSource(new MySQLSource());
// ToDo 2.transformation
// ToDo 3.sink
dataInput.print();
// ToDo 4.execute
env.execute();
}
@Data // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
@AllArgsConstructor // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
@NoArgsConstructor // 注解在类上,为类提供无参构造函数
public static class Student {
private Integer id;
private String name;
private Integer age;
}
public static class MySQLSource extends RichSourceFunction<Student> {
private volatile boolean isRunning = true;
private Connection conn = null;
private PreparedStatement ps = null;
// 打开数据库连接,只执行一次,之后一直使用这个连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.jdbc.Driver"); // 加载数据库驱动
conn = (Connection) DriverManager.getConnection( // 获取连接
"jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false", // 数据库URL
"root", // 用户名
"123456"); // 登录密码
ps = (PreparedStatement) conn.prepareStatement( // 获取执行语句
"select id,name,age from t_student"); // 需要执行的SQL语句
}
// 执行查询并获取结果
@Override
public void run(SourceContext<Student> ctx) throws Exception {
while(isRunning) { // 使用while循环可以不断读取数据
ResultSet resultSet = ps.executeQuery();
while(resultSet.next()) {
int id = resultSet.getInt("id");
String name = resultSet.getString("name");
int age = resultSet.getInt("age");
ctx.collect(new Student(id,name,age)); // 以流的形式发送结果
}
Thread.sleep(5000); // 每隔5秒查询一次
}
}
// 取消数据生成
@Override
public void cancel() {
isRunning = false;
}
// 关闭数据库连接
@Override
public void close() throws Exception {
super.close();
if(conn != null) conn.close();
if(ps != null) ps.close();
}
}
}
写入 MySQL 数据
直接使用 JDBCOutputFormat
package api.sink.Sink4_MySQL;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;
import java.sql.Types;
import java.util.Arrays;
public class MySQL1_OutputFormat {
public static void main(String[] args) throws Exception {
// ToDo 0.env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataSet<Student> dataSource = env.fromCollection(
Arrays.asList(
new Student(5, "Nancy", 22),
new Student(6, "Emma", 20),
new Student(7, "Sophia", 19)
)
);
// ToDo 2.transformation
DataSet<Row> dataMap = dataSource.map(new MapFunction<Student, Row>() {
@Override
public Row map(Student student) throws Exception { // 转换为Row类型
Row row = new Row(3); // 有3个参数
row.setField(0, student.getId());
row.setField(1, student.getName());
row.setField(2, student.getAge());
return row;
}
});
// ToDo 3.sink
dataMap.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver") // JDBC驱动名
.setDBUrl("jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false") // 数据库URL
.setUsername("root") // 用户名
.setPassword("123456") // 登录密码
.setQuery("insert into t_student(id,name,age) values(?,?,?)") // 需要执行的SQL语句
// 如果数据不存在则插入,数据存在则更新,可以将SQL语句替换为以下:
// insert into t_student(id,name,age) values(?,?,?) on duplicate key update name=values(name),age=values(age)
.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.INTEGER}) // 设置查询的列的类型
.finish());
// ToDo 4.execute
env.execute();
System.out.println("MySQL写入成功!");
}
@Data // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
@AllArgsConstructor // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
@NoArgsConstructor // 注解在类上,为类提供无参构造函数
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
继承 RichSinkFunction
package api.sink.Sink4_MySQL;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
public class MySQL2_SinkFunction {
public static void main(String[] args) throws Exception {
// ToDo 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataStreamSource<Student> dataInput = env.fromCollection(
Arrays.asList(
new Student(3, "Linda", 20),
new Student(6, "Allen", 19),
new Student(8, "Marcia", 20),
new Student(9, "Helen", 18)
)
);
// ToDo 2.transformation
// ToDo 3.sink
dataInput.addSink(new MySQLSink());
// ToDo 4.execute
env.execute();
System.out.println("MySQL写入成功!");
}
@Data // 注解在类上,为类提供读写属性,还提供equals()、hashCode()、toString()方法
@AllArgsConstructor // 注解在类上,为类提供全参构造函数,参数的顺序与属性定义的顺序一致
@NoArgsConstructor // 注解在类上,为类提供无参构造函数
public static class Student {
private Integer id;
private String name;
private Integer age;
}
public static class MySQLSink extends RichSinkFunction<Student> {
private Connection conn = null;
private PreparedStatement insertStmt = null;
private PreparedStatement updateStmt = null;
// 打开数据库连接,只执行一次,之后一直使用这个连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.jdbc.Driver"); // 加载数据库驱动
conn = DriverManager.getConnection( // 获取连接
"jdbc:mysql://localhost:3306/demodb?serverTimezone=GMT%2B8&useSSL=false", // 数据库URL
"root", // 用户名
"123456"); // 登录密码
insertStmt = conn.prepareStatement( // 获取执行语句
"insert into t_student(id,name,age) values (?,?,?)"); // 插入数据
updateStmt = conn.prepareStatement( // 获取执行语句
"update t_student set name=?,age=? where id=?"); // 更新数据
}
// 执行插入和更新
@Override
public void invoke(Student value, Context ctx) throws Exception {
// 每条数据到来后,直接执行更新语句
updateStmt.setString(1, value.getName()); // 与占位符(?)对应的参数
updateStmt.setInt(2, value.getAge());
updateStmt.setInt(3, value.getId());
updateStmt.execute(); // 执行更新语句
// 如果更新数为0,则执行插入语句
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setInt(1, value.getId());
insertStmt.setString(2, value.getName());
insertStmt.setInt(3, value.getAge());
insertStmt.execute(); // 执行插入语句
}
}
// 关闭数据库连接
@Override
public void close() throws Exception {
super.close();
if(conn != null) conn.close();
if(insertStmt != null) insertStmt.close();
if(updateStmt != null) updateStmt.close();
}
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)