导读 本文将分享丁香园大数据基于 Apache Kyuubi / Celeborn 的实践。
主要包括两大部分:
1. Apache Kyuubi
2. Apache Celeborn(Incubating)
分享嘉宾|陈福 丁香园 大数据基础平台负责人
编辑整理|夏明月
内容校对|李瑶
出品社区|DataFun
01Apache Kyuubi首先来介绍 Apache Kyuubi 的实践。
1. Kyuubi 总览
引入 Apache Kyuubi 是为了统一 Spark 程序入口,下图是 Kyuubi 的整体架构。Kyuubi 最吸引我们的特性包括兼容 Hive Beeline 和 RESTful API,支持多租户隔离,配合不同的 Share Level 使用,可以有效提升 Yarn 资源利用率,并且为 Spark 提供了很多插件,例如 Z-Order 优化、小文件合并、血缘、审计等。
2. 更灵活的灰度任务配置
在生产环境中,我们希望长时间运行的任务采用 Connection 级别的 share level 独享 Spark 应用资源执行任务,而对于小任务,为了减少 Spark submit 的等待时间,使用共享的 User Level 执行任务。Kyuubi 原生提供了四种解决方案。
- 方案一,在 kyuubi-default.conf 或者 spark-default.conf 进行全局配置。
- 方案二,使用客户端启动 JDBC URL 里面的后缀配置需要启动的参数,如下案例中设置 spark.sql.shuffle.partitions=2;spark.executor.memory =5g 启动一个 session,需要注意的是Spark是区分动态配置和静态配置的,其中spark.executor.memory 配置在 user 模式下,并且在已有 Engine 情况下配置是不生效的。
- 方案三,使用 Set 语法进行运行时配置,如 spark.sql.shuffle.partitions 之类的配置。
- 方案四,Kyuubi server 端提供了修改 session conf 的插件org.apache.kyuubi.plugin.SessionConfAdvisor。
简单总结一下,Kyuubi 的配置可以大致分为三类,第一部分是 Kyuubi 控制Spark Engine 启动行为的参数,例如 kyuubi.engine.pool.size,第二部分是Spark Engine 启动静态参数,例如 spark.executor.memory,第三部分是Spark Engine 的运行时动态参数,例如 spark.sql.shuffle.partitions。虽然上面四种方案都可以修改这些参数,但是不够灵活。在实际应用中我们希望不用修改Client 端参数,在 Server 端完成适配。因此引入了 Apollo 实现配置管理。如下图中右侧的两个例子,第一个是灰度 Celeborn,第二个是灰度 Spark 3.5。
3. Kyuubi 的插件 AuthZ
Kyuubi 的权限插件 AuthZ 提供了三个功能。
- 功能一,表列级别细粒度权限管控;
- 功能二,行级别的细粒度权限管控;
- 功能三,数据脱敏。
Spark Catalyst 的执行过程如下图所示。Spark 获取 SQL 进行解析,将 SQL、DataFrame、DataSet 转化为 Unresolved Logical Plan,再结合 Catalog 进行 Analyzer 生成 Logical Plan,通过 Optimized逻辑优化生成Optimized Logical Plan,通过物理计划 Planner 生成 Physical Plan,最终做 Query Execution 完成 SQL 计划的执行。Spark Catalyst 提供了在各个阶段插入用户自定义规则的接口,AuthZ 的表和列权限就是在 Optimizer 阶段插入规则实现的,除此之外,行权限控制和数据脱敏,则主要是在 Analyzer 阶段完成的。值得说明的是,目前 AuthZ 插件默认从 Ranger 中获取鉴权信息,Kyuubi 社区也正在考虑将权限校验接口抽象出接口,方便用户自己对接内部权限插件系统。
接下来看一下 AuthZ 插件存在的问题及处理方式。
- 问题一,Spark 支持 select on files 查询语句,且 Spark Catalyst 插件提供了运行时禁用 Optimizer 规则配置,因此存在使用 Spark SQL 绕过权限控制的风险,Kyuubi 针对这个问题提出了 check 规则防止用户修改动态配置,但是这个配置默认是关闭的,需要用户手动开启。
- 问题二,目前只有 ExecuteStatement 是支持授权的。针对开源的 BI 工具,比如 Hue 获取用户库表、列表时是通过 GetCatalogs、GetTables API 获取的,因此在获取库表列表上存在权限管控不住的风险。
- 问题三,对 Scala 和 Python 脚本无法支持权限管控。对于安全要求比较高的公司,建议把 Scala 和 Python 脚本语言禁掉,或者只对 admin 开放。
在没有 AuthZ 插件之前依赖存储权限进行管理,比如用户 Tom 启动了 Spark Engine 对 table_a的 HDFS 存储路径设置访问权限,则能访问 table_a,对table_b 的 HDFS 存储路径没有设置访问权限,则无法访问 table_b。有了 AuthZ 后,可以在启动 Spark Engine 时利用统一的超级用户,比如 HUE 用户对 warehouse 所有目录都有读写的权限,客户在提交时首先将 Session User 传递过来,然后在 AuthZ 阶段进行判断。这样做的好处是业务层可以更方便地控制自己的权限逻辑。
4. 小文件优化
Spark AQE 解决了 join 倾斜的问题。其基础思想是在 Spark SQL 执行过程中获取统计信息后进行小文件合并优化。如下图左边所示,在没有 AQE 情况下执行 Map Reduce 任务,三个 Map 产生了三个 Reduce,其中第一个 Reduce 的结果是过大的,第二个和第三个 Reduce 结果过小。如下图右所示,使用 AQE 后,同样是三个 Map 产生了三个 Reduce,其中第一个过大的 Reduce 1 就被拆成了 Reduce 1-0 和Reduce 1-1,两个过小的 Reduce 任务被合并成一个。Kyuubi 正是利用了 AQE 的这一特点进行小文件优化。针对没有 Shuffle 任务的SQL 任务,Kyuubi 会强制插入一个 Shuffle 任务进行一次 Shuffle 操作进行小文件合并和倾斜分区拆分。
5. Z-Order 优化
Z-Order 与线性排序对比。下图左边为线性排序,右边为 Z-Order 排序,同样针对 x=2,y=3 点的查询请求,左边需要扫描 9 个文件,右边只需要扫描 7 个文件,因此,通过 Z-Order 排序可以有效提升扫描效率。
Z-Order 的一个核心点是如何计算 z-value。Z-Order 将多维数据通过某种方式映射为一维数据,并且这个一维数据还能保证整个数据的多个维度的聚合性,而不像线性排序先看第一维,再看第二维、第三维的方式。此映射方式采用交叉组值形式。例如,对 x =19,y =47 的点计算 z-value,最终通过交叉组值实现,后面不够位数的进行补零操作,最终计算得 z-value=2479。计算 z-value,当前开源项目中主要有三种实现方案。
- 方案一,直接将数据强制转化为 binary 数据,目前 Kyuubi 就是这样实现的,该方案存在 String 类型执行效率低下问题,比如对一列都是以 http 开头的String 类型数据进行过滤,效率将会下降。
- 方案二,进行一次全局排序后,使用 row_number 计算它 z-value,该方案需要对每个维度做一次全局排序,因此 overhead 是非常大的,在一般任务中无法接受。
- 方案三,如下图左侧所示,我们并不需要知道每个点真正在 value 中的映射关系,只需要采样出 partition_id,使用 partition_id 进行 z-value 计算,本方案为 Delta 的实现方案。
Kyuubi 提供了两种 Z-Order 优化方式。第一种是文件内的 Z-Order 排序,第二种是全局 Z-Order 排序。全局 Z-Order 排序存在数据倾斜的问题,即每执行一次全局 Z-Order 的排序就是执行一次 Partition by Range 操作,分区条件为 z-value,如果一个 Range 中重复值非常多就可能存在倾斜。解决这个问题的方式是通过 z-value 分区条件增加一个随机维度列,将倾斜的分区打散。目前我们内部实现了这种优化方案,但是在开源 Kyuubi 中并没有实现这个优化。实际效果如下图所示,采用 3TB TPCDS 的 store_returns 这张表,在没有合并时候是 174 GB、2800 个文件,平均每个文件是 63 MB,总共有 28 亿条数据。通过 Z-Order 排序后的维度分别是 6500 万和 3200 万。由于 Z-Order 的排序过程成本非常高,因此实际生产中往往结合小文件合并一起优化使用。我们以 384 MB 为目标文件大小进行实验。
- 实验一,仅做了小文件合并的优化,优化结果是 174.0 GB,产生了 683 个文件;
- 实验二,文件内 Z-Order 排序,排序后是 147.1 GB;
- 实验三,全局 Z-Order 排序,排序结果是 156.4 GB,其中最大文件是 1.9 GB,很明显发生了数据倾斜
- 实验四,全局 Z-Order+noise 排序,noise 为随机列的操作,优化后文件大小是 156.4GB。
通过实验发现,Z-Order 排序可以有效地提升压缩率、减少存储空间的使用。
再来看一下 Z-Order 对查询性能的提升。如下图所示,对一个点查的查询条件列进行分析,在仅做小文件合并查询时需要扫描到 52 万条数据,在小文件合并+文件内部z-order 时需要扫描 4 万条数据,小文件合并+全局 Z-Order+noise 的场景下只需要扫描 2 万条数据,因此将 Z-Order 结合 Min/Max 过滤可以有效地提升 scan 的效率。
我们还和 Delta+Z-Order 进行了查询对比,如下图所示,左边是小文件合并+ Z-Order + noise 优化过后的结果,右边是 Delta + Z-Order 的优化结果,同样都是扫描 2 万条数据。由于 Delta 存在 File Skipping 操作,因此只需要扫描 1 个文件,并且扫描的文件数也从 156GB 降低到 300 MB,进而极大地提升了查询速度。对数据使用没有历史包袱的用户,如果可以用数据湖,建议都迁移到数据湖上去,比如 Delta、Iceberg、Hudi 等。
下面分享一下 Z-Order 在丁香园内部实际上线的效果。上面提及的配置都可以在正常的任务中添加,在最终写入文件前进行小文件和 Z-Order 优化。这里需要注意的是,任务运行时和最终写入文件时所需的参数往往是不一样的,针对这一问题,Kyuubi 提出了 Final Stage Config Isolation 最终配置隔离解决方案。为了降低配置复杂度及 Z-Order 排序对任务的额外开销,我们采用了 Job Level Config Isolation 方式,即 T + 1的方式,同时配合 Connection Share Level 去进行 Optimize 操作。由于是异步进行 Optimize 操作,需要在读取表本身后通过 overwrite 的方式实现重写,由于 Spark datasouce 不支持 overwrite 操作,我们最终选择使用 Hive API,但是 Spark 访问 Hive API 是同步操作,导致运行性能下降。实际上线时由于优化时历史文件不统一,我们还做了数据统一的工作。将 RCFile 转化为 Parquet + Z-Order ,最大的单表数据量减少了 90%,减少了 2TB 的存储空间,将 ORCFile 转化为 Parquet + Z-Order,最大单表数据量减少 70%,减少了 2TB 的存储空间。
以上都是 Z-Order 的优点,但 Z-Order 也存在着一些缺点。首先,不合理的维度设置,引发无效的排序工作,导致查询性能下降,比如选一个 distinct count 特别小的维度进行 Z-Order 优化;其次,参与计算维度越多,过滤效果越差,这个是 Z-Order 的一个通病;最后,查询维度变更时,需要重新写数据,否则 skip 效率会变低。
6. Connection 级别的问题
上文提及 Connection 级别执行 Optimize 任务,下面来看一下 Connection 级别的问题。为了保护 Kyuubi Server 节点,通常利用 Cluster 模式提交任务,并让 Spark submit 进程尽可能快地退出,但是在 Connection 高并发的情况下 overhead 是不可能被忽略的,因此在监控图表中显示,在 Optimizer 任务过来的时候 CPU 和 Memory 会飙升。我们内部的解决方案是将 overhead,即Kyuubi Server k8s pod 的 cpu/memory limit 设置大一点,由客户端对并发数进行控制。社区也有针对这一问题的讨论,链接为 https://github.com/apache/kyuubi/issues/4825,有用户还尝试了其他的方案提交 Spark 任务,具体细节大家可以去浏览相应的 issue。
7. Arrow 大结果集合传输优化
Kyuubi 在 1.7.0 版本引入了 Arrow 序列化做大结果集传输的优化。通过 SET kyuubi.operation.result.format=arrow 进行启用,该配置为 operation 级别的配置,意味着每次操作 operation 都可以切换 format。
Thrift 传输过程如下。Kyuubi JDBC Driver 把请求发送给 Kyuubi server;Kyuubi Server 透传给后端的 Spark Driver;Spark Driver 接收到请求之后开始执行 Spark.sql(SQL);Spark Executor 在相关的 SQL task 执行完成后进行转 byte 数组操作;Spark Driver collect 各个 Executor 上的 binary 数据,需要注意的是,此处 Spark Driver 会将全量数据结果集保存到内存一份;collect 完之后再将 byte 数组转为一个 InternalRow,再将 InternalRow 转换为可以访问的Row结构;由于需要返回给客户端,因此需要使用Thrift 协议,我们把它封装成了TRowSet 并返回给 Kyuubi Server;Kyuubi Server 流式透传给 Kyuubi JDBC Driver;JDBC Driver 将 TRowSet 并转为 Java 可访问的对象,并进行后续操作。
Arrow 传输流程如下。Kyuubi JDBC Driver 把请求发送给 Kyuubi server;Kyuubi Server 透传给后端 Spark Driver;Spark Driver 接收到请求后执行spark.sql(sql);与 Thrift 传输不同的是 Spark Executor 在 sql task 执行完成后进行转 Arrow Batch 操作;Spark Driver collect 各个 Executor 上的 binary数据;Spark Driver 将 byte 数组放到 thrift binary 类型容器当中,需要强调的是这里并没有 codec 操作,然后返回给 Kyuubi Server;Kyuubi Server 流式透传给 Kyuubi JDBC Driver;JDBC Driver 完成 Arrow binary 到 Java 可访问对象的转换,并进行后续操作。
下图展示了 Thrift 和 Arrow 的性能对比。为了更好地分析 Arrow 带来的优化效果,采用 12GB 堆内存、2GB MaxDirectMemorySize,Engine 端 Spark driver 内存 32GB、每个 executor 设置 4G 堆内存、6G overhead 进行测试,在 100 万条数据场景下 Arrow 有 1 倍的性能提升,在 1000 万条测场景下有 2 倍的性能提升,而在 5000 万的场景下 Thrift 已经无法运行了。具体原因有两个,其一 Arrow 零拷贝机制,即 Arrow 基于 Java NIO 的 kernel buffe 实现了零拷贝,而 Thrift 需要进行多次的序列化和反序列化,比如 Engine 端往 JDBC 端去返回时,需要先转成 Thrift 的数据结构,然后在客户端进行反序列化;其二,序列化时机不同,Thrift 序列化是在 Driver 端进行的,Arrow 是在 executor 端进行的。
目前 Kyuubi Arrow 序列化传输实现是没有进行压缩的,如果再加上压缩功能,性能会有进一步的提升。然而 Arrow 并不是解决超大结果集最终的解决方案,它只是降低了资源的使用。目前在 Kyuubi 编程大赛中也有相关的 task 去解决超大结果集传输的方案。由于性能瓶颈是在 Spark Driver 阶段需要把数据都 collect 到 Spark Driver,因此数据不往 Spark driver collect,而是写入到 HDFS 中,再从 HDFS 流式透传给 Client 端。Kyuubi Arrow 传输目前存在两个已知的问题。
- 问题一,Kyuubi Server 端的 Arrow batch 塞到了一个字段里,导致 fetchResultsCount 的 metrics 不准;
- 问题二,limit order by 查询会触发额外的 shuffle 操作,但这并不一定是一个坏事,虽然会影响查询性能,但是可以把 Spark Driver 的内存使用率降下来。
02
Apache Celeborn (Incubating)
1. Celeborn VS External Shuffle Service
引入 Celeborn 的目的是为了更弹性地完成 ETL 任务。在实际测试过程中发现 Celeborn 对比 ESS,在 1TB 数据时有 7% 的性能提升,社区目前普遍的提升是20%。实现提升的核心点在于异步化,即异步推送、异步刷盘、异步 Commit、异步 Fetch 等。
2. Pipeline
提到了异步,就需要了解一下什么是 pipeline。以 Spark 为例,Pipeline 功能是指将 Spark task 执行完成后推送到 Celeborn Client 端实现异步化的过程。实际测试中,在磁盘 IO 没有被打满的情况下,大概有 20% ~ 30% 的性能提升。但是 Pipeline 功能有一个致命缺陷,由于推送是异步的,可能会导致非法的内存访问,具体在 PR 里面有详细的描述。
3. 禁用 Netty Cache
在社区中很多小伙伴都遇到了 Netty Cache 的问题。由于 Netty OOM 后,没办法释放内存,一直处于 Trim cache 阶段。在深入 debug 之后发现是由 Netty Cache 引起的。引用 Netty Cache 后,从监控页面中可以看到常驻内存从 1GB 降到了 200MB~300MB。
4. 其他特性
Celeborn 社区正在增加一些重要的特性。第一个是 Stage 重算的功能,这也是大家都比较关注的一个功能,即在 worker 突然挂掉的情况下,当前的 Celeborn 会将整个 Spark 任务都干掉,这对于长时运行任务的用户是很难接受的。第二个特性是在 release 0.3.1 版本中已经有的功能,即 Celeborn worker 与 Spark 跑在同一集群中时,可以实现本地直接读取 shuffle 文件,避免经过Celeborn worker,从而显著降低 Connection 数。第三个特性是 Celeborn 的生态目前已经支持了 Hadoop 的 MapReduce。第四个特性是 memory storage ,在当前的 Celeborn 实现中,不管是本地盘数据还是 HDFS,最终数据都会落到盘上。而有了 storage manage 后,可以在内存中完成整个流程,该功能目前还在 proposal 阶段。第五个特性是国外用户比较关注的,身份认证功能。最后两个特性是对 Scala 2.13 和 JDK 17 的支持,目前 Spark 4.0 已经移除了 JDK 8、JDK 11 的支持,并把默认的 Scala 版本变成了 2.13,所以 Celeborn 社区也是紧跟其他社区的节奏,加上了这两个的支持。
以上就是本次分享的内容,谢谢大家。
分享嘉宾
INTRODUCTION
陈福
丁香园
大数据基础平台负责人
Apache Kyuubi PMC Member / Apache Celeborn (Incubating) Committer。
活动推荐
往期推荐
AB实验的采样分流技术演进以及Sutva假设与现实挑战
DataOps+大模型促进数据工程创新
大语言模型在推荐系统中的探索与应用
从0到1:广告营销多智能体架构落地全攻略
Agent+RAG:大模型真实应用场景落地探索
分布式 Data Warebase - 让数据涌现智能
火山引擎基于 DataLeap 的电商指标管理实践
聚焦电商场景,详解抖音集团埋点及归因分析方案
金融场景中的指标体系建设与应用
指标归因在互联网平台的应用
点个在看你最好看
SPRING HAS ARRIVED