我使用spark编写了从mysql到cassandra的数据迁移代码。我试图概括它,让给定一个conf文件它可以迁移任何表。这里即时通讯停留在2个地方:如何在spark-scala中使用select()和map()?
val dataframe2 = dataframe.select("a","b","c","d","e","f")
从MySQL装入表后,我只想选择几个栏,我有这些列的列表的名称。这怎么用呢?val RDDtuple = dataframe2.map(r => (r.getAs(0), r.getAs(1), r.getAs(2), r.getAs(3), r.getAs(4), r.getAs(5)))
在这里,每个表格可能有不同数量的列,所以如何实现这一点?
我不知道我在代码中了解问题 – eliasah
我已经编写了im在select()中显式指定列名称(我不想要),如果im错误会纠正我,但select()会不要将数组或列表作为参数。另外,如果列数不固定,map()如何改变? –
当然是的,select可以把一个重复的参数作为参数。至于你的第二个查询,你可以将你的行转换成一个带有隐式函数的字符串,然后在这个字符串上分割以创建一个数组,如果你想将它转换成一个元组,那么你需要在后面处理它。 – eliasah