1. 启动SQL Client

先启动Flink集群

[root@flink1 ~]# 
[root@flink1 ~]# flink-1.14.3/bin/sql-client.sh 
......省略部分......
Flink SQL> 

目前sql-client.sh只能连接本地的集群,不能连接远程的集群

2. SQL Client结果查看模式

1. table模式

将结果物化到内存,再显示。可以通过上下翻页查看更多数据

可以按q退出结果查看

Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL> set 'sql-client.execution.result-mode' = 'table';
[INFO] Session property has been set.

Flink SQL> select my_name, count(*) as cnt 
> from (values('Bob'), ('Alice'), ('Greg'), ('Bob')) as my_table(my_name) 
> group by my_name;

                                                                         SQL Query Result (Table)                                                                          
 Table program finished.                                                      Page: Last of 1                                                        Updated: 07:32:16.080 

                        my_name                  cnt
                          Alice                    1
                           Greg                    1
                            Bob                    2


















Q Quit                            + Inc Refresh                     G Goto Page                       N Next Page                       O Open Row                        
R Refresh                         - Dec Refresh                     L Last Page                       P Prev Page                       

2. changelog模式

结果是变更日志的形式,只显示最近1000条日志。不支持batch模式

Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL> set 'sql-client.execution.result-mode' = 'changelog';
[INFO] Session property has been set.

Flink SQL> select my_name, count(*) as cnt 
> from (values('Bob'), ('Alice'), ('Greg'), ('Bob')) as my_table(my_name) 
> group by my_name;

                                                                       SQL Query Result (Changelog)                                                                        
 Table program finished.                                                                                                                             Updated: 07:34:14.074 

 op                        my_name                  cnt
 +I                            Bob                    1
 +I                          Alice                    1
 +I                           Greg                    1
 -U                            Bob                    1
 +U                            Bob                    2
















Q Quit                                                  + Inc Refresh                                           O Open Row                                              
R Refresh                                               - Dec Refresh                 
  1. tableau模式
    以表格的形式,显示变更日志的数据
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select my_name, count(*) as cnt 
> from (values('Bob'), ('Alice'), ('Greg'), ('Bob')) as my_table(my_name) 
> group by my_name;
+----+--------------------------------+----------------------+
| op |                        my_name |                  cnt |
+----+--------------------------------+----------------------+
| +I |                            Bob |                    1 |
| +I |                          Alice |                    1 |
| +I |                           Greg |                    1 |
| -U |                            Bob |                    1 |
| +U |                            Bob |                    2 |
+----+--------------------------------+----------------------+
Received a total of 5 rows

Flink SQL> 

如果未执行完,可以按Ctrl + C结束此条SQL查询

3. 同步 / 异步执行DML语句

默认是异步执行DML语句,提交完DML语句,就退出。如果要取消提交完但正在执行的DML任务,可以通过Web、命令行、Restful API

Flink SQL> create temporary table blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.

Flink SQL> insert into blackhole_table select name from (values('zhang_san'), ('li_si'), ('wang_wu')) as source_table(name);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: a927e6b6320391d86056f6204f669a6a

对于Batch模式,通常需要同步执行DML。可以按Ctrl + C取消正在同步执行的DML任务

Flink SQL> set 'table.dml-sync'='true';
[INFO] Session property has been set.

Flink SQL> insert into blackhole_table select name from (values('zhang_san'), ('li_si'), ('wang_wu')) as source_table(name);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Execute statement in sync mode. Please wait for the execution finish...
[INFO] Complete execution of the SQL update statement.

Flink SQL> 
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐