我正在使用spark和scala进行项目工作,我对两者都很陌生,但有很多来自stackoverflow的帮助我已完成所有数据处理并将处理后的数据存储在mysql中。现在我终于面临一个问题,我不明白如何解决它。第一次当我处理数据时,我使用这种方法存储数据帧,第一次表是空的。使用Scala更新Spark数据库中的数据
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
假设我处理的数据在数据库中看起来像这样。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 24 2016-04-27 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
现在处理这个新的数据后,命名为“根”与EID“C111”访问办公室2倍“二零一六年八月三十日00:00:00”的人,现在我只需要更新的这个人纪录数据库。我将如何做到这一点。现在更新的表应该看起来像这样。
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 26 2016-08-30 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
我有数据万人在此表中,如果我加载全表火花数据帧和更新所需的记录,然后将需要更多的时间并且也没有意义,因为为什么我加载全表当我想更新只有一行。我试过这个代码,但它将新行添加到表,而不是更新行。
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
有什么办法可以做到火花?
我在互联网上看到过这个,我可以这样做来更新。
val numParallelInserts = 10
val batchSize = 1000
new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
val db = connect()
val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
val stmt = db.prepareStatement(sql)
iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
batch foreach { session =>
stmt.setString(1, session.id)
stmt.setString(2, TimestampFormat.print(session.ts))
stmt.addBatch()
}
stmt.executeBatch()
db.commit();
logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
}
db.close();
你用 “覆盖” 模式试过吗? – dsr301
覆盖重新创建具有不确切数据类型的表,并删除所有较旧的数据,并只插入新处理的数据。 –