Flink流式计算在节省资源方面的简单分析

关于Flink流式计算节省资源方面你必须知道的技巧

小米在流式计算方面经历了Storm、Spark Streaming和Flink的发展历程;从2019年1月接触Flink到现在,已经过去了大半年的时间了。

对Flink的接触越深,越能感受到它在流式计算方面的强大能力;无论是实时性、时间语义还是对状态计算的支持等,都让很多之前需要复杂业务逻辑实现的功能转变成了简洁的API调用。还有不断完善的Flink SQL功能也让人充满期待,相信实时数据分析的门槛会越来越低,更多的业务能够挖掘出数据更实时更深入的价值。

在这期间,我们逐步完善了稳定性、作业管理、日志和监控收集展示等关系到用户易用性和运维能力的特性,帮助越来越多的业务接入到了Flink。

流式作业管理服务的界面:

Flink作业的监控指标收集展示:

Flink作业异常日志的收集展示:

Spark Streaming 迁移到Flink的效果小结

在业务从Spark Streaming迁移到Flink的过程中,我们也一直在关注着一些指标的变化,比如数据处理的延迟、资源使用的变化、作业的稳定性等。其中有一些指标的变化是在预期之中的,比如数据处理延迟大大降低了,一些状态相关计算的“准确率”提升了;但是有一项指标的变化是超出我们预期的,那就是节省的资源。

信息流推荐业务是小米从Spark Streaming迁移到Flink流式计算最早也是使用Flink最深的业务之一,在经过一段时间的合作优化后,对方同学给我们提供了一些使用效果小结,其中有几个关键点:

  • 对于无状态作业,数据处理的延迟由之前Spark Streaming的16129ms降低到Flink的926ms,有94.2%的显著提升(有状态作业也有提升,但是和具体业务逻辑有关,不做介绍);
  • 对后端存储系统的写入延迟从80ms降低到了20ms左右,如下图(这是因为Spark Streaming的mini batch模式会在batch最后有批量写存储系统的操作,从而造成写请求尖峰,Flink则没有类似问题):
  • 对于简单的从消息队列Talos到存储系统HDFS的数据清洗作业(ETL),由之前Spark Streaming的占用210个CPU Core降到了Flink的32个CPU Core,资源利用率提高了84.8%;

其中前两点优化效果是比较容易理解的,主要是第三点我们觉得有点超出预期。为了验证这一点,信息流推荐的同学帮助我们做了一些测试,尝试把之前的Spark Streaming作业由210个CPU Core降低到64个,但是测试结果是作业出现了数据拥堵。这个Spark Streaming测试作业的batch interval 是10s,大部分batch能够在8s左右运行完,偶尔抖动的话会有十几秒,但是当晚高峰流量上涨之后,这个Spark Streaming作业就会开始拥堵了,而Flink使用32个CPU Core却没有遇到拥堵问题。

很显然,更低的资源占用帮助业务更好的节省了成本,节省出来的计算资源则可以让更多其他的业务使用;为了让节省成本能够得到“理论”上的支撑,我们尝试从几个方面研究并对比了Spark Streaming和Flink的一些区别:

调度计算VS调度数据

对于任何一个分布式计算框架而言,如果“数据”和“计算”不在同一个节点上,那么它们中必须有一个需要移动到另一个所在的节点。如果把计算调度到数据所在的节点,那就是“调度计算”,反之则是“调度数据”;在这一点上Spark Streaming和Flink的实现是不同的。

Spark的核心数据结构RDD包含了几个关键信息,包括数据的分片(partitions)、依赖(dependencies)等,其中还有一个用于优化执行的信息,就是分片的“preferred locations”

// RDD
/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

”调度计算”的方法在批处理中有很大的优势,因为“计算”相比于“数据”来讲一般信息量比较小,如果“计算”可以在“数据”所在的节点执行的话,会省去大量网络传输,节省带宽的同时提高了计算效率。但是在流式计算中,以Spark Streaming的调度方法为例,由于需要频繁的调度”计算“,则会有一些效率上的损

耗。首先,每次”计算“的调度都是要消耗一些时间的,比如“计算”信息的序列化 → 传输 → 反序列化 → 初始化相关资源 → 计算执行→执行完的清理和结果上报等,这些都是一些“损耗”。

另外,用户的计算中一般会有一些资源的初始化逻辑,比如初始化外部系统的客户端(类似于Kafka Producer或Consumer);每次计算的重复调度容易导致这些资源的重复初始化,需要用户对执行逻辑有一定的理解,才能合理地初始化资源,避免资源的重复创建;这就提高了使用门槛,容易埋下隐患;通过业务支持发现,在实际生产过程中,经常会遇到大并发的Spark Streaming作业给Kafka或HBase等存储系统带来巨大连接压力的情况,就是因为用户在计算逻辑中一直重复创建连接。

Spark在官方文档提供了一些避免重复创建网络连接的示例代码,其核心思想就是通过连接池来复用连接:

这个信息提供了该分片数据的位置信息,即所在的节点;Spark在调度该分片的计算的时候,会尽量把该分片的计算调度到数据所在的节点,从而提高计算效率。比如对于KafkaRDD,该方法返回的就是topic partition的leader节点信息:

rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

需要指出的是,即使用户代码层面合理的使用了连接池,由于同一个“计算”逻辑不一定调度到同一个计算节点,还是可能会出现在不同计算节点上重新创建连接的情况。

Flink和Storm类似,都是通过“调度数据”来完成计算的,也就是“计算逻辑”初始化并启动后,如果没有异常会一直执行,源源不断地消费上游的数据,处理后发送到下游;有点像工厂里的流水线,货物在传送带上一直传递,每个工人专注完成自己的处理逻辑即可。

虽然“调度数据”和“调度计算”有各自的优势,但是在流式计算的实际生产场景中,“调度计算”很可能“有力使不出来”;比如一般流式计算都是消费消息队列Kafka或Talos的数据进行处理,而实际生产环境中为了保证消息队列的低延迟和易维护,一般不会和计算节点(比如Yarn服务的节点)混布,而是有各自的机器(不过很多时候是在同一机房);所以无论是Spark还是Flink,都无法避免消息队列数据的跨网络传输。所以从实际使用体验上讲,Flink的调度数据模式,显然更容易减少损耗,提高计算效率,同时在使用上更符合用户“直觉”,不易出现重复创建资源的情况。

不过这里不得不提的一点是,Spark Streaming的“调度计算”模式,对于处理计算系统中的“慢节点”或“异常节点”有天然的优势。比如如果Yarn集群中有一台节点磁盘存在异常,导致计算不停地失败,Spark可以通过blacklist机制停止调度计算到该节点,从而保证整个作业的稳定性。或者有一台计算节点的CPU Load偏高,导致处理比较慢,Spark也可以通过speculation机制及时把同一计算调度到其他节点,避免慢节点拖慢整个作业;而以上特性在Flink中都是缺失的。

Mini batch vs streaming

Spark Streaming并不是真正意义上的流式计算,而是从批处理衍生出来的mini batch计算。如图所示,Spark根据RDD依赖关系中的shuffle dependency进行作业的Stage划分,每个Stage根据RDD的partition信息切分成不同的分片;在实际执行的时候,只有当每个分片对应的计算结束之后,整个个Stage才算计算完成。

这种模式容易出现“长尾效应”,比如如果某个分片数据量偏大,那么其他分片也必须等这个分片计算完成后,才能进行下一轮的计算(Spark speculation对这种情况也没有好的作用,因为这个是由于分片数据不均匀导致的),这样既增加了其他分片的数据处理延迟,也浪费了资源。 

而Flink则是为真正的流式计算而设计的(并且把批处理抽象成有限流的数据计算),上游数据是持续发送到下游的,这样就避免了某个长尾分片导致其他分片计算“空闲”的情况,而是持续在处理数据,这在一定程度上提高了计算资源的利用率,降低了延迟。

当然,这里又要说一下mini batch的优点了,那就在异常恢复的时候,可以以比较低的代价把缺失的分片数据恢复过来,这个主要归功于RDD的依赖关系抽象;如上图所示,如果黑色块表示的数据丢失(比如节点异常),Spark仅需要通过重放“Good-Replay”表示的数据分片就可以把丢失的数据恢复,这个恢复效率是很高的。

而Flink的话则需要停止整个“流水线”上的算子,并从Checkpoint恢复和重放数据;虽然Flink对这一点有一些优化,比如可以配置failover strategy为region来减少受影响的算子,不过相比于Spark只需要从上个Stage的数据恢复受影响的分片来讲,代价还是有点大。

总之,通过对比可以看出,Flink的streaming模式对于低延迟处理数据比较友好,Spark的mini batch模式则于异常恢复比较友好;如果在大部分情况下作业运行稳定的话,Flink在资源利用率和数据处理效率上确实更占优势一些。

数据序列化

简单来说,数据的序列化是指把一个object转化为byte stream,反序列化则相反。序列化主要用于对象的持久化或者网络传输。

常见的序列化格式有binary、json、xml、yaml等;常见的序列化框架有Java原生序列化、Kryo、Thrift、Protobuf、Avro等。

对于分布式计算来讲,数据的传输效率非常重要。好的序列化框架可以通过较低    的序列化时间和较低的内存占用大大提高计算效率和作业稳定性。在数据序列化上,Flink和Spark采用了不同的方式;Spark对于所有数据默认采用Java原生序列化方式,用户也可以配置使用Kryo;而Flink则是自己实现了一套高效率的序列化方法。

首先说一下Java原生的序列化方式,这种方式的好处是比较简单通用,只要对象实现了Serializable接口即可;缺点就是效率比较低,而且如果用户没有指定serialVersionUID的话,很容易出现作业重新编译后,之前的数据无法反序列化出来的情况(这也是Spark Streaming Checkpoint的一个痛点,在业务使用中经常出现修改了代码之后,无法从Checkpoint恢复的问题);当然Java原生序列化还有一些其他弊端,这里不做深入讨论。

有意思的是,Flink官方文档里对于不要使用Java原生序列化强调了三遍,甚至网上有传言Oracle要抛弃Java原生序列化:

相比于Java原生序列化方式,无论是在序列化效率还是序列化结果的内存占用上,Kryo则更好一些(Spark声称一般Kryo会比Java原生节省10x内存占用);Spark文档中表示它们之所以没有把Kryo设置为默认序列化框架的唯一原因是因为Kryo需要用户自己注册需要序列化的类,并且建议用户通过配置开启Kryo。

虽然如此,根据Flink的测试,Kryo依然比Flink自己实现的序列化方式效率要低一些;如图所示是Flink序列化器(PojoSerializer、RowSerializer、TupleSerializer)和Kryo等其他序列化框架的对比,可以看出Flink序列化器还是比较占优势的:

那么Flink到底是怎么做的呢?网上关于Flink序列化的文章已经很多了,这里我简单地说一下我的理解。

像Kryo这种序列化方式,在序列化数据的时候,除了数据中的“值”信息本身,还需要把一些数据的meta信息也写进去(比如对象的Class信息;如果是已经注册过的Class,则写一个更节省内存的ID)。

但是在Flink场景中则完全不需要这样,因为在一个Flink作业DAG中,上游和下游之间传输的数据类型是固定且已知的,所以在序列化的时候只需要按照一定的排列规则把“值”信息写入即可(当然还有一些其他信息,比如是否为null)。

如图所示是一个内嵌POJO的Tuple3类型的序列化形式,可以看出这种序列化方式非常地“紧凑”,大大地节省了内存并提高了效率。另外,Flink自己实现的序列化方式还有一些其他优势,比如直接操作二进制数据等。

凡事都有两面性,自己实现序列化方式也是有一些劣势,比如状态数据的格式兼容性(State Schema Evolution);如果你使用Flink自带的序列化框架序进行状态保存,那么修改状态数据的类信息后,可能在恢复状态时出现不兼容问题(目前Flink仅支持POJO和Avro的格式兼容升级)。

另外,用户为了保证数据能使用Flink自带的序列化器,有时候不得不自己再重写一个POJO类,把外部系统中数据的值再“映射”到这个POJO类中;而根据开发人员对POJO的理解不同,写出来的效果可能不一样,比如之前有个用户很肯定地说自己是按照POJO的规范来定义的类,我查看后发现原来他不小心多加了个logger,这从侧面说明还是有一定的用户使用门槛的。

// Not a POJO demo.public class Person {  private Logger logger = LoggerFactory.getLogger(Person.class);  public String name;  public int age;}

针对这一情况我们做了一些优化尝试,由于在小米内部很多业务是通过Thrfit定义的数据,正常情况下Thrift类是通过Kryo的默认序列化器进行序列化和反序列化的,效率比较低。虽然官方提供了优化文档,可以通过如下方式进行优化,但是对业务来讲也是存在一定使用门槛;

// KafkaRDD
override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    Seq(part.host) 
    // host: preferred kafka host, i.e. the leader at the time the rdd was created
  }

于是我们通过修改Flink中Kryo序列化器的相关逻辑,实现了对Thrfit类默认使用Thrift自己序列化器的优化,在大大提高了数据序列化效率的同时,也降低了业务的使用门槛。

总之,通过自己定制序列化器的方式,确实让Flink在数据处理效率上更有优势,这样作业就可以通过占用更低的带宽和更少的计算资源完成计算了。

本文小结

Flink和Spark Streaming有非常大的差别,也有各自的优势,这里我只是简单介绍了一下自己浅薄的理解,不是很深入。不过从实际应用效果来看,Flink确实通过高效的数据处理和资源利用,实现了成本上的优化;希望能有更多业务可以了解并试用Flink,后续我们也会通过Flink SQL为更多业务提供简单易用的流式计算支持,谢谢!

参考文献

{1}《Deep Dive on Apache Flink State》 – Seth Wiesmanhttps://www.slideshare.net/dataArtisans/webinar-deep-dive-on-apache-flink-state-seth-wiesman

{2}Flink 原理与实现:内存管理https://ververica.cn/developers/flink-principle-memory-management

{3}Batch Processing — Apache Sparkhttps://blog.k2datascience.com/batch-processing-apache-spark-a67016008167

Image placeholder
土土爹
未设置
  89人点赞

没有讨论,发表一下自己的看法吧

推荐文章
基于时序数据库做监控,这里有超流行的开源方案

在微服务架构下,我们对服务进行了拆分,所以用户的每次请求不再是由某一个服务独立完成了,而是变成了多个服务一起配合完成。这种情况下,一旦请求出现异常,我们必须得知道是在哪个服务环节出了故障,就需要对每一

低代码平台在移动开发方面的缺陷

本文由公众号EAWorld翻译发表,转载需注明出处。作者:TimoRailo 译者:白小白 原题:Whymostlow-codeplatformsfallshortonmobiledevelopmen

瓜子二手车在 Dubbo 版本升级、多机房方案方面的思考和实践

前言随着瓜子业务的不断发展,系统规模在逐渐扩大,目前在瓜子的私有云上已经运行着数百个Dubbo应用,上千个Dubbo实例。瓜子各部门业务迅速发展,版本没有来得及统一,各个部门都有自己的用法。随着第二机

vue..js 编写的简单音乐播放器

闲暇之余写了一个音乐小应用项目目录代码开始index.html 每日推荐音乐 {{music.title}} {{music.author}} n

Laravel 7 的简单隐式路由模型绑定

在2020年二月份即将到来的Laravel的下一个主要发行版本 ,你可以直接在路由定义中自定义隐式路由模型绑定:Route::get('/posts/{post:slug}',function(Pos

JavaScript中对“this”的简单理解

1.this的奥秘很多时候,JS中的this对于咱们的初学者很容易产生困惑不解。this的功能很强大,但需要一定付出才能慢慢理解它。对Java、PHP或其他标准语言来看,this表示类方法中当前对象的

Disruptor的简单介绍与应用

前言最近工作比较忙,在工作项目中,看了很多人都自己实现了一套数据任务处理机制,个人感觉有点乱,且也方便他人的后续维护,所以想到了一种数据处理模式,即生产者、缓冲队列、消费者的模式来统一大家的实现逻辑。

个人学习系列 - httpd的简单应用

想学习一下前端的代码,自然而然就希望能部署并观察一下自己写的烂代码了。所以,就研究一下httpd这个工具了。httpd的使用docker中的httpd的获取查询httpd的镜像并下载 查询httpd

从五个方面入手,保障微服务应用安全

随着计算机、互联网技术的飞速发展,信息安全已然是一个全民关心的问题,也是各大企业非常重视的问题。企业一般会从多个层次着手保障信息安全,如:物理安全、网络安全、系统安全(主机和操作系统)、应用安全等。对

三个方面告诉你,为什么说传统安全托管服务已过时

随着组织发展其安全程序,其安全环境的复杂性也在增长。复杂性和变化要求采用一种全新的方式来应对现代安全运营中心(SOC)。根据Gartner的数据现实,到2022年,50%的SOC将转变为具有集体事件响

基于JS的高性能Flutter动态化框架MXFlutter

导语:18年10月份,手机QQ看点团队尝试使用Flutter,做为iOS开发,一接触到Flutter就马上感受到,Flutter虽然强大,但不能像RN一样动态化是阻碍我们使用她的唯一障碍了。看Goog

Flutter路由项目实战之fluro

github:https://github.com/zhengzhuan...关于flutter路由,在小项目中,就按照原生写法,但是在大型项目中,这样的我就不会进行推荐,我这里使用的fluro路由管

Laravel 里面的 chunk 分块效率问题

laravel里面的chunk分块效率问题 在批处理较大数据数据时,laravel提供了chunk处理大块数据的方法,但数据量大了之后效率会非常慢 本次数据库测试数据供有二十万零一千(201000)

两年Flink迁移之路:从standalone到on yarn,处理能力提升五倍

一、背景与痛点在2017年上半年以前,TalkingData的AppAnalytics和GameAnalytics两个产品,流式框架使用的是自研的td-etl-framework。该框架降低了开发流式

数据结构与算法分析——开篇以及复杂度分析

开篇 你也许已经发现了,工作了几年,原以为已经是一只老鸟。但看到刚参加工作的同事,你发现,原来自己一直在原地踏步。跟新人相比,你的唯一优势就是对业务更熟悉而已,别的就没有什么优势了。 怎样才能够让自己

数据结构与算法分析——开篇以及复杂度分析

开篇你也许已经发现了,工作了几年,原以为已经是一只老鸟。但看到刚参加工作的同事,你发现,原来自己一直在原地踏步。跟新人相比,你的唯一优势就是对业务更熟悉而已,别的就没有什么优势了。怎样才能够让自己更上

Python数据分析实战 | 爬遍拉勾网,带你看看数据分析师还吃香吗?

微信公众号:「Python读财」如有问题或建议,请公众号留言伴随着移动互联网的飞速发展,越来越多用户被互联网连接在一起,用户所积累下来的数据越来越多,市场对数据方面人才的需求也越来越大,由此也带火了如

ThinkPHP6 核心分析(一):Http 类的实例化

从入口文件出发 当访问一个ThinkPHP搭建的站点,框架最先是从入口文件开始的,然后才是应用初始化、路由解析、控制器调用和响应输出等操作。入口文件主要代码如下: //引入自动加载器,实现类的自动加载

ThinkPHP5-使用 think-API 部署 JWT

因为下一个项目要用TP5开发一个小程序,所以就使用到了,TP框架,因为小程序开发需要后台来编写api接口,所以就上网查了一下有没有相关的依赖,在此推荐一下think-api扩展工具,因为主要想使用其中

集成 think-ORM 的 symfony bundle thinkorm-bundle

thinkorm-bundleSymfonyThinkOrmBundle关于thinkorm-bundle允许在你symfony使用thinkorm.所安装$composerrequireccwwwo

AI 计算竞争升级,参访平安科技背后的硬实力

平安科技的四块科技版图,分别是云、认知、区块链和人工智能。所有的AI公司在AI领域中最核心的壁垒不是技术,因为技术都是人创造的,打磨团队就可以。核心的壁垒应该时间、业务和场景。智能科技的涌现、大数据

干货:计算机网络知识总结

一计算机概述 (1),基本术语 结点(node): 网络中的结点可以是计算机,集线器,交换机或路由器等。 链路(link): 从一个结点到另一个结点的一段物理线路。中间没有任何其他交点。 主机

Go语言高级编程_3.2 计算机结构

3.2计算机结构 汇编语言是直面计算机的编程语言,因此理解计算机结构是掌握汇编语言的前提。当前流行的计算机基本采用的是冯·诺伊曼计算机体系结构(在某些特殊领域还有哈佛体系架构)。冯·诺依曼结构也称为普

学计算机我后悔了的原因盘点

  学计算机我后悔了的原因盘点,课程进度快加班多、技术更新换代快需要不断学习,整个IT行业对快速迭代要求很高越快占领市场越好。产品质量难以把控,会频繁出现bug需要及时修改bug。   学计算机我后悔

历史上最著名计算机病毒,似乎都成了我们的回忆

Windows勒索病毒似乎让全球计算机用户都闻风丧胆,不过这其实真的不算什么。然而令人始料不及的是,即便勒索病毒传遍了100多个国家,也仅仅才收获了5万美金。所以说勒索病毒真的不算啥。历史上比勒索病毒