1、启动一个flink的 集群
  • 可以使用flink独立集群
  • 也可以使用yarn-session.sh
# 启动一个flinkyarn-sesion集群
yarn-sesion.sh -d
2、启动sql-client
sql-client.sh

3、测试命令行
-- 创建source表
CREATE TABLE datagen (
 id STRING,
 name STRING,
 age INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '5', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
)
-- 执行sql
-- 可以直接在命令行中查看直接结果
-- 会实时打印结果
select age,count(1) as c from datagen  group by age;

-- 输出结果模式
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';


-- 在flinksql中执行insert into
CREATE TABLE age_num_mysql (
  age INT,
  num BIGINT,
  PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'age_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
)

-- 在数据库中创建表
CREATE TABLE `age_num` (
  `age` int NOT NULL,
  `num` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- 插入数据
-- insert into 的语句会提交到flink的集群中运行,和本地客户端就没有关系了
insert into age_num_mysql
select age,count(1) as num from datagen  group by age;

执行:select age,count(1) as c from datagen group by age;

结果如下

执行:insert into age_num_mysql
select age,count(1) as num from datagen group by age;

效果如下

4、sql-client.sh -i
-- 可以将通用的sql放在一个初始的sql文件中
--文件中可以写多个sql,
vim sql-client.sql

CREATE CATALOG mysql_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'bigdata17',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://master:3306'
);

use catalog mysql_catalog;

-- 启动sql-client 
sql-client.sh -i sql-client.sql

5、sql-client.sh -f

可以直接执行一个sql文件

-- 创建一个sql文件
vim age_num.sql
-- source 表
CREATE TABLE datagen (
 id STRING,
 name STRING,
 age INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '5', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
);

-- 多个sql使用分号分隔
-- sink表
CREATE TABLE age_num_mysql (
  age INT,
  num BIGINT,
  PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'age_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);

-- 插入数据
insert into age_num_mysql
select age,count(1) as num from datagen  group by age;

-- 启动
sql-client.sh -f age_num.sql

-- 效果和刚才的分步骤操作差不多,只不过这次是通过一个sql脚本文件一起操作

5、Catalog

catalog ---> database ---> table ---> 字段

catalog是flink用于保存元数据的一种机制

元数据(表结构)

1、GenericInMemoryCatalog(默认)

基于内存的catalog,元数据只在当前会话中起作用

也是flink默认的catalog

2、Jdbc Catalog 整库同步

只能在flink中直接读写数据库中的表

不能在JdbcCatalog中创建flink的表

-- 创建jdbc catalog
CREATE CATALOG mysql_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'bigdata',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://master:3306'
);

-- 查看当前所有的catalog;
show catalogs;

-- 切换catalog
use catalog mysql_catalog;
3、hive catalog(重点)

hive catalog 可以用于flink读取hvie中的表,可以用于在hive元数据中保存flink的

  • 配置Hadoop classpath

    vim /etc/profile
    # 放在PAth的后面
    export HADOOP_CLASSPATH=`hadoop classpath`
    source /etc/profile
  • 将flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar依赖包上传到flink的lib目录下

    flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar
  • 将mysql驱动放在flink lib目录下

    mysql-connector-java-5.1.49.jar
  • 删除flink lib目录下flink-table-planner-loader-1.15.0.jar包

    flink-table-planner-loader-1.15.0.jar
  • 将flink opt目录flink-table-planner_2.12-1.15.0.jar包复制到flink的 lib目录下

    cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/

  • 启动hive的元数据服务

    nohup  hive --service metastore >> metastore.log 2>&1 &
  • 重启yarn-session集群

    # 查看yarn中正在运行的任务
    yarn application -list
    # 关闭yarn-session
    yarn application -kill application_1659099426082_0003
    # 启动yarn-session
    yarn-session.sh -d
  • 在sql-client中创建hive 的catalog

-- 进入sql客户端
sql-client.sh 

-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'bigdata17',
    'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;
  • 在flink中可以直接查询hive的表

    select * from students3;

将 hadoop-mapreduce-client-core-2.7.6.jar 放到 flink/lib 的目录下,然后kill掉已经启动的yarn任务,重新生成任务,再次执行结果如下:

  • 在flink中创建表,表的元数据可以保存到hive的元数据中

    flink将表的元数据保存在hive的元数据中,在hive中可以看到flink的表,但是不能对flink的表进行查询

    flink的元数据保存在hive中,元数据不会丢失

    CREATE TABLE datagen (
     id STRING,
     name STRING,
     age INT
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1', -- 每秒生成的数据行数据
     'fields.id.length' = '5', --字段长度限制
     'fields.name.length'='3',
     'fields.age.min' ='1', -- 最小值
     'fields.age.max'='100' -- 最大值
    );

4、使用hive的方言

spark sql默认就监控hive语法的

-- 指定hive方言
-- 开启hive的方言之后就不能使用flink自己的语法了
set table.sql-dialect=hive; 
--默认方言
set table.sql-dialect=default;
5、使用hive的函数
-- 加载hive模块,在fink中就可以直接使用hive的函数了
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- 查看所有的模块
SHOW MODULES;

flink 中不支持hive中的很多语法比如split

加载hive模块后,就能使用了如下:

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐