背景

在FlinkSql client下尝试将 kafka中映射的虚拟表ods_base_province 导入到mysql表base_province时,抛了如下错误:

Flink SQL>  

INSERT INTO base_province
SELECT *
FROM ods_base_province;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
print
upsert-kafka

Shutting down the session...
done.
[bigdata_admin@dn5 flink_sql_rtdw-demo]$

分析

在Flink运行时上下文中可能:

  • 缺少flink与jdbc的连接适配器
  • 缺少 mysql 的 jdbc 驱动包

请检查${FLINK_HOME}/lib下是否包含如下名称的jar:

  • flink-connector-jdbc_2.x-1.y.z.jar
  • mysql-connector-java-5.1.38.jar (仅举例)

修复

手工添加上述依赖包,记得把hive、kafka的适配器连接包,也一块导进来:

cd ${FLINK_HOME}/lib

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.6/flink-connector-jdbc_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.13.6/flink-connector-hive_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.6/flink-sql-connector-kafka_2.11-1.13.6.jar

下载后的jar包:

[bigdata_admin@dn5 lib]$ ll
total 210872
-rw-rw-r-- 1 bigdata_admin bigdata_admin   7758727 Feb  4 17:48 flink-connector-hive_2.11-1.13.6.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin    249570 Feb  4 17:48 flink-connector-jdbc_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     92314 Feb  4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb  4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    148127 Feb  4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   7709740 May  7  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin   3674190 Feb  4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  36455408 Feb  4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  41077430 Feb  4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    301872 Jan  7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   1790452 Jan  7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     24279 Jan  7 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin    983911 Dec  2  2015 mysql-connector-java-5.1.38.jar

引申

这里使用的FlinkSql client方式来操作source端kafka中的数据,落地至sink端的mysql中,在使用TableEnvironment scala编程时,请将
驱动包添加到 pom.xml中, 同时在相关依赖中的参数置为 provided,如下所示, 以防止与服务器上的jar发生jar冲突。

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>

Maven依赖包参数取值范围:

1.test范围是指测试范围有效,在编译和打包时都不会使用这个依赖
2.compile范围是指编译范围内有效,在编译和打包时都会将依赖存储进去
3.provided依赖,在编译和测试过程中有效,最后生成的war包时不会加入 例如:
   servlet-api,因为servlet-api  tomcat服务器已经存在了,如果再打包会冲突
4.runtime在运行时候依赖,在编译时候不依赖

默认依赖范围是compile
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐