因为flink版本迭代比较迅速,在我们进行代码的编写过程中容易出现版本不兼容的问题,为此本文是在flink版本为1.12.0的基础上完成开发的。

1:配置maven依赖(重要)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io-flink</groupId>
    <artifactId>flink_id</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.12.0</version>
        </dependency>

        <!--flink连接mysql-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!--flink连接kafka将得到的数据转化为json格式  -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.12.0</version>
        </dependency>



        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
</project>

注:这里面有一个比较坑的地方,我们在之前的版本配置flink连接kafka的时候会遇到许多坑最常见的就是kafka在flink1.12之后需要设置为:flink-connector-kafka_2.11

而在1.12之前的版本应该设置成:flink-connector-kafka-0.11_2.11(新版本已经弃用)

在运行代码的时候会发生如下错误:

 2:代码部分如下

package kafka2flink2mysql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

object flink_sql {
    private val log = LoggerFactory.getLogger(flink_sql.getClass.getSimpleName)

    def main(args: Array[String]): Unit = {

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
        import org.apache.flink.api.scala._
        val sql_info =
            """CREATE TABLE t_user (
              |  user_id STRING,
              |  name STRING,
              |  sex STRING,
              |  money STRING
              |)
              | WITH
              |(
              |'connector' = 'kafka',
              |'topic' = 'flink',
              |'properties.bootstrap.servers' = 'hadoop:9092',
              |'scan.startup.mode' = 'earliest-offset',
              |'properties.group.id' = 'group_1',
              |'format' = 'json'
              |)""".stripMargin
        tableEnv.executeSql(sql_info)
        //获得刚刚生成的表转化为table类
        val t_user: Table = tableEnv.from("t_user")
        //输出打印table的schema信息
        t_user.printSchema()
        //或者转化为流打印出来
        val stream: DataStream[(String, String, String, String)] = tableEnv.toAppendStream[(String,String,String,String)](t_user)
        stream.print()

        //定义mysql输出表
        val sql_info2 =
            """CREATE TABLE mysql_user (
              |  user_id STRING,
              |  name STRING,
              |  sex STRING,
              |  money STRING
              |)
              | WITH
              |(
              |'connector' = 'jdbc',
              |'url' = 'jdbc:mysql://localhost:3306/one?serverTimezone=Asia/Shanghai&zeroDaeTimeBehavior=convertToNull&useSSL=true',
              |'driver' = 'com.mysql.jdbc.Driver',
              |'username' = 'root',
              |'password' = 'root',
              |'table-name' = 't_user',
              |'lookup.cache.max-rows' = '100',
              |'lookup.cache.ttl' = '60000'
              |)""".stripMargin
        tableEnv.executeSql(sql_info2)


        var insert =
            """
              |insert into mysql_user
              |select * from t_user
              |""".stripMargin
        tableEnv.executeSql(insert)
        env.execute("flink_running")
    }
}

 切记kafka的配置参数一定要按照提示写入,不然会报错

kafka端数据传输格式如下:
{"user_id":"1001","name":"zhangsan","sex":"nv","money":"499"}

java代码实现如下

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class datastreaming_demo {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(datastreaming_demo.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, build);
        String sql =
                "CREATE TABLE t_user (\n" +
                "user_id STRING,\n" +
                "name STRING,\n" +
                "sex STRING,\n" +
                "money STRING\n" +
                ")\n" +
                "WITH\n" +
                "(\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'flink_streaming',\n" +
                "'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "'scan.startup.mode' = 'latest-offset',\n" +
                "'properties.group.id' = 'group_1',\n" +
                "'format' = 'json')\n";
        tableEnv.executeSql(sql);
        Table t_user = tableEnv.from("t_user");
        t_user.printSchema();
//        DataStream<pojo> pojoDataStream = tableEnv.toAppendStream(t_user,pojo.class);
//        pojoDataStream.print();
        String sql2 =
                "CREATE TABLE mysql_user (\n" +
                "user_id STRING,\n" +
                "name STRING,\n" +
                "sex STRING,\n" +
                "money STRING,\n" +
                "PRIMARY KEY (user_id) NOT ENFORCED\n"+
                ")\n" +
                " WITH\n" +
                "(\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/POC_DB?serverTimezone=Asia/Shanghai&zeroDaeTimeBehavior=convertToNull&useSSL=false',\n" +
                "'driver' = 'com.mysql.jdbc.Driver',\n" +
                "'username' = 'root',\n" +
                "'password' = '5675219999',\n" +
                "'table-name' = 'mysql_user',\n" +
                "'lookup.cache.max-rows' = '100',\n"+
                "'lookup.cache.ttl' = '60000'\n"+
                ")";
        tableEnv.executeSql(sql2);
        Table mysql_user = tableEnv.from("mysql_user");
        mysql_user.printSchema();
        String insert = "insert into mysql_user select * from t_user ";
        tableEnv.executeSql(insert);
        try {
            env.execute("flink_running");
        } catch (Exception e) {
            log.info("抛出异常!");
            System.out.println(e.getMessage());
        }
}
}

pojo类

// 一定要包含空构造函数才行
public class pojo {
    public String user_id;
    public String name;
    public String sex;
    public String money;

    public pojo(String user_id, String name, String sex, String money) {
        this.user_id = user_id;
        this.name = name;
        this.sex = sex;
        this.money = money;
    }
    public pojo() {

    }
    public String getUser_id() {
        return user_id;
    }

    public String getName() {
        return name;
    }

    public String getSex() {
        return sex;
    }

    public String getMoney() {
        return money;
    }

    public void setUser_id(String user_id) {
        this.user_id = user_id;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public void setMoney(String money) {
        this.money = money;
    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐