EMR 打造高效云原生数据分析引擎(en)

 阿里云安全(en)     |      2019-11-05 00:00:00

本场视频链接:EMR打造高效云原生数据分析引擎

本场ppt材料:https://www.slidestalk.com/AliSpark/2019___0926_110365


基于开源体系打造云上数据分析平台

客户选择开源方案的原因主要有以下几点:

灵活多样的业务场景:目前即便是一个小企业,其数据存储也可能是多种多样的,比如业务数据、日志数据和图数据等,这种情况下,需要有一个高度定制化的系统来串联不同的业务场景;
自己有专业的运维能力:开源系统有充足的人才储备,丰富的网上资料与开源的强大后盾,可以确保公司业务的顺利开展;
多种业务需求vs成本压力:每种云上产品有自己的使用场景,对于中小企业来说购买多种云产品将会造成很大的成本压力,而通过开源体系维护一套系统,在集成用户业务中所需组件的同时,可以降低用户的成本。
image.png

下图是阿里巴巴EMR系统的产品架构图。用户上云的方式主要有两种,一种是购买ECS资源自己搭建一套开源系统;另一种是直接选择阿里巴巴的EMR系统。第一种方式由于开源系统组件多,涉及到了Spark、Hive、Flink和TensorFlow等,从零搭建一套完整的大数据系统对于用户来讲非常复杂,尤其是成百上千的集群规模也给运维造成了很大的挑战。而使用EMR系统具有以下优点:
1) 阿里云EMR系统可以帮助用户一键化自动部署、配置相关组件,开箱即用,同时还会根据用户的机器类型进行参数的自动推荐调优。
2) 阿里云EMR系统与阿里云其他产品实现了打通,比如数据存放在OSS,EMR系统无需额外再做认证配置,便可以很方便地读取OSS上的数据;
3) 阿里云EMR系统集成了很多自研插件,这些插件在其他产品中是没有的;
4) 阿里云EMR系统的所有组件兼容开源但优于开源,如Flink集成了阿里云自研的Blink和TensorFlow(PAI),这也是阿里云为社区做的一点贡献,目的是为了让用户能用到阿里云内部的技术;
5) 阿里云EMR系统提供了全平台的作业诊断与告警组件APM来实现自动化运维,大大降低集群运维的复杂性;
6) 阿里云EMR系统还与DataWorks对接,用户可以以DataWorks为入口,傻瓜式地使用EMR系统。

image.png

EMR系统的目标主要有以下三个:

平台化:将EMR做成一个统一的云上数据分析平台,帮助用户打造全栈式的大数据解决方案,支持全系列VM容器化,提供企业级HAS和大数据APM;
技术社区&深度:持续深耕技术社区,打造大数据友好的云 Native 存储,同时将技术回馈给社区,为社区做贡献;
生态:EMR系统将结合阿里云其他产品构建一个生态,接入Blink、PAI,集成OSS、OTS方案。

image.png

EMR-Jindo:云原生高效数据分析引擎

下图展示了TPC-DS的基准测试报告,可以发现在2019年3月份10TB的测试中,性能指标得分是182万左右,成本是0.31 USD;而2019年十月份同样的测试性能指标得分已经变成526万,成本下降到0.53 CNY,也就是说经过半年左右性能提升了2.9倍,成本缩减到原来的四分之一。同时阿里巴巴还成为了首个提交TPC-DS测试100TB测试报告的厂商。这些成绩的背后是EMR-Jindo引擎的支持。

image.png

EMR-Jindo引擎架构主要分为两部分:

Jindo-Spark:EMR内部全面优化的Spark高效计算引擎,可以处理多种计算任务;
Jindo-FS:自研的云原生存储引擎,兼容开源HDFS的接口,兼顾性能与价格。

image.png

1) Jindo-Spark

Jindo-Spark高效计算引擎对Spark采取了一系列优化措施,比如Runtime Filter支持自适应的运行时数据裁剪;Enhanced Join Reorder来解决外连接重排等问题;TopK支持推理并下推 TopK 逻辑,帮助尽早地过滤数据;File Index支持文件级别过滤和min/max/bloom/倒排等;自研开发了Relational Cache,实现使用一套引擎就可以将查询从分钟级提升为亚秒级;针对特定的场景推出Spark Transaction功能,为Spark引入Full ACID支持;实现了Smart Shuffle功能,从底层来减少sort-merge 次数,提升Shuffle的效率。

image.png

Runtime Filter
类似于Spark中的Dynamic Partition Pruning(DPP),但是其比DPP功能更强大。除了DPP能处理的分析表之外,Runtime Filter还可以处理非分析表。其基本原理是运行时动态裁剪数据,避免不必要的计算。比如,面对一个join查询,无法通过value下推到存储层而将数据过滤,逻辑推算的时候无法预知最后的数据量级。这种情况下如果是分析表,Runtime Filter首先会估计其中一个表中参与join操作的数据量,如果数据量较小,则提前进行数据筛选,再推送到另一侧做数据过滤;而对于非分析表,会引入Filter,如BloomFilter获得Min或Max的统计信息,

image.png

根据这些统计信息,将备选数据比较少的一侧提取出来,推到另一侧进行过滤。Runtime Filter的成本很小,只需要在优化器中进行简单评估,却可以带来显著的性能提升。如下图所示,Runtime Filter实现了35%左右的整体性能提升。该特性已经在Spark提交了PR(SPARK-27227)。
Enhanced Join Recorder
大家都知道,算子执行顺序可能会极大地影响sql的执行效率,这种情况下优化的核心原则是改变算子的执行顺序,尽早地过滤数据。

比如下图左上角的例子中,如果最底层两个表非常大的话,则这两张表join的开销会非常大,join后的大数据再去join小表,大数据一层一层地传递下去,就会影响整个流程的执行效率。此时,优化的思想是先将大表中一些无关的数据过滤掉,减少往下游传递的数据量。针对该问题,Spark使用的是动态规划算法,但其只适用于表的数量比较少的情况,如果表的数量大于12,该算法就束手无策。面对表的数量比较多的情况,EMR提供了多表join的遗传算法,其可以将原来的动态规划算法的2n的复杂度降到线性的量级,能完成成百上千张表的join。

下图右上角可以看到,Query64有18个表参与join,动态规划算法优化时间就需要耗费1400秒,而多表join的遗传算法仅需要20秒左右就可完成。Join Recorder另外一个重要的功能是外连接重排算法,大家都知道sql中外连接不能随意交换顺序的,但这并不代表不能交换顺序,比如A left join B, 然后再left join C,事实上在某种条件下其顺序是可交换的。在Spark中,外连接的优化是直接被放弃掉,而EMR则根据现有研究找到了顺序可交换的充分必要条件,实现了外连接重排算法(如下图左下角所示),对外连接的执行效率有了质的提升(下图右下角)
image.png

Relational Cache
Spark原本的Cache存在几个局限点,其一Spark的Cache是session级别,如果发现某一个Query的片段使用比较频繁,就会对为这个session创建一个cache,但是session结束后,cache就会消失;其二Spark的Cache是存储在本机上,而不是分布式存储,因此无法做到通用。在此基础上,EMR平台实现了Relational Cache,对任意Spark表,视图或者Dataset等关系型数据抽象的数据实体都创建cache, 类似于物化视图(Materialized View),但是比物化视图功能要丰富。Relational Cache的使用场景包括a)亚秒级响应MOLAP引擎;b)交互式BI,Dashboard;c)数据同步;d)数据预组织。

image.png

Relational Cache的创建过程如下,其语法与Spark sql常见的DDL类似。首先CACHE一个表或视图,然后指定Relational Cache的更新策略(DEMAND或COMMIT)、是否用于后续优化、Cache数据的存储方式以及Cache的视图逻辑。Relational Cache支持cache任意Table、View,支持cache到内存、HDFS、OSS等任意数据源,JSON、ORC、Parquet等任意数据格式。

image.png

Relational Cache还支持对用户输入的sql的优化。原来的Spark sql Cache对于用户输入的sql优化非常僵硬死板,用户输入的sql必须精确匹配上Cache才能使用。而Relational Cache则完全不同,如果有a、b、c、d四个表join的cache,当又有a、b、e三个表join的情况下,a、b join的结果便可以从四个表join时生成的Cache数据中读取。下图中右侧展示了Cache和没有Cache的基准测试结果,可以看出Relational Cache可以保证测试的响应时间在亚秒级。
请参考 Spark Relational Cache实现亚秒级响应的交互式分析

image.png

Spark Transaction:有些用户可能会使用Hive表,Hive表有事务支持,然而Spark在事务这一项上是不兼容Hive的。因此,为了满足用户数据订正/删除以及数据流导入的场景支持,EMR平台提供了Spark Transaction支持事务的ACID支持。

传统的数据导入是分批的,比如一天一导入,而流数据导入场景下数据是实时写入的原始数据,并未经过任何处理,因此会有delete和update的需求。Spark Transaction整体来讲是一种锁+MVCC的实现形式,MVCC与底层的存储密不可分。大数据在Hive和Spark兼容的情况下,都是文件的形式存在目录中,文件的版本通过行来控制,写入的每一行都会加上Meta Columns,如op、original_write-id、bucket id和row_id等,来标识这是全表唯一的一行。当需要更新某一行的时候,并不会原地更新该行,而是将该行取出来,重写后产生新的版本进行存储。读取的时候,多版本会进行合并后返回给用户。

image.png

###2) Jindo-FS
EMR早期推出了一种本地盘机型,使用这种机型来部署集群类似于用本地集群在云下部署大数据发行版,价格较高;此外由于当时HDFS有元数据瓶颈,本地存储的动态化伸缩面临很大的挑战。针对这方面的问题,解决的方案是计算与存储分离,将数据存储在OSS上,但是这种分离带来的直接结果就是性能变差,因为OSS元数据操作耗时,读取数据跨网络,传输带宽也会严重影响性能。

image.png

进而的解决方案是将数据从远端拉取到计算侧进行缓存,这也是Jindo-FS做的事情。Jindo-FS是类似于HDFS的系统,其架构也类似于HDFS的Master-Slave架构,分成Name Service 和Storage Service。它支持将某些访问频率比较高的表可以放到RocksDB中进行多级缓存。Jindo-FS整体不同于HDFS的Master结点, Jindo-FS的“Master”(Name Service)是一个分布式集群,使用raft 协议,提供入口服务;提供多Name Space支持;元数据以kv形式存放于高性能kv store 中;因为其本身不存储数据,真实数据在OSS和OTS中,因此支持数据的弹性扩展和销毁重建。

image.png

Jindo-FS底层的元数据管理会将数据拆成一系列的kv,通过递增的id来逐层查询。如/home/Hadoop/file1.txt需要读三次OTS。下图右侧的测试结果说明Jindo-FS在元数据操作方面相对于OSS有较好的性能提升。

image.png

Jindo-FS使用Storage Service来进行底层存储,在写流程中Storage Service将要写的文件同时存储到本地和OSS中,然后再返回给用户写的结果,同时还会在集群结点内进行多副本传输;而读操作和HDFS类似,如果命中本地,则在本地存储中读取,否则要进行远程读取。Storage Service具备高性能、高可靠、高可用、弹性存储等特性,为了支持高性能,Jindo-FS建立了数据流高速通道,同时还有一系列的策略,如减少内存拷贝次数等。

image.png

Jindo-FS中Name Service如何实现高可靠、如何进行热点数据发现与缓存替换、块存储模式与缓存模式;以及Storage Service如何应对读写失败、数据块如何设计并存储、如何实现高速数据通道等问题,请参见大数据生态专场《云上大数据的高效能数据库的存储方案》的分享。

image.png

相关文章:
JindoFS概述:云原生的大数据计算存储分离方案

JindoFS解析 - 云上大数据高性能数据湖存储方案


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!二维码.JPG

(en)