我从this question中看到Spark节点有效地“直接沟通”,但我不太关心理论,更多地关注实现。 Here它显示在页面底部附近的“###加密”部分中,您可以将Spark配置为使用多种SSL协议进行安全性,至少对我而言,这表明它使用某种形式的HTTP(s)进行通信。我的问题实际上有两部分:Spark节点使用什么协议进行通信,以及为此传输格式化的数据如何?在Shuffle期间Spark节点如何通信?
3
A
回答
3
Spark使用RPC(Netty)在执行程序进程之间进行通信。您可以查看NettyRpcEndpointRef
类来查看实际的实现。
对于洗牌数据,我们从BlockManager
开始,它负责提供数据块。每个执行者进程都有一个。内部使用SerializerManager
管理从不同执行者读取的BlockStoreShuffleReader
。此管理器保存实际串行器,其通过所述spark.serializer
属性定义:
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
当BlockManager
试图读取块时,它使用从底层配置串行器。它可以是KryoSerializer
或JavaSerializer
,具体取决于您的设置。
底线,用于读取和写入混洗数据Spark使用用户定义的序列化程序。
对于任务序列化,这有些不同。
Spark使用名为closureSerializer
的变量,默认为JavaSerializerInstance
,这意味着Java序列化。你可以看到这个DAGScheduler.submitMissingTasks
方法中:
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
是被序列化和发送到每个执行实际的对象被称为TaskDescription
:
def encode(taskDescription: TaskDescription): ByteBuffer = {
val bytesOut = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(bytesOut)
dataOut.writeLong(taskDescription.taskId)
dataOut.writeInt(taskDescription.attemptNumber)
dataOut.writeUTF(taskDescription.executorId)
dataOut.writeUTF(taskDescription.name)
dataOut.writeInt(taskDescription.index)
// Write files.
serializeStringLongMap(taskDescription.addedFiles, dataOut)
// Write jars.
serializeStringLongMap(taskDescription.addedJars, dataOut)
// Write properties.
dataOut.writeInt(taskDescription.properties.size())
taskDescription.properties.asScala.foreach { case (key, value) =>
dataOut.writeUTF(key)
// SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
val bytes = value.getBytes(StandardCharsets.UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
// Write the task. The task is already serialized, so write it directly to the byte buffer.
Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)
dataOut.close()
bytesOut.close()
bytesOut.toByteBuffer
}
,并得到来自CoarseGrainedSchedulerBackend.launchTasks
方法发送的RPC:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
我到目前为止所展示的是关于启动任务的讨论。对于洗牌数据,Spark拥有一个BlockStoreShuffleReader
,该账号管理来自不同执行者的读取。
相关问题
- 1. 如何测量Spark Standalone集群中节点之间的通信?
- 2. 节点间通信
- 3. Spark shuffle error org.apache.spark.shuffle.FetchFailedException:FAILED_TO_UNCOMPRESS(5)
- 4. Spark - Shuffle Read Blocked Time
- 5. 两个Cassandra节点之间的通信
- 6. Cassandra握手和节点间通信
- 7. 群集节点之间的通信
- 8. 天青 - 间业务节点的通信
- 9. 两个节点之间的通信
- 10. 如何监视两个Erlang节点之间的通信
- 11. Spark shuffle为什么在磁盘上存储中间数据?
- 12. Python和节点JS通信
- 13. spark的shuffle读和shuffle写有什么区别?
- 14. 如何使用.NET在MSCS集群节点之间进行协调/通信
- 15. 在部署图中绘制节点之间的通信
- 16. 将spark-jobserver部署到BlueMix Spark节点
- 17. 如何在通话期间在背景中发送短信
- 18. Spark 1.5.2 Shuffle/Serialization - 内存不足
- 19. Spark Streaming 2.0 GC错误(Shuffle Issue)
- 20. Spark中的sortByKey是否会调用shuffle?
- 21. 通过节点中间人
- 22. Spark 2:检查节点是主节点还是工作节点
- 23. 如何在循环期间更新或追加XML节点?
- 24. 如何在重构环境/角色/节点期间使用ChefSpec
- 25. NS-3:如何在模拟期间更改节点位置?
- 26. 如何在neo4j节点中插入当前时间和日期
- 27. 如何估算Spark Shuffle所需的内存和磁盘?
- 28. 单节点独立Spark?
非常感谢!不能要求更多的信息和直接的答案。 – Chance
作为一个快速的后续行动:据我所知,Spark工作人员在可能的情况下直接从文件系统加载输入数据,但是如果驱动程序首先加载数据并手动将其传递给工作人员,那么这也可以通过RPC完成,已经描述过?谢谢。 – Chance
驱动程序和工作人员通过RPC进行通信。请注意,驱动程序加载数据本身并将其分发给工作人员是非常罕见的。 –