2017-07-26

Spark Structured Streaming: multiple queries won't run simultaneously. My example, slightly modified, is taken from https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala

I added a second writeStream (snippet):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

case class MyWriter1() extends ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(value: Row): Unit = {
      println(s"custom1 - ${value.get(0)}")
    }

    override def close(errorOrNull: Throwable): Unit = {}
}

case class MyWriter2() extends ForeachWriter[(String, Int)] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(value: (String, Int)): Unit = {
      println(s"custom2 - $value")
    }

    override def close(errorOrNull: Throwable): Unit = {}
}


object Main extends Serializable{ 

    def main(args: Array[String]): Unit = { 
    println("starting") 

    Logger.getLogger("org").setLevel(Level.OFF) 
    Logger.getLogger("akka").setLevel(Level.OFF) 

    val host = "localhost" 
    val port = "9999" 

    val spark = SparkSession 
     .builder 
     .master("local[*]") 
     .appName("app-test") 
     .getOrCreate() 

    import spark.implicits._ 

    // Create DataFrame representing the stream of input lines from connection to host:port 
    val lines = spark.readStream 
     .format("socket") 
     .option("host", host) 
     .option("port", port) 
     .load() 

    // Split the lines into words 
    val words = lines.as[String].flatMap(_.split(" ")) 

    // Generate running word count 
    val wordCounts = words.groupBy("value").count() 

    // Start running the query that prints the running counts to the console 
    val query1 = wordCounts.writeStream 
     .outputMode("update") 
     .foreach(MyWriter1()) 
     .start() 

    val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Int]("count"))) 

    val query2 = ds.writeStream 
     .outputMode("update") 
     .foreach(MyWriter2()) 
     .start() 

    spark.streams.awaitAnyTermination() 

    } 
} 

Unfortunately, only the first query runs; the second never does (MyWriter2 is never called).

Please advise what I'm doing wrong. According to the docs: you can start any number of queries in a single SparkSession. They will all run concurrently, sharing cluster resources.

Answer

You are using .awaitAnyTermination(), which will terminate the application as soon as the first stream returns; you have to wait for both streams to finish before terminating.

Something like this should do the trick:

query1.awaitTermination() 
query2.awaitTermination() 
+0

Unfortunately it doesn't. Only query1 runs. What I have noticed: it is the query that starts first that runs. Is there anything else I should look at? –

+0

What happens when you run the application? Does it terminate before the second query runs? Obviously you have to remove 'awaitAnyTermination' – raam86

+1

I removed awaitAnyTermination and both queries use awaitTermination (in fact I have tried every possible option, including Thread.sleep...); the application does not terminate, it keeps running and receives data from the socket; but every time new data arrives, only the first query fires. –

Answer

Are you using nc -lk 9999 to send data to Spark? Each query opens its own connection to nc, and nc only sends data to the first connection (query). You could write a small TCP server instead of nc.
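A minimal sketch of such a server (the `BroadcastServer` name and structure are my own, not from this thread): unlike nc, it writes every line typed on stdin to all connected clients, so both streaming queries would receive the data.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.collection.mutable.ArrayBuffer
import scala.io.StdIn

object BroadcastServer {
  // One autoFlushing writer per connected client.
  private val clients = ArrayBuffer.empty[PrintWriter]

  // Accept connections in a background daemon thread and remember each client.
  def start(port: Int): ServerSocket = {
    val server = new ServerSocket(port)
    val acceptor = new Thread(() => {
      try {
        while (true) {
          val socket = server.accept()
          clients.synchronized {
            clients += new PrintWriter(socket.getOutputStream, true)
          }
        }
      } catch { case _: java.io.IOException => () } // server was closed
    })
    acceptor.setDaemon(true)
    acceptor.start()
    server
  }

  // Send one line to EVERY connected client -- this is what nc does not do.
  def broadcast(line: String): Unit =
    clients.synchronized(clients.foreach(_.println(line)))

  def main(args: Array[String]): Unit = {
    start(9999)
    // Forward stdin lines to all clients until EOF.
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach(broadcast)
  }
}
```

Run it in place of nc, point both queries' socket source at localhost:9999, and type lines on stdin; each line should then reach both query1 and query2. (Requires Scala 2.12+ for the lambda-to-Thread conversion.)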