本文根据作者于Arctic开源发布会演讲内容整理,系统解读Arctic项目研发初衷、生态定位、核心特性、性能表现及未来规划。
数据湖Tableformat之争
先看目前ApacheHudi、ApacheIceberg、Delta这几个主流的开源Tableformat的选型。
Tableformat这个概念最早由Iceberg提出,现在行业对它的理解主要有两点。第一点是Tableformat定义了哪些文件可以构成一张表,这样ApacheFlink、ApacheSpark、Trino、ApacheImpala等任何引擎都可以根据Tableformat去查询、检索数据。第二点就是Tableformat规范了数据和文件的分布方式,任何引擎写入数据都要遵照这个标准,通过format定义的标准来支持以前Hive不支持的ACID和模式演进。我们看到Iceberg、Delta和Hudi在这些功能上基本上是拉平的,虽然他们在各自实现上有很大不同,但抽象他们的共性个人认为是非常有意义的事情。
新的Iceberg、Delta、Hudi所主导的Tableformat能力中,增加了一个快照的概念,表的元数据不再是一个简单的表和文件的关系,变成了表和快照以及快照和文件的关系,数据的每次写入会产生新的快照,而这个快照和文件产生一个动态的映射关系,这样它能实现每次写入ACID的保障,也能通过快照的隔离实现用户的多读多写。而且基于快照也能给上层提供一些比较有意思的功能,比如说可以基于快照的增量写入实现增量读,也就是CDC的功能,可以通过快照去支持回溯,例如我们在timetravel或者数据的rollback。
总结下来Tableformat有四点核心特性。
第结构自由。像之前的Hive只能支持简单的加列操作,而在Delta、Iceberg这样的Tableformat之上用户可以自由地更改表的结构,可以加列、减列、改列,而且对数据的迁移和变更不会有要求。
第读写自由。因为它通过快照能够保证数据的ACID,任何实时、离线以及AI的需求都可以自由地往这个表里面写数据或者读数据。
第流批同源。因为Tableformat核心的一个功能是可以很好地支持流场景,我们的批和流都可以往新的Tableformat去写和读。
第引擎平权。这点非常重要,它不能只是绑定某一个引擎,比如说像Delta在0时代是Spark生态中的一个组件,在一个月之前Delta0的发布再次向我们证明了去适配多个引擎的重要性。
在Tableformat这些项目的官网中,他们会主推一些功能主要包含CDC、SQL扩展,数据的Rollback,以及timetravel,模式演进以及我们经常说的流式更新、读时合并的功能。
CDC一定程度上能起到平替消息队列的作用,比如说在生产场景中,实时计算主要会用Kafka或者Pulsar做流表的选型。有了Tableformat之后,我们可以基于数据湖来实现类似于消息队列的功能,当然它的数据延迟会从毫秒或者秒级降级为分钟级别。像Upsert、读时合并和行业内或者说很多公司去推广数据湖的主要场景,就是拿这个实时更新以及读时合并去平替ApacheKudu、Doris、Greenplum这些实时更新的数仓系统。
企业需要怎样的数据湖
在过去两年中,我们跟行业内很多公司交流,大体上都是在这样一种矛盾之中挣扎,想用数据湖的新技术来替代一些其他方案,对业务的吸引力是非常不足的。我们的数据湖或者Lakehouse的技术究竟能给企业带来怎样的价值?
在我们的生产场景中,我们的整个数据平台体系在2020年遇到最主要的问题,就是流批平台割裂。大家知道我们围绕Hive这套离线数仓已经产生了非常丰富的方法论,从数据模型、数据资产到数据质量,基于数据湖的开放架构我们产生了非常好的一套规范、标准以及治理体系。
但是我们把目光切换到实时场景下,目前主要用Flink做实时计算,用Kafka作为流表的选型,当我们做流表join时可能单独需要从数据库那边拉一个实时同步的任务,后面如果我们做数据分析,需要比较高的数据新鲜度,需要引入Kudu或者Doris这些能够准实时或者实时更新的数仓系统。
这套东西和我们离线整套的技术选型以及工具是非常割裂的,而且没有形成比较好的规范,主要还是点对点的开发为主。
举个例子,如果我们既要做实时也要做离线,就需要拉两套链路。离线的链路根据我们整套方法论,根据离线整个流程的工作流,是能比较容易规范地定义出一套出来。实时场景下我们更多需要开发者,需要用户自己去了解Flink怎么玩儿,Kafka怎么读,这个过程中序列化、反序列化怎么做,怎么在Kudu上建表,这一套规范给用户带来了非常大的负担。
总结一下传统Lambda架构给我们带来哪些弊端。
第一是数据孤岛的问题。如果我们使用Kudu或者其他跟数据湖割裂的一套数仓方案,会带来独立的采购和部署成本,会因为容易存储而浪费成本。因为数据之间难以复用和互通,如果我们在相同的业务场景下需要一个实时的数仓,可能需要从源头重新拉一份数据出来,导致成本和人效的浪费。
第二是研发人效低,研发体系割裂,研发规范不通用。这在AI特征、推荐的场景比较典型,需要用户自己去搞定什么时候调用实时的东西,什么时候调用离线的东西,会导致整个业务层非常复杂。
最后是指标和语义的二义性问题。比如过去几年我们主要是使用Kudu作为实时数仓方案,用户需要自己在Kudu里面建一个数仓表,会有Kudu的一套Schema,在Hive这边有一套通过数据模型创建的表,而这两套东西都需要用户自己去维护。当业务逻辑发生变更的时候,用户可能改了Hive但是没有改Kudu的,长久下来就会导致指标和语义的二义性问题。而且随着时间的推移,维护的成本会越来越高。
所以业务期望的是什么呢?其实是我们在平台层,在整个数据中台层或者在整套数据的方法论这一层,能够用一套规范、一套流程把实时和离线,以及AI等更多的场景统所以我们回过头来看Lakehouse这个概念创造出来的意义,就是拓展产品的边界,让数据湖能更多地服务于流的场景、AI的场景。
在我们的生产场景中,Lakehouse给业务最终带来的应当也是一个体系上的收益,而不在于说某一个功能上用了它。比如说我在CDC或者在分析的场景下用了,但是如果用户只是单纯地去比较Kudu和Hudi或者Iceberg之间的差异,他可能很难说到底带来什么样的收益;但是如果我们告诉用户说整套平台可以即插即用地把离线和实时全部统一掉,这个收益就很大了。基于这样一个目标,我们开发了流式湖仓服务Arctic这样一套系统。
理解Arctic流式湖仓服务
Arctic是什么呢?简单来说Arctic是由网易数帆开源的流式湖仓系统,它在lceberg和Hive之上增加了更多实时场景的能力,所以Arctic首先强调的是实时场景的能力,并且面向DataOps提供开箱即用的元数据服务,而且让数据湖更加好用和实用。我们用一句话概括会比较抽象,后面我们会用更多功能的举例以及我们一些干货上的分享,让大家深入理解Arctic到底是什么。
生态位差异
首先我们通过这张强调生态位的差异,Arctic从生态位上在Tableformat之上,所以严格意义上说我们不应该把Arctic当成是另外一套Iceberg或者另外一套Tableformat。
另外一点,我们在Tableformat之上,主要考虑跟开源的Tableformat做兼容,所以Arctic的一个核心目标是帮助企业用好数据湖的Tableformat,以及解决或者拉平在Tableformat以及用户,或者说产品真实的需求之间的gap。
Arctic本身包含两个核心组件,第一个是元数据服务AMS,它在我们系统中定位是下一代HMS的角色。第二个,我们持续自优化的功能,会有整套optimizer组件和机制,来实现后台数据优化。
Tablestore设计与优势
我们之前和很多用户聊过Arctic,大部分用户的第一个问题是我们跟开源的Iceberg具体是什么关系。通过这张我想来说明这点。首先在Arctic中有Tablestore这个概念,Tablestore是一个存储单元的定位,有点类似于传统数据库里面聚簇索引的概念。当流式写入的时候,我们会用一个change的Tablestore存储一个CDC写入的数据,这个数据有点类似于我们数据库中的binlog或者relog,后面这个changetable可以用于CDC的回放,也可以当作一个单独的表来访问。
Hudi、Iceberg也有upsert的功能,但2020年我们开始做这个事情的时候Iceberg还没有这个功能,社区出于对manifest这层设计的严谨考量在实现上会有一些妥协,所以最终我们决定了在上层去做这个事,并且会体现我们的一些优势。
Change表主要存储的是CDC的change数据,另外还有一套Basestore会存储我们的存量数据,两个Tablestore其实是两张独立的Iceberg表。另外我们还可选的集成Kafka的logstore,也就是说我们的数据可以双写,一部分先写到Kafka里面,再写进数据湖里,这样实现了流表和批表的统
这样设计有什么样的优势?首先change表里的CDC数据可以按顺序回放,会解决Iceberg原生的V2CDC不太好回放的问题。
第二个是change表可以开放访问。在很多电商、物流的场景里change数据不光是作为一个表内置的数据用,很多时候订单表、物流表的变更数据也会作为独立的数仓表来用,我们通过这样的设计允许把change表单独拎出来用,当然会添加一些写入保护。如果未来业务有一些定制化需求,比如说在change表中额外添加一些字段,添加一些业务自己的UDF的计算逻辑,这个设计也具备这样的可能。
Arctic架构和组件
理解了Tablestore的概念之后再来看Arctic的架构和组件,我们就会更加容易理解。在数据湖这一层我们有changefiles、basefiles,分别对应changestore和basestore。Tablestore的概念不仅可以用于CDC的场景,未来对于一些排序,对于ZOrder一些具体的需求同样可以采用上层架设独立的Tablestore来解决。
三元组是什么呢?就是catalog.table.db这样的三元组,大家知道在Spark0和Flink2之后,主推的是multicatalog这样的功能,它可以适配不同的数据源。目前我们在主流的大数据实践中更多的是把HMS当作元数据中心来使用,HMS是二元组的结构,如果我们想扩展,把HMS里面根据更多的数据源,需要做很多定制性的工作。比如说网易数帆有数平台其实是在HMS之外单独做了一个元数据中心,来管理三元组和数据源的关系,AMS本身就是面向三元组设计的元数据服务。
第二个我们的AMS可以和HMS做同步,就是我们的Schema可以存在HMS里面,除了Hive所能够存储的一些字段信息外,一些额外的组件信息,一些额外的properties会存在AMS中,这样AMS可以和HMS一起提供服务,所以业务不用担心在使用Arctic的时候一定要做一个替换,这其实是一个很灰度的过程。
第三个是AMS会提供事务和冲突解决的API。
在optimizer这儿,我们有一整套完整的扩展机制和管理机制。首先我们有一个optimizercontainer的概念,本质上是平台调度任务的组件,我们整个后台的optimize过程对业务来说是透明的,后台需要有一套调度服务,能够把optimize的进程调度到一个平台中,这种不同的模式就是optimizercontainer的概念,未来用户也可以通过container接口去扩展它的调度框架。
optimizergroup是在container内部做资源隔离,比如说用户觉得有一些表的optimize需要高优先级运行,可以给他抽出一个独立的optimizergroup执行他的优化任务。
第三点在我们架构中有单独的Dashboard,也是我们的一个管理界面,我们非常注重湖仓本身的管理体验。
最后一点也是非常重要的,刚才提过我们有Tableformat完全兼容的特性。目前提供两种,一个是Iceberg,因为我们是基于Iceberg来做的,basestore、changestore都是独立的Iceberg表,并且我们的兼容是随着Iceberg的迭代持续推进的,目前已经兼容IcebergV
另外我们也有Hive兼容的模式,能让业务在不用改代码的前提下,直接使用Arctic的一些主要功能,如果用户使用的是Hiveformat兼容,它的change数据还是存在Iceberg里面的。
管理功能
之前有提到Arctic非常注重管理体验,尤其对于我们后台持续优化的管理,有一套功能以及相对应的度量和标定的能力提供给大家。下中所展现的,哪些表正在optimizing用到的资源、持续的时间,未来应该怎样做一个更合理的资源调度,通过我们的管理功能都会给到大家。
我们的tableservice的功能,对于表有很多元数据的信息,包括每张表动态的变更,一些DDL的历史信息,事务的信息,都会在表服务中体现。
并发冲突解决
当我们采用了Tableformat去解决流批同源场景的时候,举个例子,比如下上半部分,我们在做一个数据的CDC同步,正常情况下是一个Flink任务去做持续的同步,但是如果我们想做数据回滚或者要做数据更正,比如说添加了一列,这一列有个默认值,需要我们通过批的方式把数值初始化一下,会起一个Spark任务和Flink同步去跑。这个时候如果Saprk任务和Flink任务操作到了同一行数据,这个数据的主键是一样的,就会遇到数据冲突的问题。
下半部分也是类似的,我们对一个数仓表、湖仓表进行了ad-hoc并发的更新c1和cc1在c2之后提交,但是c1在c2之前开始,当它们出现冲突之后是c1覆盖c还是c2覆盖c1?从目前数据湖方案来说,一般是谁后提交以谁为准,但是在更多的生产场景中我们需要谁先开始以谁为准。这一块时间关系就不展开,有任何疑问可以在用户群里与我们深入交流。
在性能方面Arctic也做了很多工作,我们目前是基于Iceberg的,Iceberg是非常灵活开放的Tableformat,它在partition之下没有考虑我的数据以及我的数据对应的更新,应该怎样更好地做映射来提升它的性能。
在Iceberg之上我们做了autobucketing的功能,这个功能跟Hudi中file_group的概念有些类似。不同的是我们没有给用户暴露file_group或者file_index这样的概念。我们在文件之上提供了分组的功能,是一种可扩展的方式,通过二叉树的扩展方式让每一个节点的数据量尽可能维持在用户配置的大小。比如说Iceberg默认配置128兆,我们通过后台的一整optimizing套机制,会尽可能维护每个node的大小向128兆靠拢。
当一个node数据超过这个范畴之后,我们会尝试把它分裂,之前也提到我们分了changestore和basestore,它们都是按照同样的方式管理,这样在每一个节点之上可以对应到change数据和base的数据,就能实现更精细的数据映射,数据分析的性能会有一个非常好的提升。
可以看到在merge-on-read的过程也可以用到整套机制。2000年左右伯克利有一篇论文专门描述这种方案,感兴趣的同学也可以自己去了解。
Arctic性能测试
CHbenchmark支持一个数据库既跑TPC-C也跑TPC-H。从下左边可以看到,有6张表是重合的,既在TPC-C中跑也在TPC-H中跑,有3张表是在TPC-C中引用,3张表只在TPC-H中引用。
基于这套方案我们做了一个改造,首先用TPC-C跑数据库,在下面我们再跑一个FlinkCDC任务,把数据库实时流式地同步到Arctic数据湖中,用Arctic数据湖构建一个分钟级别数据新鲜度的流式湖仓,在这之上我们再跑CHbenchmark中TPC-H的部分,这样能得到比较标准的流式湖仓的数据分析的性能。
针对optimize前后的Arctic、Iceberg和Hudi测试的结果,我们按阶段做了一个简单的对比,分为0-30分钟、30-60、60-90分钟和90-120分钟四组。下蓝色的部分是没有optimize的数据分析的性能,从0-30分钟,到最后的90-120分钟,延迟从20秒降低到了40多秒,降低了一半多。而黄色部分有持续合并的Arctic,性能稳定在20秒左右。
灰色的是原生的Icebergupsert方案,0-30分钟是在30秒左右,从30-60分钟性能是急剧下降的。为什么Iceberg出现了这么大的性能滑坡呢?因为原生Iceberg确实没有insert数据和delete数据的精细化的映射,所以当我们持续写入流式文件之后,每一个insertfile都会跟deletefile产生非常多的关联,从而导致我们在Trino中做merge-on-read的性能急剧下降。后面测60-90分钟、90-120分钟就直接OOM,跑不出来了。
黄色部分是Hudi。目前来看Arctic和Hudi一样,通过后台优化能够保证数据分析的性能,维持在一个比较平稳的数字。目前来看我们通过上层的优化,比Hudi要好一些。后续我们也会在官网中把我们整个测试流程和相关配置向大家公开。
Arctic目前看mor性能相比Hudi有一定优势,这里我们先不强调Arctic做得有多好我们也研究了Hudi,它有RO和RT两种模式,前者是只会读合并数据,RT模式是merge-on-read的一种模式。它的RO模式和RT模式性能差距非常大,所以未来可能会有很大的优化空间。
最后我们对Arcticroadmap以及整个系统做个简单的总结。Arctic是一个流式湖仓服务,提供分别对应streaming、lakehouse、service的核心特性。
streaming层面我们提供了主键的高效流式更新,我们有数据自分桶、结构自由化的能力,Spark、Trinomerge-on-read的功能,提供分钟级别新鲜度的数据分析。
在lakehouse层面我们做到格式的兼容,百分之百兼容lceberg和Hive的表格式语法,如果有一些功能是Arctic没有而Iceberg有的,用户只需要切换到Icebergcatalog,就能够把一张Arctic表当作Iceberg表来使用,并且我们提供了base和change两个表的访问方式。
引擎平权支持Spark和Flink读写数据,支持Trino和Impala查询数据,目前Impala主要是用到Hive的兼容特性,可以把Arctic表作为一个Hive做查询,从而支持Impala。
在service这一块主要强调管理上的功能:
第一个是支持将数据湖和消息队列封装成统一的表,实现流批表的统这样用户使用Arctic表不用担心从秒级或者毫秒级降低到分钟级别,他依然可以使用数据湖提供毫秒或者秒级的数据延迟的能力。
第二点提供流式湖仓标准化度量,dashboard和相关的管理工具。
第三点是解决并发写入冲突,实现事务一致性语义。
在管理层面我们聚焦回答下面这几个问题,这些工作还有很长的路要走。
第一个是表的实时性怎么量化,比如说我们搭建一个流式的湖仓表之后,当前的新鲜度是一分钟、两分钟还是多少,是否达到了用户的预期。
第二个怎样在时效性、成本、性能之间给用户提供tradeoff方案。
第三个查询性能有多少优化空间,需要投入多大的资源做这样的事情。
第四点数据优化的资源该怎样量化,怎样最大化利用这些资源。如果用户深入了解Arctic,会发现我们optimizing跟目前Hudi其他的有很大不同,首先我们optimizing是在平台层面调度的,不是在每一个写入的任务里做的,这样我们可以集中化管理这些数据优化的资源,并且可以提供最快的迭代。比如我们发现通过一些优化能够让合并效率有很大的提升,就可以很快迭代。
最后一点是怎样灵活分配资源,为高优先级来调度资源。
第二个是流式更新部分列,现在我们主要是通过CDC来做streamingupsert,很多场景,比如特征、大宽表,我们可能需要能够更新部分列。
后面是更多的optimizercontainer支持,比如说K8s;更多SQL语法的支持,比如说mergeinto——目前我们在Arctic层面还没有这样的语法,用户可以把Arctic的表当成Iceberg表来用,来支持mergeinto。未来如果在Arctic层面支持了mergeinto,它会和Iceberg有所不同,因为我们的变更数据首先会进入到change空间。
最后一点因为我们的生态位是在数据湖Tableformat之上,所以未来我们会做架构的解耦,去扩展到更多的Tableformat,比如Delta、Hudi。
最后谈谈我们开源的初衷。过去我们做开源可能没有一个非常统一的步调,去年我们领导层下了一个决心,会按照一种更加专注的方式做开源,以Arctic这个项目为例,我们不会做任何的商业隐藏。而且从组织架构而言我们团队推进开源也是非常独立的过程,如果可能有商业化会由其他的团队推进。
这是我今天要分享的全部内容,谢谢大家!
虽然是相似的,但是大家的目标是不太一样的,Flink做这个事对流这个场景而言更加原汁原味,但是肯定不会像我们更多考虑到怎么在Spark上,在其他的引擎上做一些事情,怎么在上层提供更多的管理功能。所以抛开一些功能上的重合,我理解大家的初衷或者最终要解决的问题还是会有差异。
主持人:虽然在表现形式上是相似的,但是Flinktablestore的这种方式更贴近原生Flink的场景,但是我们除了兼容Flink的场景之外还会有更多偏向于Spark的场景做兼容和支持。
马进:不光是Spark,我们还提供了Hive兼容。如果是Hive用户,怎么能让一些Hive表比较顺滑地升级到我们湖仓一体新的架构上,这是我们系统去考量的一些东西,在这个过程中怎样提供一些便利的管理功能,提供度量指标,这些可能和FlinkTablestore考虑的点是不一样的。
主持人:Arctic底层刚才讲到是基于Iceberg实现的,在代码上有强绑定的关系吗?以后会不会考虑基于其他的Tableformat做这种转换?
主持人:说白了我们不帮用户做出最终的决定,而是提供更多的可能性,无论未来是Iceberg还是Delta,我们都能用一种比较开放的方式兼容。
马进:这个是长远的,现在我们还会和Iceberg结合得紧密一些。
Arctic文档地址:
GitHub地址:
文章为作者独立观点,不代表 股票程序化软件自动交易接口观点