Flink SQL: reading from Kafka and writing to MySQL in real time
Because Flink iterates quickly, it is easy to run into version-incompatibility problems while writing code, so everything in this post was developed against Flink 1.12.0.
1: Configure the Maven dependencies (important)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>io-flink</groupId>
    <artifactId>flink_id</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- JDBC connector for writing to MySQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- JSON format for deserializing the data read from Kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
</project>
Note: there is one major pitfall here. Configuring the Kafka connector used to cause a lot of trouble in earlier versions. The most common issue is that from Flink 1.12 onward the dependency must be the universal artifact flink-connector-kafka_2.11,
whereas versions before 1.12 used version-specific artifacts such as flink-connector-kafka-0.11_2.11 (deprecated and removed in newer releases).
If you use the old artifact on 1.12, running the code fails with an error at startup.
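For reference, the old version-specific dependency looked roughly like this (the 1.11.0 version number is illustrative; do not use this artifact with Flink 1.12+):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.11.0</version>
</dependency>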
2: The code is as follows (Scala implementation)
package kafka2flink2mysql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory

object flink_sql {
  private val log = LoggerFactory.getLogger(flink_sql.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
    import org.apache.flink.api.scala._

    // Kafka source table
    val sql_info =
      """CREATE TABLE t_user (
        |  user_id STRING,
        |  name STRING,
        |  sex STRING,
        |  money STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'flink',
        |  'properties.bootstrap.servers' = 'hadoop:9092',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'properties.group.id' = 'group_1',
        |  'format' = 'json'
        |)""".stripMargin
    tableEnv.executeSql(sql_info)

    // Look up the table we just registered as a Table object
    val t_user: Table = tableEnv.from("t_user")
    // Print the table's schema
    t_user.printSchema()
    // Or convert it to a DataStream and print the rows
    val stream: DataStream[(String, String, String, String)] =
      tableEnv.toAppendStream[(String, String, String, String)](t_user)
    stream.print()

    // MySQL sink table
    val sql_info2 =
      """CREATE TABLE mysql_user (
        |  user_id STRING,
        |  name STRING,
        |  sex STRING,
        |  money STRING
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/one?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&useSSL=true',
        |  'driver' = 'com.mysql.jdbc.Driver',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'table-name' = 't_user',
        |  'lookup.cache.max-rows' = '100',
        |  'lookup.cache.ttl' = '60000'
        |)""".stripMargin
    tableEnv.executeSql(sql_info2)

    val insert =
      """
        |INSERT INTO mysql_user
        |SELECT * FROM t_user
        |""".stripMargin
    tableEnv.executeSql(insert)

    // execute() is needed here because stream.print() added a DataStream operator
    env.execute("flink_running")
  }
}
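Note that the JDBC sink writes into an existing MySQL table and will not create it for you. A minimal sketch of the target table DDL, assuming VARCHAR columns that mirror the STRING fields of the Flink schema (lengths are illustrative, adjust as needed):

CREATE TABLE t_user (
  user_id VARCHAR(64),
  name    VARCHAR(64),
  sex     VARCHAR(16),
  money   VARCHAR(32)
);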
Remember: the Kafka connector options must be written exactly as shown, otherwise an error will be thrown.
The messages sent to Kafka have the following format:
{"user_id":"1001","name":"zhangsan","sex":"nv","money":"499"}
The Java implementation is as follows:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class datastreaming_demo {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(datastreaming_demo.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, build);

        // Kafka source table
        String sql =
                "CREATE TABLE t_user (\n" +
                "  user_id STRING,\n" +
                "  name STRING,\n" +
                "  sex STRING,\n" +
                "  money STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'flink_streaming',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'properties.group.id' = 'group_1',\n" +
                "  'format' = 'json')";
        tableEnv.executeSql(sql);
        Table t_user = tableEnv.from("t_user");
        t_user.printSchema();
        // Optionally convert to a DataStream of POJOs and print:
        // DataStream<pojo> pojoDataStream = tableEnv.toAppendStream(t_user, pojo.class);
        // pojoDataStream.print();

        // MySQL sink table
        String sql2 =
                "CREATE TABLE mysql_user (\n" +
                "  user_id STRING,\n" +
                "  name STRING,\n" +
                "  sex STRING,\n" +
                "  money STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/POC_DB?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&useSSL=false',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '5675219999',\n" +
                "  'table-name' = 'mysql_user',\n" +
                "  'lookup.cache.max-rows' = '100',\n" +
                "  'lookup.cache.ttl' = '60000'\n" +
                ")";
        tableEnv.executeSql(sql2);
        Table mysql_user = tableEnv.from("mysql_user");
        mysql_user.printSchema();

        String insert = "INSERT INTO mysql_user SELECT * FROM t_user";
        try {
            // executeSql already submits the INSERT job, so env.execute() is not
            // needed here (this pipeline defines no DataStream operators; calling
            // env.execute() would fail with "No operators defined"). await() keeps
            // the local JVM alive while the streaming job runs.
            tableEnv.executeSql(insert).await();
        } catch (Exception e) {
            log.info("Exception thrown!");
            System.out.println(e.getMessage());
        }
    }
}
The pojo class:
// A no-argument constructor is required
public class pojo {
    public String user_id;
    public String name;
    public String sex;
    public String money;

    public pojo(String user_id, String name, String sex, String money) {
        this.user_id = user_id;
        this.name = name;
        this.sex = sex;
        this.money = money;
    }

    public pojo() {
    }

    public String getUser_id() {
        return user_id;
    }

    public String getName() {
        return name;
    }

    public String getSex() {
        return sex;
    }

    public String getMoney() {
        return money;
    }

    public void setUser_id(String user_id) {
        this.user_id = user_id;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public void setMoney(String money) {
        this.money = money;
    }
}
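With the job running, you can check the sink directly in MySQL (database and table names as in the Java DDL above):

SELECT * FROM POC_DB.mysql_user;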