阿里巴巴云计算边缘计算(流式计算的系统设计和实现)
阿里巴巴云计算边缘计算(流式计算的系统设计和实现)continuous computing(实时数据的实时计算):大数据集的在线复杂实时计算。stream computing(实时数据的计算):计算可枚举,计算在数据发生变化时发生。流计算对于时效性要求比较严格,实时计算就是对计算的时效性要求比较强。流计算是利用分布式的思想和方法,对海量“流”式数据进行实时处理的系统,它源自对海量数据“时效”价值上的挖掘诉求。那么,通常说的实时系统或者实时计算,严格意义上来说分成三大类:ad-hoc computing(数据的实时计算):计算不可枚举,计算在query时发生。
更多深度文章,请关注云计算频道:https://yq.aliyun.com/cloud
阿里云数据事业部强琦为大家带来题为“流式计算的系统设计与实现”的演讲,本文主要从增量计算和流式计算开始谈起,然后讲解了与批量计算的区别,重点对典型系统技术概要进行了分析,包括Storm、Kinesis、MillWheel,接着介绍了核心技术、消息机制以及StreamSQL等,一起来了解下吧。
增量计算和流式计算
流式计算
流计算对于时效性要求比较严格,实时计算就是对计算的时效性要求比较强。流计算是利用分布式的思想和方法,对海量“流”式数据进行实时处理的系统,它源自对海量数据“时效”价值上的挖掘诉求。
那么,通常说的实时系统或者实时计算,严格意义上来说分成三大类:
-
ad-hoc computing(数据的实时计算):计算不可枚举,计算在query时发生。
-
stream computing(实时数据的计算):计算可枚举,计算在数据发生变化时发生。
-
continuous computing(实时数据的实时计算):大数据集的在线复杂实时计算。
增量计算
日志采集和在线分析:如基于访问日志、交易数据的BI算法分析。比较有名的像Google的统计、百度的统计 一些网站根据访问日志,会分析出各种的UV、 PV、 IPV等运营指标,有了流式计算,就可以对这些访问的时效性做到秒级、分钟级的监控,比如双十一当天,不同的店铺会通过店铺的实时访问情况来决定后面的运营策略;
-
大数据的预处理:数据清洗、字段补全等;
-
风险监测与告警:如交易业务的虚假交易实时监测与分析;
-
网站与移动应用分析统计:如双11运营、淘宝量子统计、CNZZ、友盟等各类统计业务;
-
网络安全监测:如CDN的恶意攻击分析与检测;
-
在线服务计量与计费管理系统 搜索引擎的关键词点击计费;
此外,流式计算和增量计算也应用在工业4.0和物联网上。
流式计算的数据特点
流(Stream)是由业务产生的有向无界的数据流。
-
不可控性:你不知道数据的到达时机以及相关数据的顺序,对于数据质量和规模也是不可控的;
-
时效性要求:在容错方案、体系架构和结构输出方面都与传统的计算是截然不同的;
-
体系缺失:传统学术领域已经对批量计算和离线计算的体系研究的非常成熟,而在实时领域如数据仓库中间层等领域都是缺失的,包括数据源管理、数据质量管理等等。
-
另外,数据处理粒度最小,可以小到几条数据,对架构产生决定性影响;
-
处理算子对全局状态影响不同,有状态、无状态、顺序不同等;
-
输出要求,比如一致性和连贯性等。
整个流计算会对系统有非常多的不一样的要求,这就会导致整个系统有非常大的复杂性,跟离线非常的不同,我们的计算仍然要求时效性、要求快,质量上要求它的计算一定是精准的,对容错的要求,不论你的机器、集群、网络硬件有任何的宕机,计算应该是持续稳定,对整个计算的要求也是非常多样性的。关于多样性,不同的业务场景,对计算的结果要求也是不一样的,有些要求精确,一点数据都不能丢、精度损失,还有的业务场景要求可以多但是不能少,还有丢数据有一个sla在保证等,所以种种特点导致我们做流式计算和增量计算系统会面临与传统的离线计算和增量计算完全不同的要求。
与批量计算的区别
从架构角度,增量计算、流式计算和离线处理、批处理有什么本质的区别?
与批量计算的区别如上图所示,比如全量计算设计理念是面向吞吐,而流式计算是实时计算的一部分,面向延时;随之而来的整个全量DAG是一个串型的DAG,是一个StageByStage的DAG,而流式计算的DAG是一个并行DAG,也就是说Batch跟Batch之间是完全可以并行的,离线的批量系统它的串型化和Streaming场景下的并行化,它们在整个数据的时效性上
有非常大的区别,特别是在Latency的体现。
典型系统计算概要分析
下面将向大家介绍业界比较经典的几个流计算产品:
Twitter Storm
Storm是Twitter内部使用开源被广泛使用的一套流计算系统,那么它的一个核心概念是说,一个任务要创建一个Topology,它表示了一个完整的流计算作业,它的最开始的源头名字叫做Spout,做收集数据的任务,它的前面可以挂任何的数据源、任何一个队列系统甚至可以对接文件,那么Bolt是它的具体计算任务所在的载体,而Bolt里有诸多的Task,它是在Spout和Bolt里负责具体一个数据分片的实体,它也是Storm里调度的最小单位。Acker负责跟踪消息是否被处理的节点。Storm的整个容错是采用源头重发的消息机制
源头重发在网络流量激增的情况下,会造成系统的雪崩风险大大提升。上图是两个Storm的作业,它先从源头读出数据,然后进行filter过滤,最终进行join,join后进行一些逻辑处理。
Nimbus–Zookeeper–Supervisor
Storm采用了Nimbus Supervisor之间的方式进行任务调度和跟踪,它们之间是利用Zookeeper来进行通讯,Nimbus相当于一个全局的任务Master,负责接收Topology,然后进行二重的资源调度,并且将调度的信息记录到Zookeeper中,定期检查Zookeeper中的各种Supervisor的心跳信息,根据心跳状态决定任务是否进行重新调度,而Supervsor充当着每台物理机的一个watchdog,它在轮询Zookeeper中的调度任务信息,然后接收到发现有启动任务的信息,就会拉启进程,启动Task,同时定期要把心跳信息写入Zookeeper,以便Supervisor来做出重新调度或者系统的重发操作。
消息跟踪机制是Storm的核心,保证消息至少被处理一次,它追踪源头信息的所有子孙信息。
基本思路如下:
Acker节点是进行消息跟踪的节点,以源头消息的ID为hash key,来确定跟踪的Acker,源头信息对应的所有的子孙消息都有该Acker负责跟踪,而消息树上每产生一个新的子孙消息,则通知对应的Acker,子孙消息被处理,然后再去通知对应的Acker,当Acker里所有的子孙消息都被处理的时候,那么整个数据处理就完成了。
子孙的产生是由父节点,而处理是被子节点。所以Storm用了一个非常巧妙的异或方法,当父节点产生这个消息时,产生一个随机数,把这个随机数异或到Acker里,Acker把这个随机数传递到下一步的节点,当这个节点正确被处理以后,再把这节点发送给Acker去做异或,所以Storm利用了这个Acker机制来压缩整个数据的跟踪机制,最终保证任意节点出现宕机而值不会变成0。
Transactional Topology
光有以上的机制,还远远不够。被系统重发的消息没有任何附加信息,用户无法判断消息是否是被重发的等一些问题还有待解决,为解决消息被重复处理的问题,Storm 0.7.0以后版本推出了Transactional Topology进行改进,
原理如下:
在Spout上将源头消息串行划分成 Batch,为每个Batch赋以递增的id,记录在Zookeeper中,利用Acker跟踪Batch是否被完全处理完成,超时或者节点异常,Spout重发Batch内的所有消息,不影响中间状态的操作可以并发的执行,例如 Batch内的聚合操作,用户代码利用唯一的Batch ID进行去重。
整个Topology同一时刻只能有一个Batch正在提交,以保证在每个节点上Batch串行递增,简化用户去重的逻辑。
Storm优缺点
优点:消息在框架内不落地,处理非常高效,保证了消息至少被处理,Transactional Topology为消息去重提供了可能,调度模式简单,扩展能力强(关闭重发模式下),社区资源丰富,拥有各种常见消息源的Spout实现。
当然Storm也有自己的劣势:Transactional Topology对Batch串行执行方式,性能下降严重;Batch太大太小都有问题,大小需要用户根据具体业务分情况设置等。
Amazon Kinesis
Kinesis系统是一种完全托管的实时处理大规模数据流的开放服务。
-
所有节点运行于EC2中:相对Storm来说,它采用了消息节点内部重放的系统,而不是像Storm那样子源头重发,它的所有的节点都已经在EC2中,无需单独的调度策略、复用安全、资源隔离机制,且扩展性好、弹性可伸缩。
-
只支持单级Task,可以利用多个Stream组成复杂的DAG-Task,用户代码需要实现DAG-Task内部的消息去重逻辑。
-
数据收集与计算独立:数据收集模块(Shard)对消息进行持久化,最长保留24小时;可以Get方式从其它系统中读取Shard数据,计算模块(Kinesis App)处理被推送的数据,Instance个数与Shard个数相同;用户代码可以自主控制Checkpoint节奏。
用户可以自主调用相应的SplitShard\MergeShard接口,Stream上所有App的并发度随之调整。具体实现方法如下:
每个Shard串行将接收到的消息写入S3文件中,SplitShard后,原有Shard不再接收新数据,原有Shard对应的所有App的Instance处理完消息后关闭,启动新的Shards(两个)和对应新的Instances。
使计算可以更加的弹性,服务的可用性也更高。
Google MillWheel
MillWheel系统是利用内部支持snapshot功能的Bigtable来进行持续化中间结果,将每个节点的计算输出消息进行持久化,实现消息的“不丢不重”。
区别于Storm的是,它没有复杂的跟踪树。因为每一级都把它的输出消息进行持久化,用户可以通过SetTimer\ProcessTimer接口解决用户代码在消息到来时才能取得控制流的弊端,然后在源头节点(Injector)上将数据打上系统时间戳,每个内部节点(Computation)计算出所有输入Pipe上的最小时间戳,向所有输出Pipe上广播当前完成的最小时间戳,用户可以利用Low Watermark这一机制解决消息乱序或一致性问题。
核心技术
那么,流式计算和增量计算中最核心的一些技术和难点有哪些呢?
从这张图可以看到,整个流计算是由一个复杂的Topology所构成。那么,从输入到输出,其中比较重要的两个角色一是Jobmaster,一是Coordinator。Jobmaster是每个Job负责运行时的一个master;而Coordinator是刚才所说的消息跟踪的一个角色,所以Coordinator最好是完全可以做到无状态的线性扩展。
Batch数据从源头进入后,进入Source节点,Source节点会从消息源读取数据,蓝色的部分代表着Worker节点,蓝色节点再向橙色节点进行数据传输的时候,遵循着Shuffle的方法,可以是哈希的方法,可以是广播的方法,也可以是任何用户自定义的方法,output节点会将输出结果向在线系统输出,或者向下一级MQ节点输出,输出的结果也是按照Batch去对齐。
系统边界-数据收集/结果数据
-
拉:从消息队列(Kafka)、存储(Hbase、HDFS)等系统读取数据,并且借助这些源头已经持久化的数据实现系统的故障恢复;涉及第三方服务系统授权。
-
推:需要实现Http处理模块(Apache、Nginx等),更需要解决故障恢复问题。
-
订阅:结果数据写入消息队列,业务方订阅,进入自己在线存储系统。
-
服务:直接提供在线数据服务;涉及第三方服务系统授权;结果数据时钟对齐。
shuffle机制
数据如何在处理节点之间流转,这就涉及到shuffle机制了。
在流计算的处置机制下,数据采用了push模式,它整个数据不落地,对于数据传输的时效性是比较好的;另外,消息机制是需要解决丢数据和重数据的问题,框架也需要保证消息的有序性。
计算节点
流计算的计算进程是longlive的,即便没有数据进程也在,所以决定了我们的系统不同的调度方式、不同的消息机制。计算的容错采用任务跟踪机制,最重要的一点,流计算的计算是个有状态的计算,这个中间状态的存储方式、容错关乎着整个计算的时效性、正确性、吞吐等。
分布式挑战和服务化诉求
离线计算系统对整个编程模型进行了约束,所以它在计算规模以及容错上面已经有了非常成熟的研究,但是在流计算情况下,它在扩展能力上集群规模的上限是多少?计算作业是否可以线性增加?
用户针可不可以重新定义等价的DAG来避免数据倾斜(牺牲性能),同时,如何避免倾斜带来超时/雪崩的问题,数据动态的变化如何做到实时调整?
数据如何高可靠存储,集群扩容、系统代码升级时是否需要停止服务?单节点故障是否会导致整体服务的不可用?此外,我们不可避免地面临着多租户管理的问题,也就是授权鉴权 、资源隔离;还有计量计费、安全体系和运维体系怎么保证等等。这都是做流计算系统面临的一些核心问题。
增量计算语义
我们提出了新的模型MRM,它分为三个主要阶段:
-
Local阶段,是指Batch的Local操作,这个语意完全等同于MapReduce,可以认为离线计算或者全量计算,是一个只有一个Batch的增量计算,所以这时候的Map语意是完全一样的;
-
Reduce阶段是一个Aggregate阶段,相当于在Batch内的数据做一次重新的组合,但是增量的语意告诉我们一定要做跨Batch的数据的结果合并。
-
Merge阶段,跨批数据做全局的聚合计算。
上图为一个简单的wordcount例子,按照这样的Batch去分,可以看到Map阶段每个Batch的输入、输出,然后Reduce输出,第一个Batch输出7,第二个Batch输入5,依此类推,初始化情况下OldValue为0,那么,Merge很简单,就进行0和7的合并操作。在这个case下面,进行一个count操作,就是7 0在Batch2,它把本次的Reduce结果5和上一次Merge的全局结果再进行一次Merge12,依此类推,所以大家可以看到对角线的颜色,这就是一轮的Merge的结果会成为下一轮的OldValue。
可以看到,贯穿始终的是一个很重要的Batch概念,它是系统跟踪数据/时效性处理的最小单位。其实Batch是一个可以scale的概念,它可以退化为全量计算,也可以把一条数据放在一个Batch里,可以做到时效性非常高。这两者系统的吞吐、时效性会有截然不同的表现。
增量计算具备三要素:确定性、可加性和可逆性。
图中case是说,假设在双十一有两行SQL,首先按照卖家ID去Groupby,我要统计卖家的实时成交情况,是按照sellerid为11卖家,假设刚开始没卖,第二行按照每10块钱分制方图,
依此类推,就像大家看到双十一的阿里巴巴大屏一样,它是实时滚动的。
sellerid为11的这个卖家刚开始没卖东西,第0档现在没卖家,第一档当前时刻已经有10个卖家了,第七档有53位卖家,卖家为11做成了一个5块钱的生意,那么整个计算11这个0会变成5,现在属于第一档,那么就把第一档值由0变成1,但是紧接着,11号卖家又做成了16块钱的生意,someMoney就会变成21,21属于20到29区间,也就是这个档位原来有10个卖家,现在变成11个卖家,但是重要的是,应该还把原来第0档的那个1减掉,第一要把现在档位加上去,还要把原来对其他档位的贡献要撤销掉,这就是我们说的可逆性。
我们需要把这样的UDEF,不仅要做一个正像操作,同时也要去实现一个负向操作。意思就是,把原来的那个值对系统状态的影响让用户有机会撤销掉,可以看到整个增量计算的一个
增量语意的rollback,相当于对数据库领域的物化视图,两个物化视图实时更新,一个物化视图嵌套一个物化视图,必须要对之前的计算有一个回稳操作,这是增量计算的一个非常本质的要求。
消息机制
消息机制是整个流处理系统的核心,它会影响计算延迟、容错机制、可扩展性、服务可用性等方面,整个消息过程从分发到接收到处理相应用户的work代码,我们将这个消息框架定义为shuffle framework,对于消息的发送和接收利用Zookeeper来记录整个DAG。也可以利用消息队列比如Kafka,就像我们每一步都落地、存储这样的方式来定义shuffle。
对于解决消息的丢失问题,有两种做法:
-
一是消息源头的重发。每一个节点跟节点之间是用Push过去的,中间结果不会进行任何的可靠存储,所以运行效率是比较高的,但是当一个Topology比较大的时候,任意一个节点的宕机或者超时都会引起整体的重发和重做,进而增加雪崩的风险;
-
一是节点内部重发。它跟前者的优势劣势刚好反转,它需要每一步落地,运行效率会比较低,系统也会产生大量的文件碎片,它的好处是重发重做只依赖于父亲节点。
消息源头重发机制
方案
依赖源头可靠的数据存储系统(Kafka、HBase等),源头存储系统既是消息收集模块,又是消息重发的数据源,中间节点消息不落地,跟踪源头消息,超时后重发。
雪崩
分布式系统中最常见的异常状态是网络的抖动。在流处理系统中,消息跟踪代价过大,一般的跟踪机制并不会在跟踪结果中详细标示出是哪一个节点出现故障。这种不加区分的源头消息重发,会使得本来正常的节点因为其它单点的故障,也要接收大量的重复数据,消耗宝贵的网络资源,使网络状况进一步恶化。然后这种情况会一步一步地变大,最终造成整个集群网络的瘫痪。
针对以上问题,我们进行了消息命令流的大量约减,我们不会跟踪具体每一个Batch内的数据,而是每一批发送一个特殊的命令流来跟踪。我们要求每个消息有唯一的batchid,并且与源头节点的offset可重入,还需要这样的Batchid去做消重,也作为一个唯一的版本,每次重试我们会在Batchid后面新增一个attemptid。
当然这样的问题还是存在大量的通讯量,节点Crash后整个表如何重建等问题。
消息节点内部重放
方案
节点接收到消息后,先落地,再计算;节点出现故障,从存储系统中重放;定期做 Checkpoint,减少重放代价。
这种方案特点是方案简单,但依赖于可靠、高效的存储模块;局部故障对全局影响小,系统可扩展性好,雪崩的风险也极大地降低。
消息去重
消息去重策略是上下游节点之间通过自增的ID协议。
发送端消息从0开始赋值唯一的id,每次加1;发送消息后等待接收端返回信息,成功或者消息重复才可发送下一消息,否则一直重试。
接收端在内存维护一个去重表,Key是上游节点对应的ID,Value是对应上游节点最后一次接收成功的消息id;接到新消息,首先在去重表中,根据PipeID比较消息id是否自增加1;如果小于等于已有消息id,则直接返回消息重复;否则,将消息写入存储系统中,之后更新去重表,并返回发送端消息接收成功。
为了避免造成大量的IO浪费,我们也会使用读写分离技术。
写模块:
-
节点将接收到的上游消息首先进行持久化;
-
将数据和存储系统中的Offset信息通过内存(或者网络)尝试发送至处理模块的缓冲区中;
-
如果缓冲区不满,直接放置队尾;
-
如果缓冲区已满,则将缓冲区中队首尚未处理的消息丢弃,然后将当前消息放置队尾。
读模块:
-
消息处理模块读取缓冲区队首的消息;
-
通过判断该消息的Offset信息是否连续来确定是否有消息丢失;
-
如果发现消息丢失,直接从存储系统“追读”丢失的消息,直到追上缓冲区队首的消息。
读写分离的好处是,网络抖动不影响其它节点,局部故障也不放大,不会出现处理快的节点一直在等慢的节点等。
有状态计算
增量计算
Map-Reduce-Merge模型,Map和Reduce约定在Batch内的一个Map操作和aggregate的一个reduce操作,语意和Map-Reduce完全兼容;用户只需要在Merge里面去写用户逻辑,全部是由增量计算框架来维持,输入这个Value是本次Reduce的结果,用户只用把oldValue和这个value进行合并操作并且返回新的值,作为下一次的oldValue传。
Checkpoint
当作业的并发数增长时,一定程度上,任务的并发程度的增长已经受限于Hbase的能力,尤其是随机读的能力,所以对整个系统的scale ability是有限制的。
对此,我们引入了一个内存增量的snapshot机制。用户可以指定固定批次的数据,在这个批次内的对于OldValue和state的修改,完全都在一个增量的snapshot,而这个snapshot在内存里头, Merge是update,它完全是在增量的snapshot内完成,而系统这时持续地引进会产生大量增量的snapshot,这时系统会在内部启动一个checkpoint的线程,它会顺序地将这些snapshot选择性地进行Compact,Compact后将这个内存的snapshot持久化,批量地刷入到盘古,刷入到一个全局的快存储。
这样的机制既保证用户在调用Merge这个函数的时候,基本上都在操作内存,而整个系统的scale ability不依赖于其他第三方的可靠存储,而系统将snapshot的checkpoint是在异步的后台进行。
并行DAG
为了克服datascale以及增加系统的时效性,整个DAG完全是一个并行的DAG。
那进行一个简单的建模:假设有N条数据,M个资源,共有n个module,第i个module的吞吐为OI 调度的资源数为Pi。
图中可以看出,在理想情况下,它们完成的延时是一致的。
但事实上完全不是。限制时下的物理模型远远比这个复杂,串行的模式的优势是模型简单、吞吐高。劣势是数据时效性和数据倾斜对系统的整体延时伤害。所以串行模型是面向吞吐、兼顾延时。
并行DAG优势是数据时效性好、对倾斜友好。但是它的建模非常复杂,调度也是非常复杂。
并行模型是面向延时而兼顾吞吐的。
抢占式调度和资源隔离
整个流计算是个longlive的进程,所以业界之前的调度系统针对任务结束后进程回收的情况很明显不再适用,那么离线里面,无论是Yarn还是fuxi,都不能适应长进程的任务调度。
现在有一个开源项目叫Slider,它在某种程度上尝试去解决这个问题。在线系统的调度与离线系统的调度差异性是非常大的,我们需要解决的问题还有很多很多。
在隔离维度上,用户程序使用的Memory、Network、CPU(隔离难度依次递增),不需要隔离本地IO访问(不容许用户程序访问本地IO),框架使用的资源,通过消息“流控”来限制。
而Memory上,Java程序通过启动jvm时的-Xss参数设置;C\C 程序通过定期查看linux下/proc/pid/status,超过上限后,限制消息输入;Network通过linux系统的iptables tc模块。
CPU通过linux taskset命令将进程绑定到某一具体CPU上,同一CPU上多进程依然会发生抢占资源。
Failover机制
整个流计算的Failover容错机制,Batch是容错的最小单位,是数据跟踪的最小单位,是输入输出的最小单位,是控制的最小单位;整个容错分为源头重建和节点重建两种,全量输出,无外部互相依赖,跟踪消息与消息体量级。
离线跟踪、流式跟踪、在线跟踪,完全在实现方法上、策略上不一样。
那么有状态计算的Failover的checkpoint,它的内存重建,大家可以关注开源的tachyon,在整个Failover的机制设计方面,有运行时效率和恢复时效率的一个tradeoff,包括如何避免雪崩,这些都是在容错机制上要考虑的重点问题。
综上所述,整个系统是在不断做TradeOff:
-
吞吐与响应时间的TradeOff
-
实时性与数据链路的不可控的TradeOff
-
非幂等操作与数据链路的不可控的TradeOff
-
精度与成本的TradeOff
-
恢复成本与运行时成本的TradeOff
-
全链路与系统边界的Tradeoff
-
需求多样性与平台一致性的TradeOff
-
不同计算场景不同技术体系的TradeOff
StreamSQL
streamSQL支持用SQL一样的语意来表示,让用户使用SQL来操作流计算。streamSQL提供了5个基本原语:Map、Reduce 、Shuffle 、Union和Merge。我们也实现了一些高级算子,用户可以去扩展高级算子,包括Topk、distinct 、Join 、windows。
如图所示,最底下的引擎是整个流计算的一个并行DAG;MRM层最主要的部分分为消息控制、容错以及计算模型;在此之上定义了算子层;算子层再往上是SQL层,SQL层分为SQL解析、制定逻辑执行计划、根据拓普运行情况和原信息进行物理执行计划的优化和SQL的改写。
那么我们可以定义我们的源表,也就是数据源,你可以create一个stream table。用户还可以定义自己的维表、临时表,临时表作为一个SQL的极联存在,它定义了内部数据流的一个Schema,实际上它是不存储任何的数据,只是做串联上下游的逻辑,构造复杂DAG。
用户除了写SQL之外,还可以自定义函数算子来实现它的逻辑。
StreamSQL的系统架构如图所示,分成gateway层、控制集群层和worker执行层。
用户看到的是统一的逻辑集群,我们提供了开发平台、Web UI、监控报警以及profiling 和Metric子系统等。
延伸
在实际业务场景中,会发现很多很多的问题等待我们去解决,比如:
-
无法做任务之间的复用数据
-
完成业务需要各种计算模型
-
多个系统融合
-
系统之间无法共享数据
-
离线与在线的鸿沟依然明显
以上就是跟大家分享的流式计算和增量计算的关键技术点,谢谢大家!