ClickHouse实战–使用分布式表

分布式表非实体表,本质上来说是一个视图。由于CK的节点是对等的(没有主从的概念),当一个分布式表接收到请求时,请求会落到一个其中一个节点上,接收请求的节点,会把SQL进行拆分,并把请求发送到其他各个节点上,然后再对数据进行聚合,并把聚合好的结果返回给客户端。

查看集群信息

由于在创建分布式表时会用到集群名,所以,需要查看一个集群的信息。可以通过一个sql查看集群信息。

select * from system.clusters;
查看系统设置
select * from system.settings;
分布式表的创建

创建分布式表时,一般是先存在本地表。分布式表是基于本地表创建来进行创建。创建的基本语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name 
[ON CLUSTER cluster] AS [db2.]name2 
ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) 
[SETTINGS name=value, ...]

要注意的是:默认情况下分布式表的数据插入是异步的,若要实现同步的分布式表数据插入,需要设置参数。

分布式引擎参数解释
  • cluster - 服务为配置中的集群名
  • database - 远程数据库名
  • table - 远程数据表名
  • sharding_key - (可选) 分片key
  • policy_name - (可选) 规则名,它会被用作存储临时文件以便异步发送数据

在创建分布式表时,还有几个重要的setting参数需要注意:

  • insert_distributed_sync:默认情况下,向分布式表中插入数据时,ClickHouse 服务器以异步方式向集群节点发送数据。当insert_distributed_sync=1时,数据是同步处理的,只有在所有shard上都保存了所有数据后INSERT操作才成功(如果internal_replication为true,每个shard至少有一个replica)。
  • fsync_after_insert:在异步插入到分布式后对文件数据进行 fsync。保证操作系统将整个插入的数据刷新到启动器节点磁盘上的文件中。

分布式表实战

查看分布式集群信息

可以先通过sql语句来查看一下系统的集群配置。

:) select * from system.clusters;
─cluster────────────────────┬─host_name───┬─host_address─┬─port─┐
│ perftest_2shards_1replicas │ x.x.x.1xx  │ x.x.x.xxx  │ 9000 │
│ perftest_2shards_1replicas │ x.x.x.2xx  │ x.x.x.xxx  │ 9000 │
└────────────────────────────┴─────────────┴──────────────┴──────┘

通过以上输出可知,我们的集群名是:perftest_2shards_1replicas。

创建集群的本地表
CREATE TABLE if not exists test_db.city_local on cluster perftest_2shards_1replicas
(
  `id` Int64,
  `city_code` Int32,
  `city_name` String,
  `total_cnt` Int64,
  `event_time` DateTime
)
Engine=MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;
创建分布式表

分布式表实际上是一个视图,因此它需要与分片(服务器节点)具有相同的表结构定义。创建视图后,将在每个分片(服务器节点)上查询数据,并将结果聚合到最初调用查询的节点上。

在其中一个节点上执行以下创建分布式表的语句:

CREATE TABLE test_db.city_all on cluster perftest_2shards_1replicas
AS test_db.city_local
ENGINE = Distributed(perftest_2shards_1replicas, test_db, city_local, rand());

注意:在创建分布式表时需要添加上on cluster xxx。另外,创建分布式表的语法,可以参考官方文档

Distributed中参数中,第1个参数:集群名;第2个参数:数据库名;第3个参数:本地表名;第4个参数:分布策略。

向分布式表中插入数据
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (1, 4000, 'guangzhou', 420000, '2022-02-21 00:00:00');
 
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (2, 5000, 'shenzhan', 55000, '2022-02-22 00:00:00');
 
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (3, 6000, 'huizhou', 65000, '2022-02-23 00:00:00');
 
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (4, 7000, 'huizhou', 75000, '2022-02-24 00:00:00');

insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (5, 8000, 'huizhou', 75001, '2022-02-25 00:00:00');

查看一下分布式表的数据条数:

:) select count() from city_all;

SELECT count() FROM city_all

Query id: eff9e667-61d7-4302-93dc-9d7379d234db

┌─count()─┐
│       5 │
└─────────┘

连接到其中一个节点,查看本地表的数据条数:

:) select count() from city_local;

SELECT count() FROM city_local

Query id: 24347cdb-4cfe-4d6b-8492-90d40c8e0e2c

┌─count()─┐
│       3 │
└─────────┘
更新分布式表的数据

想要更新分布式表的数据时,可以通过更新本地表的数据来完成。不过在更新本地表的数据时,必须添加on cluster xxx。否则只会在其中一个节点中进行操作。另外,不能直接更新分布式表。

更新本地表的语句如下:

ALTER TABLE city_local ON CLUSTER perftest_2shards_1replicas UPDATE total_cnt = 5555 WHERE city_name = 'shenzhan';

通过以上sql语句,可以正确的完成数据的更新。更新完成后,可以通过以下sql语句来查看更新后的数据。

select * from city_all order by id;

我们尝试来通过分布式表来更新数据,会报错。

ALTER TABLE city_all ON CLUSTER perftest_2shards_1replicas UPDATE total_cnt = 4444 WHERE city_name = 'shenzhan';

会报以下错误:

┌─host────────┬─port─┬─status─┬─error───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─num_hosts_remaining─┬─num_hosts_active─┐
│ xxx.xxx.1xx.xx│ 9000 │     60 │ Code: 60, e.displayText() = DB::Exception: Table test_db.city_all doesn't exist (version 21.8.10.19 (official build))               │                   1 │                0 │
│ xxx.xxx.2xx.xx │ 9000 │     48 │ Code: 48, e.displayText() = DB::Exception: Table engine Distributed doesn't support mutations (version 21.8.10.19 (official build)) │                   0 │                0 │
└─────────────┴──────┴────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴──────────────────┘
↘ Progress: 0.00 rows, 0.00 B (0.00 rows/s., 0.00 B/s.)  0%

所以,可知,不能通过分布式表来更新数据。

另外,要注意,CK在更新数据时,哪怕更新一条数据,也会重写该条数据所在分区的所有数据,所以,CK的更新操作是一个很重的操作,不能频繁进行。特别是当数据量非常大的时候,进行数据的更新更是要谨慎。

修改分布式表的结构

一般来说,分布式表的表结构必须要和本地保持一致的。在修改表结构时,最好不要直接通过alter语句进行表结构的修改,而是新建一张表,然后把老表的数据导入到新表中。

比如我们有一张表t1_local,分布式表d1_all,需要修改表结构。一般情况下,修改分布式表的表结构的步骤是:

(1)按新的表结构创建一个新的集群本地表(必须添加on cluster xxx),比如为:t2_local

(2)按新的本地表创建一个分布式表(添加on cluster xxx),比如为:d2_all

(3)把老的数据表中的数据导入新的分布式表中:insert into d2_all select f1,f2,f3,fn from d1_all。注意:这一步也可以导入到本地表t2_local中,导入分布式表是为了让数据按新的分片策略进行分片。

(4)验证一下数据是否导入完成。

(5)修改老的本地表名为t1_local_bk,修改新的本地表名为:t1_local。必须添加on cluster xxx。此时,以前的分布式表d1_all没有修改,但我们修改了本地表的名称,所以在对d1_all分布式表进行操作时,会对最新的本地表上进行。

另外,要注意,为了避免数据丢失,在进行表结构更新操作时,最好停止数据写入的操作。

小结

本文介绍了分布式表的基本操作。包括分布式表的创建,如何向分布式表插入数据,如何更新分布式表的表结构等操作。

参考资料

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐