2016-04-26 30 views
0

我使用spark编写了从mysql到cassandra的数据迁移代码。我试图概括它,让给定一个conf文件它可以迁移任何表。这里即时通讯停留在2个地方:如何在spark-scala中使用select()和map()?

  1. val dataframe2 = dataframe.select("a","b","c","d","e","f") 从MySQL装入表后,我只想选择几个栏,我有这些列的列表的名称。这怎么用呢?
  2. val RDDtuple = dataframe2.map(r => (r.getAs(0), r.getAs(1), r.getAs(2), r.getAs(3), r.getAs(4), r.getAs(5))) 在这里,每个表格可能有不同数量的列,所以如何实现这一点?
+0

我不知道我在代码中了解问题 – eliasah

+0

我已经编写了im在select()中显式指定列名称(我不想要),如果im错误会纠正我,但select()会不要将数组或列表作为参数。另外,如果列数不固定,map()如何改变? –

+0

当然是的,select可以把一个重复的参数作为参数。至于你的第二个查询,你可以将你的行转换成一个带有隐式函数的字符串,然后在这个字符串上分割以创建一个数组,如果你想将它转换成一个元组,那么你需要在后面处理它。 – eliasah

回答

0
  1. select()使用变量数列,你列的名单可以转换这样的:

val columns = List("a", "b", "c", "d") val dfSelectedCols = dataFrame.select(columns.head, columns.tail :_*)

说明:DataFrame's select(String, String...)的第一个参数是必须的,所以在使用columns.head。列表的其余部分需要使用columns.tail :_*转换为可变参数。

  • 这不是从你的例子很清楚,但我想这xRDD[Row]和你正试图转换成TupleRDD S,对吧?请提供更多细节,并使用有意义的变量名称。 x,yz是不好的选择,特别是如果没有明确的打字。
  • +0

    Thanx很多,这个工程。就(2)而言,x是'数据框',我希望将它转换为元组的'RDD'。对不起,名字,我会记住下一次 –