2015-08-16 40 views
1

基本上我有一个RDD/DataFrame包含一系列事件(带有一些categoryId)。他们是时间戳,并按时间排序。 我想要做的是扫描每个类别中的所有事件,同时保持/更新某些状态,以记住是否发生了某些事件。一些示例:Spark - 如何使用有状态映射器对排序的RDD进行平面映射?

  • 用户登录到亚马逊(记录新的会话ID,时间戳)
  • 用户添加项目到晒(增量筐尺寸为1)
  • 用户执行花了结账(增量资金,输出RDD新增项目:+的sessionId + start_timestamp
  • 用户添加其他东西到篮下
  • 进行结账号码,如果项目+花的钱) - >下一个项目添加到输出RDD

所以我非常想用一个有状态的映射器(它记住了以前的项目)做一个flatMap。该映射器可以通过categoryId具有“状态”的映射。但是,有几百万个类别呢?有没有比按类别+时间戳排序更好的方法? 我还需要确保整个类别位于单个节点上。在这种情况下,我应该按类别分区吗?我不确定是否有数百万个分区是好主意。

回答

1

由于你的问题是相当普遍的,你会得到一个通用的答案。除非你有充分的理由不应该使用Data FramesWindow Functions

上述第一项将为您提供Catalyst Optimizer的所有好处。第二个应该提供您可能的操作来处理你的数据,你所描述:

  • PARTITION BY - 按类别
  • ORDER BY分区数据 - 为了通过时间戳
  • FRAMEROWS/RANGE) - 可选窗口大小限制
  • 实际functions来执行所需操作

附注

我不知道,如果数以百万计的分区是个好主意。

不,这不是一个好主意,而是通过一些关键的划分并不意味着你需要相同数量的分区作为唯一键的数量:

import org.apache.spark.HashPartitioner 
val rdd = sc.parallelize(
    (1 to 10).flatMap(k => (1 to 100).map(_ => (k, scala.util.Random.nextInt))) 
).partitionBy(new HashPartitioner(2)) 

在上面的例子中,你有10个不同的值,但只有2个分区。

+0

有道理。我想现在是升级到1.4的时候了。 – mabn

相关问题