2016-05-26 70 views
3

我在https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html上看到了一个Dataframes教程,这个教程写在Python。我正试图将它翻译成Scala火花在地图中创建行

他们有下面的代码:

df = context.load("/path/to/people.json") 
# RDD-style methods such as map, flatMap are available on DataFrames 
# Split the bio text into multiple words. 
words = df.select("bio").flatMap(lambda row: row.bio.split(" ")) 
# Create a new DataFrame to count the number of words 
words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF() 
word_counts = words_df.groupBy("word").sum() 

于是,我第一次看到从csv数据到一个数据帧df后来才知​​道有:

val title_words = df.select("title").flatMap { row =>  
    row.getAs[String("title").split(" ") } 
val title_words_df = title_words.map(w => Row(w,1)).toDF() 
val word_counts = title_words_df.groupBy("word").sum() 

,但我不知道:

  1. 如何将字段名称分配到行中的行开头与VAL title_words_df nning = ...

  2. 我有错误 “的值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]成员”

在此先感谢您的帮助。

回答

2

如何将字段名分配到行

的Python 是完全不同类型的对象比其对应的Scala的。它是一个增加了名称的元组,使它与无类型集合(o.a.s.sql.Row)相比更加等效于产品类型。

我有错误 “的值toDF不是org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]成员”

由于o.a.s.sql.Row是基本上是无类型的,它不能与toDF一起使用,并且需要createDataFrame具有明确的模式。

import org.apache.spark.sql.types._ 

val schema = StructType(Seq(
    StructField("word", StringType), StructField("cnt", LongType) 
)) 

sqlContext.createDataFrame(title_words.map(w => Row(w, 1L)), schema) 

如果你想你的代码相当于你应该使用的产品类型,而不是的Python版本。这意味着无论是Tuple

title_words.map((_, 1L)).toDF("word", "cnt") 

或案例类:

case class Record(word: String, cnt: Long) 

title_words.map(Record(_, 1L)).toDF 

但在实践中,应该不需要使用RDDS:

import org.apache.spark.sql.functions.{explode, lit, split} 

df.select(explode(split($"title", " ")), lit(1L))