2016-11-09 14 views
-1

我在hadoop中有两个Csv,比如说csv1,csv2。两个cvs都包含两列(timestamp和somevalue),例如对于csv1列是t1,v1和对于csv2列是t2,v2。 我想为每个t1 = t2(对于相同的时间戳)计算v1 * v2并使用spark java Api将结果作为文本文件存储在hdfs中。如何从两个csv中读取值并在b/w的spark列中执行操作java api?

我是新来的星星,有人帮我。

Thanx提前。

回答

0

我能做到这一点在Scala中,也许你能得到的我在做什么的依据和实现它自己:

scala> val df1=sc.parallelize(Seq((1001,2),(1002,3),(1003,4))).toDF("t1","v1") 
df1: org.apache.spark.sql.DataFrame = [t1: int, v1: int] 


scala> val df2=sc.parallelize(Seq((1001,3),(1002,4),(1005,4))).toDF("t2","v2") 
df2: org.apache.spark.sql.DataFrame = [t2: int, v2: int] 

scala> df1.join(df2,df1("t1")===df2("t2")) 
res1: org.apache.spark.sql.DataFrame = [t1: int, v1: int ... 2 more fields] 

scala> res1.show 
+----+---+----+---+                
| t1| v1| t2| v2| 
+----+---+----+---+ 
|1002| 3|1002| 4| 
|1001| 2|1001| 3| 
+----+---+----+---+ 

scala> import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions._ 

scala> val result=res1.withColumn("foo",res1("v1") * res1("v2")) 
result: org.apache.spark.sql.DataFrame = [t1: int, v1: int ... 3 more fields] 

scala> result.show 
+----+---+----+---+---+               
| t1| v1| t2| v2|foo| 
+----+---+----+---+---+ 
|1002| 3|1002| 4| 12| 
|1001| 2|1001| 3| 6| 
+----+---+----+---+---+ 

我希望这能解决你的问题。

+0

Thanx解决方案,我试着用这些概念,但没有得到确切的解决方案。时间戳列包含值如2016-09-01 15:31:58 + 00:00。我想加载csv并将其拆分成列,结果应该像(t1,v * v2)。 –

+0

然后使用先将其转换为火花时间戳,然后执行这些步骤,或者如果您想以简单方式执行此操作,则只需使用字符串即可。 –

相关问题