背景介绍

最近又能挤一挤时间,来聊一聊前一段时间接手的一个大数据系统项目。

随着云计算的普及,大部分互联网公司的系统都是基于云原生的产品和体系来搭建的,我接手的系统也不例外。数据处理部分从底层存储,到中间层数据处理系统,再到上层的ETL系统第一版都是基于Google Cloud Platform来搭建的,GKE + PubSub + DataFlow + FireStore,数据服务部分,负载均衡也是Google App Engine的体系。

GCP的产品在接入的环节做的都很便捷,门槛低,对应产品主流开发语言的SDK一应俱全,在项目排期紧张的情况下,整套系统架构基于GCP进行开发、部署与使用都很方便。

那为什么要做一次迁移呢?经过这段时间的折腾,回头想想主要原因有以下几点:

  1. 我们主要的服务对象发生了变化,从海外用户逐渐回归到国内用户
  2. 我们数据产品对实时性有要求,国内用户查询的延迟不能超过1s,增量数据的更新要实时(不超过3s)
  3. GCP整体的成本太高,特别是美元汇率,再加上和国内阿里云相比,迁移后能省不少费用

那么我们在迁移中遇到了什么挑战呢?我认为最有挑战性的有两块:云计算圈子有一句话是存储在哪里,用户就在哪里,由此可见存储做的好对用户的粘性有多大了吧。那遇到的最大的挑战就是整体存储的迁移,从原来的Firestore迁移到mongodb,批处理数据导入hbase;其次是整套数据处理框架的改造,由原来的Apache beam改为Flink framework。

接下来结合迁移计划,我将对v1与v2两个版本的大数据系统做更详细的介绍与分析。

###迁移计划

  • 数据双写,流处理的数据同时写入mongodb,批处理的部分写入hbase
  • API层重构,不同存储的client +和迁移数据进行合并
  • firestore到mongo的迁移
  • 流量切换
  • firestore数据清理

v1数据系统

系统架构

第一期系统架构

从架构图可以看出ETL的链路比较长,从源数据队列消费后需要上传一次阿里云OSS,再依赖OSS callback功能触发GCP上的pipeline程序进行GetObject的操作,最后再把数据导入GCP上的ETL系统做数据清洗、处理和聚合入库。

为什么这么选型+技术点

团队历史原因导致的整体链路冗长,这里忽略1w字。存储的选型上,我们选择了Firestore,这里firestore可能对国内的同学比较陌生,其实可以当其为分布式分片存储的mongodb集群,但是Firestore的subcollection功能特别符合我们的场景,我们的数据是基于用户id存储的document,subcollection可以让我们方便的在doc内创建子collection,这样就很容易基于某个用户id做用户维度下的二级索引,做用户维度下的二级甚至N级维度的数据聚合。

数据处理模型上我们选用是Dataflow进行流处理,因为我们的使用场景里,一个时间窗口内不会有重复的用户id出现,所以处理方式是获取到数据后,做1次IO+中间值处理。中间的operator也比较简单,第一版的设计只是依赖Dataflow的worker管理能力,我们的业务层逻辑只有一个worker,因此整体的逻辑处理模型如下:

数据处理v1

整个项目上线10天左右的时间,数据处理层在没有过多优化的情况下,在流量高峰时是要做横向扩容才能保证处理速度的。那在第一版里最大的挑战是在于整个链路如何保障速度和稳定性,特别是在跨洋处理的这一段。

我们做了以下几个处理:

  1. 源数据上传OSS,做了重试与冷启动逻辑,海外链路波动会经常导致上传OSS失败,这意味着没有触发oss的callback。源数据会定时重试,清理本地失败文件夹里的数据文件。
  2. OSS选择在HK节点,这么做对国内和海外的链路都比较适中,这里如果直接选择国内的OSS,网络波动很不稳定,相比HK的网络波动基本是在国内下午的时间会有一段峰值,和网络超时,其余时间都还算稳定,这样保证我们的数据传输在3-5s内是可以完成
  3. GCP上Pipeline做了数据拉取与二次推送的跟踪,如果拉取数据及推送PubSub失败都加上了重试和冷却的逻辑

数据上传网络监控

v2数据系统

迁移后的数据系统,主要的变化有三个:

  • 没有海外链路的部分,数据都在阿里云内网处理
  • 数据处理层拆分成流处理与批处理两个模型,从Dataflow改为Flink集群
  • 数据存储从Firestore改为MongoDB与HBase,原始数据存在oss上

存储选型的一些思考:

  1. Firestore之前提到与MongoDB类似,所以迁移的选型在考虑数据结构与模型没有太大变化的情况下,所以选择Mongo的分片集群。

一般使用Shard Cluster MongoDB, 主要考虑几个因素:

  • 数据容量扩展性
  • 读写性能扩展性

设计shard key主要目标有:

  • key 分布足够离散 (sufficient cardinality)
  • 写请求均匀分布 (evenly distributed write)
  • 尽量避免 scatter-gather 查询 (targeted read)

Mongo分片集群架构

一般通过document里的某个字段作hash来完成sharding,这里需要考虑到jumbo chunk的问题。如果只是基于某个字段进行分片,逻辑中再根据时间进行query,这样还是会导致基于某个字段的数据都集中在一个分片中,因此根据请求和业务逻辑,可以通过(key1, key2, key3…)这样的方式进行shard key设计。

chunk达到size之后会分裂,除了jumbo chunk的情况。底层的balancer程序会根据shard的情况迁移chunk。具体mongo分片的逻辑可以参考https://docs.mongodb.com/manual/core/sharding-shard-key/

  1. 根据数据的访问频次和处理方式,我们还选择了HBase用于批处理的数据存储,按用户一天、一周与一个月的数据进行聚合;OSS存储原始数据考虑到部分数据并非热点数据,并且是原始数据格式的format后直接显示,所以不用转存一份,这里我们是使用MongoDB来做二级索引,比如某个用户ID下的所有Match ID。另外OSS上的数据也是按天存储,批处理模型也直接是从OSS读数据作为datasource进行批处理计算。

  2. 那么之前Firestore SubCollection的数据怎么处理?

  • 基于MongoDB没有subcollection的特性,这里我们是将subcollection的数据通过User ID与维度ID做的sharding key,新创建一个维度表进行存储
流处理部分

流处理Flink DAG

V1的架构中,我们的处理模型是一个wokrer处理所有的数据在分别写Firestore collection里的documents。

在V2的架构里,我们从Dataflow迁移到了Flink的集群,所以在operator的拆分上也做了一些优化,即读取原始数据后写入不同的flatmap算子给到不用维度的数据使用,在分别聚合和sink。

这么做的好处是我们在基于flink自身check points和自己做savepoints时可以更细粒度的做恢复,且之前dataflow里如果要更新集群就需要drain dataflow,屏蔽source读取,把已有的数据处理完,再开启新的集群处理新的数据。

迁移与删除的容错与优化

搭建完系统后,两套系统同时在线进行双写,之后再开始数据迁移的工作,这里我们的做法还是覆盖overview数据并记录迁移时间,在API层通过全局的overview已聚合数据再聚合迁移时间之后产生的数据。

所以迁移过程没有那么复杂,需要注意的是迁移过程因为也是跨洋迁移,做了以下几点优化:

  1. 为了防止网络超时出现数据缺失,mongo的endpoint是通过香港节点作为4层代理进行写入
  2. 迁移程序同样需要retry和冷却逻辑,这里记得retry成功后需要更新迁移时间

踩了什么大坑(持续更新)

  1. Firestore数据的删除
  • Firestore写入和查询用起来都很方便,但是在数据删除上坑很大,第一是没有console能一键删除整个db;第二是console上提供了删除collectin的功能,但是实现的逻辑是遍历collection的doc一个个删除,还要保持在线,稳定性差,耗时也很长;第三删除之后的doc index还会存在console上,因此无法判断是删除成功还是失败。

是实现的逻辑是遍历collection的doc一个个删除,还要保持在线,稳定性差,耗时也很长;第三删除之后的doc index还会存在console上,因此无法判断是删除成功还是失败。

  1. 跨洋链路不太稳定,建议是所有的跨洋通信都需要加上hk节点的代理,重试和冷却逻辑也必不可少。
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐