导读数据湖作为一个统一存储池,可接入多种方式的数据输入,无缝对接多种计算分析引擎,进行高效的数据处理与分析。本文将介绍数据湖上的选型思考与探索实践。
主要内容包括一下四个部分:1. 总体架构2. 入湖方案选型3. 实时入湖优化4. 数据湖上的查询分享嘉宾|陈晶 纵腾集团 高级大数据开发编辑整理|吕宴全内容校对|李瑶出品社区|DataFun02
入湖方案选型
数据入湖方案设计上,我们比较了三种入湖的实现思路。1. 入湖方案一如下图所示,包含了两条支线:2. 入湖方案二
这一方案是在前一个方案分支二的基础上进行了一定的改进,通过 Dinky 完成整库数据同步,其优点是同源数据合并成一个 source 节点,减轻源库压力,根据 schema、database、table 分流 sink 到对应表。其缺点是不支持 schema 演变,表结构变更须重新导数。如下图所示,mysql_biz 库中有3张表,从 flink dag 图看到 mysql cdc source 分3条流 sink 到 Hudi 的3张表。3. 入湖方案三
主要流程如下图所示。其主要优点是支持 Schema 演变。Schema 变更的信息由 Debezium 注册到 Confluence Schema Registry,schema change 的信息通过 DeltaStreamer 执行任务变更到 Hudi,使得任务执行过程中不需要重新拉起。其主要缺点是依赖于 Spark 计算引擎,而我们部门主要用 Flink,当然,这会因各个公司实际情况而不同。下图分别是 Yarn 的 deltastreamer 任务, Kafka schema-change topic 的 DML message 和 Hudi 表变更后的数据。4. 入湖方案总结在方案选型时,可以根据下面的流程图进行比较选择:(1) 先看计算框架是 Spark 还是Flink,如果是Spark 则选择方案三,即 Deltastreamer,这一方案适用于表结构变更频繁,重新拉取代价高,主要技术栈是Spark 的情况。
(2) 如果是 Flink,再看数据量是否较少,以及表结构是否较稳定,如果是的话,选择方案二,Dinky 整库同步方案支持表名过滤,适用数据量较少且表结构较稳定的表。
(3) 如果否,再考虑 mysql 能否抗较大压力,如果否,那么选择方案一下分支,即 Kafka Connect,Debezium 拉取发送 Kafka,从 Kafka 读取后写 Hudi。适用数据量较大的多张表。
(4) 如果是,则选择方案一上分支,即 Flink SQL mysql-cdc 写 Hudi,适用于对实时稳定要求高于资源敏感的重要业务场景。
03
实时入湖优化
我们的入湖场景是 Flink Stream API 读取Pulsar 写 Hudi MOR 表,特点是数据量大,并且源端的每条消息都只包含了部分的列数据。我们通过使用 Hudi 的 MOR 表格式和 PartialUpdateAvroPayload 实现了这个需求。使用 Hudi 的 MOR 格式,是因为 COW 的写放大问题,不适合数据量大的实时场景,而 MOR 是增量数据写行存 Avro 格式log,通过在线或离线方式压缩合并至列存格式 parquet。在保证写效率的同时也兼顾了查询的性能。不过需要通过合并任务定期地对数据进行合并处理,这是引入复杂度的地方。以下面这张图为例,recordKey 是 ID1 的3条 msg,每条分别包含一个列值,其余字段为空,按 ts 列 precombine,当 ts3 > ts2 > ts1时,最终 Hudi 存的 ID1 行的值是 v1,v2,v3,ts3。 此入湖场景痛点包括,MOR 表索引选择不当,压缩异常导致越写越慢,直至 checkpoint 超时,某分区存在重复文件导致写任务出错,MOR 表某个压缩计划 pending阻碍此 bucket 的压缩及后续的压缩计划生成,以及如何平衡效率与资源等。我们在实践过程中针对一些痛点实施了相应的解决方案。(1) Hudi 表索引类型选择不当,导致越写越慢至 CK 超时,这是因为 Bucket 索引通过 hash 映射 recordKey 到 fileGroup。而 Bloom 索引是保存 recordKey 和 partition、fileGroup 值来实现,因此 checkpoint size 会随数据量的增加而增长。Bloom Filter 索引基于布隆过滤器实现,索引信息存储在 parquet 的 footer 中,Bloom 的假阳性问题也会导致更新越来越慢,假阳性是指只能判断数据一定不在某个文件而不能保证数据一定在某个文件,因此存在多个文件都可能存在某条数据,即须读取多个文件才能准确判断。我们做的优化是使用 Bucket 索引代替 Bloom 索引,Hudi 目前也支持了可以动态扩容的 Bucket 参数。(2) MOR 表压缩执行异常,具体来说有以下三个场景:
单 log 超过1G,使写延迟提高,导致越写越慢至 checkpoint 超时,checkpoint 端到端耗时增长至3-6分钟;
在 inline schedule 的压缩模式下,offline execute 出现报错:log文件不存在;
Compaction 一直处于 Infight 状态,即进行中,不能完成;同时存在无效 compaction,既不能被压缩,也不能被取消。
(3) MOR 表某分区存在重复文件,导致写任务出错。出现这个问题的原因是某个 instant 已写 log 文件但未成功提交到 timeline 时,发生异常重启后未 rollback 这个 instant,即未清理已有 log,继续写此 instant 则有重复。
我们做的优化是在遇到重复文件时,通过 Hudi-Cli 执行去重任务,再恢复执行。具体来说,需要拆分成以下四个步骤:
停止当前的 Flink 任务
通过 Hudi-cli 执行去重命令
repair deduplicate --duplicatedPartitionPath 20220604 --repairedOutputPath hdfs:///hudi/hudi_tis.db/track_detail_3_repair/20220604 --dedupeType upsert_type --sparkMaster local
删除 partition 文件,修复文件移到原分区
重新启动 Flink 任务
经过多次的修改和验证,我们的入湖任务在性能和稳定性上取得了明显的改善。在稳定性上,做到了在十几天内任务无异常。在时延上,做到了分钟级别的 checkpoint 和数据可见。在资源使用上,对 Hadoop YARN 资源的占用明显减少。
下图总结了我们对实时入湖做的参数优化方案,包括:
索引选择GLOBAL_BLOOM-> BUCKET_INDEX #Bucket索引较布隆索引写吞吐性能高
BUCKET_NUM 20 #Bucket数量:根据单分区数据量评估,保证File Slice2GB,平衡读写性能
Flink增量checkpoint:Rockdb #Flink ck存储,rockdb支持增量ck,减少单ck数据量,提高写吞吐
Yarn资源:
jobmanager 5G #Flink jobmanager内存,减少oom,保证稳定
taskmanager 50G 20S #Flink taskmanager内存与slot数,slot与并发度、bucket数一致
write.rate.limit 30000 #写速度限制,过载保护,保证作业稳定运行
write.max.size 2560 #写用到最大内存,于taskmanager每个slot内存一致
write.batch.size 512 #批量写,适量调大减少刷盘频率
compaction.max.memory 2048 #压缩用到的最大内存,适量调大提升压缩速度
compaction.trigger.strategy num_and_time #压缩策略 增量提交个数或时间达标触发生成压缩策略
compaction.delta_seconds 30 #压缩策略之时间,减少时间间隔,减少单个压缩文件数
compaction.delta_commits 2 #压缩策略之增量提交个数,减少个数,减少单个压缩文件数
04
数据湖上的查询
在引入 Kyuubi 前,我们通过 JDBC、Beeline、Spark Client、Flink Client 等客户端访问服务层执行查询,没有统一入口,多个平台不互通,多账号权限体系。用户的痛点是跨多平台开发体验差,低效率。平台层的痛点是问题定位运维复杂,存在资源浪费。在引入 Kyuubi 后,我们基于社区版 Kyuubi 做了一定的改造,包括 JDBC 引擎开发、JDBC 引擎 Ranger 鉴权开发、BI、JDBC 客户端元数据适配修改、Spark 引擎大结果集存 HDFS、支持导数开发、JDBC 引擎 SQL 拦截控流开发等,实现了统一数据服务入口,做到了统一认证权限管理和统一易用原则。下图展示了 Kyuubi 的架构和权限管控:Kyuubi 查询流程是:客户端请求通过 LDAP 认证后,连接 Kyuubi Server 生成 Kyuubi session,之后 Kyuubi server 根据连接用户以及用户隔离级别路由到已经启动的 engine 或启动一个新的 engine。Spark 引擎会先申请 container 运行 AppMaster,后申请 container 运行 executor 执行 task。Flink 引擎会完成 StreamGraph 至 JobGraph 至 executionGraph 构建并通过 Jobmanager 和 taskmanager 运行。其中 engine 端 RangerPlungin 会在 SQL 解析后拉取 RangerAdmin 由用户配置的策略进行鉴权。RangerAdmin 完成用户同步,策略刷新等。
Kyuubi on Flink 跨库查询的目的是尝试基于 Flink实现流批一体,支持跨数据源导数 SQL 化。我们的实现方案是通过 Flink Metadata Catalog Connector 的开发,即基于元数据系统以统一 datasource.db.table 的格式查询所有数据源,且让用户免于自定义 DDL。其中元数据采集是用 datahub 的 ingestion framework 采集各种数据源的元数据,并生成对应 Flink 表属性。Flink 端是扩展 AbstractCatalog 查询 metadata DB,实现 CatalogFactory 接口。
其基本流程如下图所示:
完整流程是1 发起采集请求2和3是采集服务调 Datahub ingestion framework 完成元数据采集并写到 metadata DB 同时写 Flink 表属性。4是 用户发送 SQL 到 Kyuubi server 5是 Kyuubi server 发送 SQL 到 Flink engine 6和7是 Flink metadata catalog 会读取 metadata DB 根据 Flink 表属性读取对应数据源。Kyuubi on JDBC Doris 可以通过外表查询 Hudi,但在 Doris 1.2 版本,仍然有一定的限制,Hudi 目前仅支持Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read Optimized Query。后续将支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。Doris 的架构示意和其基本使用流程如下图所示: 今天的分享就到这里,谢谢大家。分享嘉宾
INTRODUCTION
陈晶
纵腾集团
高级大数据开发
限时免费资料
往期优质文章推荐
往期推荐
知乎的缓存加速:Presto的进化实战(长文解读)
阿里巴巴数据模型设计与构建实践
B站数据质量保障体系建设与实践
AI基础软件:如何自主构建大+小模型?
数据湖与实时数仓应用实践
探索大模型技术在自智网络方向的应用前景(推荐收藏)
广告策略系统设计(文末赠书)
轻松利用日志动态分析平台玩转Nginx运维管理
九章云极DataCanvas多模态大模型平台的实践和思考
开源数据库 MatrixOne 的 HTAP 分布式架构演进
Abase2: NoSQL数据库中的CRDT支持实践
国产开源湖仓LakeSoul--数据智能的未来方向
网易大数据智能运维平台 EasyEagle
(一文读懂大数据行业)-面向百度商业数据产品的全流程 DataOps 实践
关注我们获取更多信息......