核桃编程Delta Lake实时数仓应用实践

 容器服务Docker K8S     |      2020-03-02 00:00:00

作者:
卢圣刚,核桃编程数据架构师,拥有多年的大数据开发和架构经验。曾担任易观数据挖掘工程师,熊猫TV大数据架构师。


核桃编程简介

核桃编程成立于2017年8月9日,作为少儿编程教育行业的领导者,始终秉持“让每个孩子爱学习、会学习,让优质的教育触手可及”的使命,致力于以科技手段促进编程教育,凭借首创的AI人机双师教学模式与十级进阶课程体系,实现规模化因材施教,“启发中国孩子的学习力”。截止2019年8月,核桃编程已经成为付费学员规模最大的少儿编程教育机构,帮助超过65万名孩子收获学习兴趣,锻炼编程技能,养成良好思维习惯,学员复购率超91%,学员完课率高达98%,在线原创作品1873万份。

1.业务现状

业务需求

  • 业务上固定时间开课,在开课时间内,班主任需要实时/准实时地知道学生的学习情况
  • 数据统计维度一般都是按班级,学期汇总,时间范围可能是几个月,甚至一年
  • 业务变化快,需要及时响应业务变化带来的指标逻辑变更

数据源

image.png

架构改造前方案

现有指标都是将Kafka/Mysql等的数据写入HDFS,使用Hive离线批处理,每10分钟执行一次,循环统计历史累计指标,再定时把数据同步到Mysql,提供给数据后台查询。如下图所示:
image.png

遇到的问题

随着计算的数据量越来越大,逐渐不能满足业务的更新频率要求。

  • 使用Apache Sqoop做全量数据同步,会对业务Mysql库/HDFS造成压力。
  • 使用Apache Sqoop做增量同步,一般只能使用某个时间字段(例如update time)来同步新修改的数据。这样在做分区表时,需要比较复杂的离线合并。
  • 随着数据越来越大,同步以及处理时间会越来越长,满足不了业务实时性需求。

2.实时数仓方案调研

离线的同步方案已经不能满足业务需求,计划迁移到实时方案上来,并做了一些调研。

迁移流式计算的问题

开发周期长

现有离线任务基本都是动辄几百行SQL,逻辑复杂,把所有逻辑迁移到流式计算,开发难度和改造成本都比较大。
例如离线增量同步,需要先同步全量base数据

sqoop import  
--hive-import 
--hive-overwrite 
--connect jdbc:mysql://<mysqlurl>   
--table <mysqltable> 
--hive-table <table_base> 
--hive-partition-key <parcolumn> 
--hive-partition-value <par1>

再消费增量binlog数据,流式写入到hive外部表,最后将两个表合并

insert overwrite table <result_storage_table>select
<col1>,
      
<col2>,
       <colN>
  from(select
row_number() over(partition by t.<primary_key_column>
 order by record_id
desc, after_flag desc) as row_number, record_id, operation_flag, after_flag,
<col1>, <col2>, <colN>
  from(select
incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>,
incr.<col2>,incr.<colN>
  from
<table_log> incr
 where
utc_timestamp< <timestamp>
 union all select 0
as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>,
base.<col2>,base.<colN>
  from
<table_base> base) t) gtwhere record_num=1 
  and
after_flag='Y'

而应用Delta Lake只需要一个streaming sql即可实现实时增量同步。

CREATE SCAN <SCAN_TABLE> on <STREAM> using
stream;
CREATE STREAM job
OPTIONS(
checkpointLocation='/cdc',
triggerInterval=30000
)
MERGE INTO <CDC_TABLE> as target
USING (
 SELECT
 from_unixtime(<col2>,'yyyyMMdd') as
par_date,
 <col1>
 FROM(
  SELECT 
  recordId, 
  recordType, 
  CAST(before.id as
LONG) as before_id,
  CAST(after.id as
LONG) as id,
 
after.<col1>,
  after.ctime,
  dense_rank() OVER
(PARTITION BY coalesce(before.id,after.id) ORDER BY recordId DESC) as rank
  FROM (
   SELECT 
   recordId, 
   recordType, 
  
from_json(CAST(beforeImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as before,
  
from_json(CAST(afterImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as after
   FROM (
    select
from_avro(value) as (recordID, source, dbTable, recordType, recordTimestamp,
extraTags, fields, beforeImages, afterImages) from <SCAN_TABLE>
   ) binlog WHERE
recordType != 'INIT'
  ) binlog_wo_init
 ) binlog_extract
WHERE rank=1
) as source
ON target.id = source.before_id
WHEN MATCHED AND source.recordType='UPDATE' THEN
UPDATE SET *
WHEN MATCHED AND source.recordType='DELETE' THEN
DELETE
WHEN NOT MATCHED AND (source.recordType='INSERT' OR
source.recordType='UPDATE') THEN
INSERT *;

数据恢复困难

对离线任务来说数据恢复只需要重新执行任务就行。

但对流式计算,当数据异常,或者逻辑变更,需要重新跑全量数据的时候,只能离线补历史数据,再union实时数据。因为Kafka不可能存所有历史数据,而且从头消费追数据时间也会很久。

而为了满足快速恢复的需求,所有指标都需要从一开始准备离线和实时两套代码,类似Lambda架构。

数据验证困难

Kafka在大数据架构中一般充当消息队列的角色,数据保存周期较短。全量历史数据,会消费Kafka写到HDFS。如果一个指标计算了一个月,发现计算结果有异常,很难追溯是当时Kafka数据有问题,还是计算逻辑有问题。HDFS数据虽然可以用来排查,但是HDFS里的数据和当时Kafka的数据是否一致,是不能保证的。

希望满足的功能

正因为迁移流式作业会有一些迁移成本和问题,所以对实时计算方案提出了一些功能要求。

开发灵活

互联网公司业务发展速度快,人力资源比较紧张,需要更低成本更快捷的开发新指标,满足业务敏捷性的要求。

重跑历史数据方便

业务指标的定义经常发生变更,一旦变更,或者有新的数据指标就需要从最早开始消费。但是历史数据通常非常多,而且一般实时数据源Kafka也不可能存历史所有数据。

数据异常时容易排查问题

以离线数仓为例,几百行的SQL,可以分段执行,来逐步排查。Flink可以埋metrics获取中间过程。

3.基于Delta Lake实时数仓方案

Delta Lake

Delta Lake是美国Databricks开源的数据湖技术,基于Apache Parquet丰富了数据管理功能,如元数据管理/事务/数据更新/数据版本回溯等。使用Delta Lake可以很方便的将流处理和批处理串联起来,快速构建Near-RealTime的Data Pipeline.
image.png

目前阿里巴巴E-MapReduce(简称“EMR”)团队对Delta Lake做了很多功能和性能上的优化,并和Spark做了深度集成,主要以下方面,更多信息详见EMR官方文档

  • SparkSQL支持Update/Delete/Merge Into/Optimize/Vacuum等语法来操作Delta Lake
  • 自研SparkStreaming SQL,支持Delta Lake的相关DML操作
  • Hive&Presto On Delta Lake
  • Delta Lake On OSS(阿里云对象存储)
  • Delta Lake事务冲突检测优化
  • DataSkipping & Zorder性能优化
    image.png

SparkStreaming SQL

阿里巴巴EMR团队在StructStreaming基础上自研了SparkStreaming SQL,用户可以很方便的使用SQL来写流式作业的逻辑,大大降低了开发门槛, 详见 SparkStreaming SQL官方文档
image.png

  • 批流统一引擎
    可以复用底层SparkSQL/SparkCore的优化
  • 丰富的SQL支持
    CREATE TABLE / CREATE SCAN / CREAT STREAM / CTAS

INSERT INTO / MERGE INTO
SELECT / WHERE/ GROUP BY / JOIN / UNION ALL

  • 丰富的UDF支持
    Hive UDF / 窗口函数
  • 丰富的数据源支持
    Delta/Kudu/Druid/HBase/MySQL/Redis/SLS/Datahub/TableStore

并且支持Kafka的Exactly Once
github: https://github.com/aliyun/aliyun-emapreduce-sdk

  • Delta Lake深度集成
    结合Delta Lake的使用场景,新增了一些功能的支持(比如流式写动态分区表)

实时数仓方案

架构方案

基于Delta Lake+SparkStreaming SQL可以快速构建实时数仓的pipeline,如下所示:
image.png

  • ODS层
    ODS的数据主要是实时埋点数据,CDC中的binlog日志等
  • DIM维表
  • DW层
    DW层主要是一部分轻度汇总数据,例如用户维度的课程,作业等信息。

     主要复用的是dw层数据,因此针对每一个指标,需要综合考虑是否聚合,聚合到哪一个维度,是否关联维表。

    DW层分为两种

a.业务简单,基本不会变化。直接写入Kafka。
b.业务逻辑复杂,数据可能<频繁>变化,写入Delta Lake。实践上看,直接写入Kafka是最容易的方案,但是灵活性很低,历史数据无法追溯,也无法修改。DW层通过引入Delta Lake,可以实现流批统一数据源,历史分区数据恢复等功能。

  • DM层

     DM层就是最后的报表展示指标了,可以将DW层delta表做为数据源,再次汇总后sink到展示用的DataBase。
    

备注:
EMR团队提供了流式Merge Into功能,可以通过写SparkStreaming SQL的方式来做CDC回放binlog到Delta表。
详见CDC同步文档

问题的优化

在使用Delta Lake的过程中,我们也发现了一些问题,详细的解决方案和建议如下:

小文件多

CDC流式Merge回放binlog的过程中,会不断产生小文件,需要对小文件进行一些处理,EMR提供了一些优化方案

  • 新增串行auto compaction的功能
    在CDC流式作业运行过程中,根据一定的策略对小文件进行合并compact操作
  • 使用Adaptive Execution
    打开自适应执行开关,可以有效减少Merge过程产生的小文件,如单个batch从100个小文件减少到1~2个文件。

Compact冲突问题

如果不使用串行Compact功能,需要定期手工对Delta表进行Compact合并小文件,但是经常碰到Compact在事务提交的时候和CDC流作业事务提交产生冲突,是的CDC流或者Compact失败,这块也提供了一些优化以及建议:

  • 优化Delta内核冲突机制,使得CDC流能够稳定运行,不会因为Compact挂掉
  • 使用分区表,批量对分区进行Compact,减少冲突概率
  • 在数据库表update/delete操作很少的时候进行Compact(可以使用EMR工作流调度)
  • 使用EMR工作流中的作业重试功能,当遇到Compact事务提交失败时进行重试

架构方案进一步说明

• 为什么不直接从ODS计算

以核桃的到课指标为例,数据源是kafka的埋点topic,需要计算的指标有个人维度到课数据,学期维度,班级维度,学期维度,市场渠道维度。
每个维度都需要消费所有的埋点数据,从中挑出到课相关的事件。并且每个维度的计算程序都需要查询HBase/Mysql关联相关的学期,班级,unit等维表。
一旦有整体逻辑的调整,例如过滤测试班数据,不可能从ods层就把数据过滤掉(这样从底层就开始丢失数据,后期无法追查),那么所有程序都需要重新调整,添加这个过滤逻辑。

• 怎么恢复数据

理想情况是,实时与离线使用同一套SQL,同一套计算逻辑,同一个数据源,这样随时可以用离线脚本重跑历史数据。但是现实是没有哪个框架支持。所谓流批一体,都是在引擎层面,例如Spark的streaming和SQL都是batch的方式,流只是更小的批。而Flink则希望用流的方式去处理批数据,批只是有边界的流。针对高阶的SQL API,流批都有很大的区别。基于Delta Lake的分区表,将dw层的实时数据按时间分区,这样可以随时用离线作业恢复历史分区的数据。而DW之上的汇总因为数据量相对较小,恢复之后可以用流作业从头消费。

4. 业务效果

Delta Lake实时数仓在核桃编程部分数据仓库生产环境上线后,部分业务统计指标已基于新架构产出,指标更新延迟从几十分钟,提升到1分钟以内。班主任可以更快获取学生的学习状态,及时跟进学习进度,从而显著提升了教学质量。
在CDC应用后,数据同步延迟从半小时提升到30秒,同时解决了Sqoop高并发同步时对业务数据库的影响。数据分析人员Ad-Hoc查询时,可以获取实时的业务数据,明显提升了数据分析效果,并且可以更及时的指导业务发展。

5. 后续计划

根据目前的业务应用效果,后续大数据团队会继续梳理业务范围所有实时指标,进一步优化实时数仓各层的结构,推进全面应用基于Delta Lake的实时数仓建设。
基于Delta Lake模式执行、时间旅行等特性,进一步推进机器学习场景下对Delta的应用,构造更可靠、易扩展的Data Pipeline。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png