2016-12-05 37 views
2

我使用Spark Streaming v2.0.0从Kafka检索日志并进行一些操作。我正在使用功能mapWithState以保存和更新与设备相关的某些字段。我想知道这个函数如何在集群中工作。事实上,我现在只是使用独立模式,但稍后我会尝试使用Yarn群集。Spark Streaming:mapWithState函数如何在集群中工作?

但是,假设我有一个有多个节点的集群,如果一个节点更新设备的状态,他是否立即通知此更新的所有其他节点?如果否,则需要设置群集中的mapWithState函数。我该怎么做?

回答

3

但是,假设我有一个有多个节点的集群,如果一个节点更新设备的状态,他是否立即通知此更新的所有其他节点?如果否,则需要设置集群中的mapWithState函数。

这不是mapWithState的工作原理。 mapWithState是一个洗牌阶段,这意味着它会导致群集中的数据移动。这如何影响mapWithState?每个条目(键值对)将被洗牌到一个特定的执行者。在随后到达同一密钥时,无论执行者在给定时间从输入流处理它,它都将被洗牌到持有内存映射的节点和先前消息的状态。这是通过HashPartitioner默认完成的,它会散列密钥,然后将其发送到保存状态的正确执行器,这就是为什么您需要仔细选择密钥。

这意味着特定密钥的状态不会散布在整个群集中。它被分配到集群内的一个特定的执行器,并且每次基于密钥的哈希值,传入的数据将保持回到那个。

+1

谢谢Yuval,这非常清楚! –

+0

Hi Yuval,你有关于HashPartitioner的任何文档。我只有这个链接,但我不明白它是如何工作的。 http://spark.apache.org/docs/2.0.2/api/java/index.html?org/apache/spark/HashPartitioner.html –

+1

http://stackoverflow.com/questions/31424396/how-does- hashpartitioner-work –

0

所有有状态转换都通过密钥对数据进行混洗,因此特定密钥的所有值都在同一个执行程序线程上处理。

不需要额外的同步和一个特定的键状态总是一致的。

+0

当你说“同一台机器”时,你的意思是“同一个节点”?如果我不了解你写的内容,那么节点之间已经有了一个关于变换的同步? –

+0

我的意思是执行者线程。 – user7252138

-1

检查点是作为目录提供的,所以可以从本地文件系统,NFS挂载,HDFS托管或S3托管!

现在,考虑YARN + HDFS组合。由于mapWithState而写入检查点的任何数据将根据状态密钥分布在不同的HDFS节点上,并且spark会尝试在相同节点上调度依赖于它的任务。

但是如果你考虑的话,YARN + NFS(可能根本不合逻辑)。每个节点应该在相同的挂载点挂载NFS,并且每个读/写请求将是一个NFS请求。这将创造一个完美的瓶颈!

让我们假设,状态管理用户会话。我们可能会选择保留每个用户少量的信息或多个GB信息。状态中的键应该以某种方式唯一标识用户,并且每次触发mapWithState函数时,都可以访问保存在该用户状态中的所有信息。

+0

我不会检查点到S3;检查点依赖于重命名来提交检查点,并且在非常缓慢且不是原子的对象存储上。 –

相关问题