The example in the Spark Streaming programming guide (http://spark.apache.org/docs/latest/streaming-programming-guide.html) lets me receive data packets over a TCP stream listening on port 9999:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 

// Create a local StreamingContext with two working thread and batch interval of 1 second. 
// The master requires 2 cores to prevent from a starvation scenario. 

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(conf, Seconds(1)) 


// Create a DStream that will connect to hostname:port, like localhost:9999 
val lines = ssc.socketTextStream("localhost", 9999) 
// Split each line into words 
val words = lines.flatMap(_.split(" ")) 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 

// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 
ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

I am able to send data over TCP by creating a data server on my Linux system with:

$ nc -lk 9999

Question

I need to receive the stream from an Android phone over UDP into Scala/Spark.

val lines = ssc.socketTextStream("localhost", 9999)

only receives TCP streams.

How can I receive a UDP stream and create a Spark DStream in a similarly simple way with Scala + Spark?

Answer


There is nothing built in, but it doesn't take too much work to do it yourself. Here is a simple solution I made based on a custom UdpSocketInputDStream[T]:

import java.io.InputStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

object Implicits {
  // Enriches StreamingContext with a udpSocketStream factory method.
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}

And here is the UdpSocketInputDStream[T] itself:

import java.io._ 
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}

import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 

import scala.reflect.ClassTag 
import scala.util.control.NonFatal 

class UdpSocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {
    try {
      // Bind a UDP socket to the given host and port.
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      // A DatagramSocket signals bind failures with a SocketException;
      // ConnectException is a TCP-only subclass and would never match here.
      case e: SocketException =>
        restart(s"Error binding to port $port", e)
        return
    }

    // Start the thread that receives data over the socket.
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Receive a single datagram and store its contents. Once the packet is
    * drained, the receiver restarts itself to wait for the next one. */
  def receive(): Unit = {
    try {
      val buffer = new Array[Byte](2048)

      // Create a packet that receives data into the buffer.
      val packet = new DatagramPacket(buffer, buffer.length)

      // Block until a datagram arrives.
      udpSocket.receive(packet)

      // Convert the packet's payload into records and hand them to Spark.
      val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
        restart("Udp socket data stream had no more data")
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}

To make the method available on StreamingContext itself, we enrich it with an implicit class (the Implicits object shown at the top). Here is how you call it all:

import java.io.{BufferedReader, InputStream, InputStreamReader} 
import java.nio.charset.StandardCharsets 

import org.apache.spark.SparkContext 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.dstream.InputDStream 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

import scala.reflect.ClassTag 

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    val stream = ssc.udpSocketStream("localhost",
                                     3003,
                                     bytesToLines,
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }

  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close(): Unit
  }
}

Most of the code was taken from the SocketInputDStream[T] that ships with Spark; I simply reused it. I also took the code for the NextIterator that bytesToLines uses; all it does is consume the lines out of the packet and convert them to Strings. If you have more complex logic, you can provide your own implementation by passing converter: InputStream => Iterator[T].
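
For instance, here is a minimal sketch of an alternative converter; the Reading case class and the "sensorId,value" line format are hypothetical, not part of the original code. It parses each datagram into structured records instead of plain Strings:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

case class Reading(sensor: String, value: Double)

// Parses each line of the packet's payload as "sensorId,value".
def bytesToReadings(inputStream: InputStream): Iterator[Reading] = {
  val reader = new BufferedReader(
    new InputStreamReader(inputStream, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine())
    .takeWhile(_ != null) // stop at the end of this packet's data
    .map { line =>
      val Array(sensor, value) = line.split(",", 2)
      Reading(sensor, value.toDouble)
    }
}

It plugs in exactly like bytesToLines:

val stream = ssc.udpSocketStream("localhost", 3003, bytesToReadings, StorageLevel.MEMORY_AND_DISK_SER_2)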

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003 

yields:

------------------------------------------- 
Time: 1482676728000 ms 
------------------------------------------- 
hello hello hello! 

Of course, this needs further testing. There is also a hidden assumption here that each buffer created for the DatagramPacket is 2048 bytes long, which may be something you want to change.
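
If you want to rule out truncation entirely, one sketch (not part of the original answer) is to size the buffer for the largest datagram UDP can deliver, for example by threading it through as a hypothetical bufferSize constructor parameter:

import java.net.DatagramPacket

// 65507 bytes is the maximum UDP payload over IPv4 (65535 minus the
// 20-byte IP header and the 8-byte UDP header), so at this size no
// datagram can be truncated, at the cost of a larger allocation.
val maxUdpPayload = 65507
val buffer = new Array[Byte](maxUdpPayload)
val packet = new DatagramPacket(buffer, buffer.length)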

Comments

Any possibility of similar code in Python, i.e. Python with Spark Streaming or Python with Spark Structured Streaming? – user3698581

I need to brush up on my Python skills; I'll take a look as soon as I find the time. –

Thank you very much. I posted a question and received a comment: http://stackoverflow.com/questions/42458812/spark-streaming-custom-receiver-in-python-receive-udp-over-socket – user3698581