转载

Spark“并行”写

spark本来已经是一个分布式的计算平台,按说不应该手工去处理并行/异步的事情。但是,最近我实现的一个spark任务,需要一次写出数十个分区的数据,虽然这些分区的数据之间完全独立,但坑爹的是,基础数据平台提供的写数据接口只支持 同步 的一次写一个分区的数据。这样造成的结果就是,用循环来实现时,虽然我有很多个计算节点,数据(RDD)也分布于各个节点之上,但是我只能等一个分区写完成后,再写下一个分区:因为“写分区”这个任务的下发是同步阻塞的。

partitions
  .map(part =>  writeToDisk(data.filter(part), part))

引入Future

这里要感谢scala提供的Future方案。它可以方便的将同步的阻塞操作包装成异步操作并行下发。

配合Await.ready操作来等待所有future完成,我们可以将上面的代码改写为:

partitions
  .map(part => Future { writeToDisk(data.filter(data.part == part), part) })
  .map(f => Await.ready(f, Duration.Inf))

避免data重复计算

在spark中,我们知道使用cache/persist可以避免数据流的重复计算。在这里也是一样,Future之前需要将data用cache/persist打个点。

但是这样还!不!够!

在这里我们希望发生的事情是data在future之前先计算好(只计算一次),然后异步的分发下去写对应的分区。

但是由于spark的惰性计算特性,使用Future之后,多个job并行下发,每个job在执行时data都还没有计算出来,也就没有cache的数据。反应到spark ui上的jobs页面的情况就是,看上去多个job并行执行了,但是cache操作并没有带来tasks skipped。

这时,我们需要在future之前,强制把data计算出来并cache住。这里其实只需要调用一些不影响数据的action算子即可,例如data.count()。

最终的结果,在使用上面的改进措施之后,我的这个spark任务执行时间缩短了约60%。

推荐阅读:

使用双buffer无锁化

不要拷贝

一个新朋友 Git Hooks

转载请注明出处: http://blog.guoyb.com/2018/04/21/spark-scala-future/

欢迎使用微信扫描下方二维码,关注我的微信公众号TechTalking,技术·生活·思考:

Spark“并行”写
原文  http://blog.guoyb.com/2018/04/21/spark-scala-future/
正文到此结束
Loading...