2016-11-28 40 views
1

我正在使用Zeppelin 0.6.2和Spark 2.0。如何使用更新值更新给定另一个DataFrame的DataFrame?

我想在循环内执行一个查询,它不是很有效。

我需要为数据帧的每一行循环大约5000行并执行一个查询,它将在另一个数据帧中增加一个值。

这里是我的尝试吧:

val t2 = time 
t2.registerTempTable("t2") 
u.collect().foreach{ r => 
println(r(0)) 
val c=r(1) 
val start="\""+r(2)+"\"" 
val end="\""+r(3)+"\"" 
sql("INSERT INTO TABLE t2 SELECT time, recordings + "+c+" AS recordings FROM time WHERE time >= " + start + " AND time < " + end) 
} 

我想取两个dataframes的一小部分,但它仍然很慢。我觉得我没有这样做。

任何想法如何快速更新数据帧?

回答

1

我需要为数据帧的每一行循环约5000行并执行一个查询,它将在另一个数据帧中增加一个值。

我可以看到u,timet2表。 t2是别名time因此您可以稍后在INSERT查询中使用它。对?

PROTIP:我很高兴有他们的模式。

让我们假设你有5000行数据帧称为df5k

// it's a fake 5k = a mere 5 rows for the sake of simplicity 
// I think `u` is your 5k table (that you unnecessarily `collect` to `foreach`) 
val u = Seq(
    (0, 0, 0, 3), 
    (1, 3, 4, 5), 
    (2, 6, 6, 8), 
    (3, 9, 9, 17)).toDF("id", "c", "start", "end") 

// I think `t2` is an alias for `time` and you want to update `t2` 
val time = Seq(
    (1, 10), 
    (4, 40), 
    (9, 90)).toDF("time", "recordings") 

// this is the calculation of the new records 
val new_t2 = u.join(time) 
    .where('time >= 'start) 
    .where('time < 'end) 
    .withColumn("recordings + c", 'recordings + 'c) 
    .select('time, $"recordings + c" as 'recordings) 

// the following is an equivalent of INSERT INTO using Dataset API 
val solution = time.union(new_t2) 

注意:你没有更新的数据帧,但建立新的价值观的新数据帧。

+0

非常感谢!我甚至不确定我的问题对其他人有意义。我使用你的输入修改了我的代码,并且我收到了一条消息,说我应该添加spark.conf.set(“spark.sql.crossJoin.enabled”,true)来做到这一点,并在那一刻我意识到,交叉连接是解决方案I需要而不是一个foreach行。 – ieaiaio

+0

有趣。我每天都在使用2.1.0-SNAPSHOT,所以我们确实在使用不同版本的Spark,但我不知道在2.x版本之间可能会有如此重要的变化。你的Spark版本究竟是什么?使用'spark.version'来找出它。你绝对应该坚持Spark SQL计算“事物”的方式。如果它解决了您的使用案例,请尽早接受我的答案。谢谢! –

+0

顺便说一句,能否请您将最新版本的代码添加到问题中(以便其他人也可以从您的更改中受益)?我和其他SOers会非常感激。我也可以帮助你使用'spark.conf.set(“spark.sql.crossJoin.enabled”,true)''。谢谢! –

相关问题