2017-02-02 86 views
2

我已经写了火花工作,这确实低于操作星火foreachpartition连接改进

  1. 从HDFS的文本文件中读取数据。
  2. 执行distinct()调用来过滤重复项。
  3. 做一个mapToPair阶段,并产生pairRDD
  4. 做一个reducebykey呼叫
  5. 做分组元组聚集逻辑。
  6. 现在请#5

    这里的foreach它

    1. 作出卡桑德拉分贝
    2. 呼叫创建一个AWS SNS和SQS客户端连接
    3. 做一些JSON记录格式。
    4. 发布记录SNS/SQS

当我运行这个工作它创建了三个阶段的火花

第一阶段 - 它需要近45秒。执行不同的 第二阶段 - mapToPair和reducebykey =需要1.5分钟

第三阶段=需要19分钟

我做了什么

  1. 我关掉卡桑德拉调用,这样看DB打的原因 - 这是以更短的时间
  2. 我发现
  3. 得罪一部分是创建SNS/SQS连接的foreach分区

其采取更多比整个工作时间的60%多

我在foreachPartition中创建了SNS/SQS连接,以减少连接。我们是否有更好的方法

我不能对驾驶员创建连接对象,因为这些不是序列

我没有使用5克执行人9号,executore核心15,司机2G内存,执行内存

我使用16芯64演出存储器 簇大小1个主9从所有相同的结构 EMR部署火花1.6

+0

你确定'创建一个AWS SNS和SQS客户端连接 '正在60%的工作时间或'发布记录SNS/SQS'呢?这两者之间略有不同。对于第一种情况,您需要最小化连接创建的数量,而对于第二种情况,您需要分配数据(并创建更多连接实例)。有趣!!!! – code

+0

如果是第二种情况,我会用解决方案发布答案​​。 – code

回答

1

听起来好像将要设置正好一个每节点SNS/SQS连接,然后用它来处理每个节点上的所有数据。

我认为foreachPartition在这里是正确的想法,但您可能想事先合并RDD。这会在同一个节点上折叠分区而不洗牌,并且可以避免启动额外的SNS/SQS连接。

在这里看到: https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option[org.apache.spark.rdd.PartitionCoalescer])(implicitord:Ordering[T]):org.apache.spark.rdd.RDD[T]

+0

是的,coalesce正是我的解决方案。还有一点我想在此添加。我有很多像23kb,45kb等小文件,并且通过coalesce将它缩小到正确的分区,现在我能够在20分钟内处理接近25GB的数据。在这里改进更多 – Sam

+0

谢谢布拉德利..还有一件事..这是说我需要1TB数据来处理我应该创建多少分区合并? – Sam

+0

所以我会使用足够多的分区,以便每个分区都适合内存,或者我拥有的核心数量。无论哪个更大。 –