2017-08-16 30 views
3

我从this question中看到Spark节点有效地“直接沟通”,但我不太关心理论,更多地关注实现。 Here它显示在页面底部附近的“###加密”部分中,您可以将Spark配置为使用多种SSL协议进行安全性,至少对我而言,这表明它使用某种形式的HTTP(s)进行通信。我的问题实际上有两部分:Spark节点使用什么协议进行通信,以及为此传输格式化的数据如何?在Shuffle期间Spark节点如何通信?

回答

3

Spark使用RPC(Netty)在执行程序进程之间进行通信。您可以查看NettyRpcEndpointRef类来查看实际的实现。

对于洗牌数据,我们从BlockManager开始,它负责提供数据块。每个执行者进程都有一个。内部使用SerializerManager管理从不同执行者读取的BlockStoreShuffleReader。此管理器保存实际串行器,其通过所述spark.serializer属性定义:

val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer") 
logDebug(s"Using serializer: ${serializer.getClass}") 

BlockManager试图读取块时,它使用从底层配置串行器。它可以是KryoSerializerJavaSerializer,具体取决于您的设置。

底线,用于读取和写入混洗数据Spark使用用户定义的序列化程序。


对于任务序列化,这有些不同。

Spark使用名为closureSerializer的变量,默认为JavaSerializerInstance,这意味着Java序列化。你可以看到这个DAGScheduler.submitMissingTasks方法中:

val taskBinaryBytes: Array[Byte] = stage match { 
    case stage: ShuffleMapStage => 
    JavaUtils.bufferToArray(
     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) 
    case stage: ResultStage => 
     JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) 
} 

是被序列化和发送到每个执行实际的对象被称为TaskDescription

def encode(taskDescription: TaskDescription): ByteBuffer = { 
    val bytesOut = new ByteBufferOutputStream(4096) 
    val dataOut = new DataOutputStream(bytesOut) 

    dataOut.writeLong(taskDescription.taskId) 
    dataOut.writeInt(taskDescription.attemptNumber) 
    dataOut.writeUTF(taskDescription.executorId) 
    dataOut.writeUTF(taskDescription.name) 
    dataOut.writeInt(taskDescription.index) 

    // Write files. 
    serializeStringLongMap(taskDescription.addedFiles, dataOut) 

    // Write jars. 
    serializeStringLongMap(taskDescription.addedJars, dataOut) 

    // Write properties. 
    dataOut.writeInt(taskDescription.properties.size()) 
    taskDescription.properties.asScala.foreach { case (key, value) => 
    dataOut.writeUTF(key) 
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values 
    val bytes = value.getBytes(StandardCharsets.UTF_8) 
    dataOut.writeInt(bytes.length) 
    dataOut.write(bytes) 
    } 

    // Write the task. The task is already serialized, so write it directly to the byte buffer. 
    Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut) 

    dataOut.close() 
    bytesOut.close() 
    bytesOut.toByteBuffer 
} 

,并得到来自CoarseGrainedSchedulerBackend.launchTasks方法发送的RPC:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) 

我到目前为止所展示的是关于启动任务的讨论。对于洗牌数据,Spark拥有一个BlockStoreShuffleReader,该账号管理来自不同执行者的读取。

+1

非常感谢!不能要求更多的信息和直接的答案。 – Chance

+0

作为一个快速的后续行动:据我所知,Spark工作人员在可能的情况下直接从文件系统加载输入数据,但是如果驱动程序首先加载数据并手动将其传递给工作人员,那么这也可以通过RPC完成,已经描述过?谢谢。 – Chance

+1

驱动程序和工作人员通过RPC进行通信。请注意,驱动程序加载数据本身并将其分发给工作人员是非常罕见的。 –