2015-05-19 34 views
0

我是Spark和Scala的新手,所以我的问题可能很容易,但我仍然很难找到答案。我需要加入两个Spark流,但在将这些流转换为适当格式时遇到问题。请参阅下面我的代码:将输入流转换为键值对流

val lines7 = ssc.socketTextStream("localhost", 9997) 
val pairs7 = lines7.map(line => (line.split(" ")[0], line)) 

val lines8 = ssc.socketTextStream("localhost", 9998) 
val pairs8 = lines8.map(line => (line.split(" ")[0], line)) 

val newStream = pairs7.join(pairs8) 

这不起作用,因为“加入”功能,预计在流格式DStream[String, String]和地图功能的结果是DStream[(String, String)]

现在我的问题是如何编码这个映射函数来获得适当的输出(小解释也会很好)?

在此先感谢。

+0

我不知道Spark,但从我可以看到的,[DStream](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming .dstream.DStream)只需要一个类型参数,所以'DStream [String,String]'是不可能的。你可能想要别的东西,但我猜不出是什么。无论是那个,还是'DStream [(String,String)]'是你想要的,但你还不知道它。 –

+0

是否在'StreamingContext._'中导入了隐式转换? – maasg

回答

1

这按预期工作:

import org.apache.spark.streaming.{Seconds, StreamingContext} 

val ssc = new StreamingContext(sc, Seconds(30)) 
val lines7 = ssc.socketTextStream("localhost", 9997) 
val pairs7 = lines7.map(line => (line.split(" ")(0), line)) 
val lines8 = ssc.socketTextStream("localhost", 9998) 
val pairs8 = lines8.map(line => (line.split(" ")(0), line)) 
val newStream = pairs7.join(pairs8) 

newStream.foreachRDD(rdd => println(rdd.collect.map(_.toString).mkString(","))) 

ssc.start 

我看到的唯一问题是语法错误的:line.split(" ")[0] VS line.split(" ")(0)但我想这会被编译器发现。