2016-03-03 48 views
0

我最近开始使用spark-kernel。使用spark-kernel comm API

正如在教程和示例代码中给出的那样,我能够设置客户端并使用它在spark-kernel上执行代码片段并获取example code中给出的结果。

现在,我需要使用与spark-kernel一起提供的comm API。我试过这个tutorial,但我无法使它工作。事实上,我不了解如何做这项工作。

我试过下面的代码,但是当我运行这段代码时,在内核上出现了这个错误:“接收到通信打开的无效目标:my_target”。

package examples 
import scala.runtime.ScalaRunTime._ 
import scala.collection.mutable.ListBuffer 
import com.ibm.spark.kernel.protocol.v5.MIMEType 
import com.ibm.spark.kernel.protocol.v5.client.boot.ClientBootstrap 
import com.ibm.spark.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} 
import com.ibm.spark.kernel.protocol.v5.content._ 
import com.typesafe.config.{Config, ConfigFactory} 
import Array._ 

object commclient extends App{ 
val profileJSON: String = """ 
{ 
     "stdin_port" : 48691, 
     "control_port" : 44808, 
     "hb_port" : 49691, 
     "shell_port" : 40544, 
     "iopub_port" : 43462, 
     "ip" : "127.0.0.1", 
     "transport" : "tcp", 
     "signature_scheme" : "hmac-sha256", 
     "key" : "" 
} 
""".stripMargin 

val config: Config = ConfigFactory.parseString(profileJSON) 
val client = (new ClientBootstrap(config) 
    with StandardSystemInitialization 
    with StandardHandlerInitialization).createClient() 

def printResult(result: ExecuteResult) = { 
    println(s"${result.data.get(MIMEType.PlainText).get}") 
} 
def printStreamContent(content:StreamContent) = { 
    println(s"${content.text}") 
} 
def printError(reply:ExecuteReplyError) = { 
    println(s"Error was: ${reply.ename.get}") 
} 

client.comm.register("my_target").addMsgHandler { 
(commWriter, commId, data) => 
    println(data) 
    commWriter.close() 
} 

// Initiate the Comm connection 
client.comm.open("my_target") 

} 

有人能告诉我如何将我运行这段代码:

// Register the callback to respond to being opened from the client 
kernel.comm.register("my target").addOpenHandler { 
    (commWriter, commId, targetName, data) => 
     commWriter.writeMsg(Map("response" -> "Hello World!")) 
} 

我会很感激,如果有人可以点我完成COMM API的使用工作的例子。

任何帮助将不胜感激。谢谢

回答

0

您可以使用您的客户端在一个程序中运行此服务器(内核)端注册一旦。然后你的其他程序可以使用这个通道与内核通信。 这是我在上面提到的第一个程序中执行注册的一种方式:

client.execute(
""" 
// Register the callback to respond to being opened from the client 
kernel.comm.register("my target"). 
    addOpenHandler { 
     (commWriter, commId, targetName, data) => 
      commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("response" -> "Toree Hello World!")) 
    }. 
    addMsgHandler { 
     (commWriter, _, data) => 
      if (!data.toString.contains("closing")) { 
       commWriter.writeMsg(data) 
      } else { 
       commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("closing" -> "done")) 
      } 
    } 
""".stripMargin 
) 
相关问题