2017-08-10 64 views
6

我正在尝试将两个不同窗口的流聚合并将其打印到控制台中。但是,只有第一个流式查询正在打印。 tenSecsQ未打印到控制台中。在火花结构化流式传输中执行单独的流式查询

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

针对5s和10s聚合流的触发流查询。 10s数据流的输出不会打印。只有5s印在控制台上

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

实际上,我不知道套接字流是如何工作的,但对我来说似乎你的第一个Spark流从套接字流中读取所有数据,而第二个没有任何数据。 –

回答

5

我一直在调查这个问题。

摘要:结构化数据流中的每个查询消耗source数据。套接字源为每个定义的查询创建一个新的连接。在这种情况下看到的行为是因为nc仅将输入数据传递到第一个连接。

因此,除非我们可以确保连接的套接字源向每个打开的连接传递相同的数据,否则无法通过套接字连接定义多个聚合。


我在Spark邮件列表中讨论了这个问题。 Databricks开发人员朱世雄回答:

Spark为每个查询创建一个连接。你观察到的行为是因为“nc -lk”是如何工作的。如果您使用netstat来检查tcp连接,则会在启动两个查询时看到两个连接。但是,“nc”仅将输入转发给一个连接。

我定义了一个小实验验证了这种行为: 首先,我创建了一个SimpleTCPWordServer,提供随机的话给每个连接开放和声明两个查询一个基本的结构化数据流的工作。它们之间唯一的区别是,第二次查询定义了一个额外的常数列来区分其输出:

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

如果StructuredStreaming将消耗只有一个流,那么我们应该看到这两个查询发表了相同的话。在每个查询消耗单独的流的情况下,那么我们将由每个查询报告不同的单词。

这是观测的输出:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

我们可以清楚地看到,每个查询的数据流是不同的。除非我们可以保证TCP后端服务器向每个打开的连接提供完全相同的数据,否则看起来不可能在socket source提供的数据上定义多个聚合。

相关问题