2014-04-17 32 views
4

我正在研究Spark流式处理实时数据,并且我构建了火花流式传输的例子wordCount,并且我可以运行以下示例: /bin/run-例子org.apache.spark.streaming.examples.JavaNetworkWordCount local [2] localhost 9999Spark:为什么流式传输不能连接java套接字客户端

我在另一个终端上运行“nc -L -p 9999”,然后我可以在这个终端上键入字母,这个例子可以收到信件并给出正确的结果。

但是我开发了一个java socket客户端发送内容到9999端口,为什么不能接收它的例子?我认为这个例子只是监视9999端口,并从端口接收任何东西。

以下是Java部分:

File file = new File("D:\\OutputJson.dat"); 
    long l = file.length(); 
    socket = new Socket(); 
    boolean connected = false; 
    while (!connected) { 
     //not stop until send successful 
     try { 
      socket.connect(new InetSocketAddress("localhost", 9999)); 
      connected = true; 
      System.out.println("connected success!"); 
     } catch (Exception e) { 
      e.printStackTrace(); 
      System.out.println("connected failed!"); 
      Thread.sleep(5000); 
     } 
    } 
    dos = new DataOutputStream(socket.getOutputStream()); 
    fis = new FileInputStream(file); 
    sendBytes = new byte[1024]; 
    while ((length = fis.read(sendBytes, 0, sendBytes.length)) > 0) { 
     sumL += length; 
     System.out.println("sent:" + ((sumL/l) * 100) + "%"); 
     dos.write(sendBytes, 0, length); 
     dos.flush(); 
    } 
    if (sumL == l) { 
     bool = true; 
    } 

这个Java函数总是返回错误: java.net.SocketException异常:插座关闭

我已经开发了另一个Java类从这个发送接收数据套接字,它工作正常,为什么火花不能接收?

+1

JavaNetworkWordCount假定记录由\ n分隔。如果你的OutputJson.data中的数据没有被\ n隔开,那么火花流接收器不会找到记录的结尾,所以不能正确接收任何内容。 –

回答

-1

从内存我想我使用了一个ServerSocket。该代码类似于:

public void sendMsg(String msg) throws IOException { 
    ServerSocket serverSocket = null; 
    Socket clientSocket = null; 
    try { 
     serverSocket = new ServerSocket(port); 
     clientSocket = serverSocket.accept(); 
     PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); 
     out.write(msg); 
     out.flush(); 
     out.close(); 
    } finally { 
     try { 
      clientSocket.close(); 
      serverSocket.close(); 
     } finally { 
      clientSocket = null; 
      serverSocket = null; 
     } 
    } 
} 
+0

这个问题不能用这个方法来解决,因为spark不能从执行者写出来 – pcejrowski