2016-09-01 34 views
0

我正在使用spark和scala进行项目工作,我对两者都很陌生,但有很多来自stackoverflow的帮助我已完成所有数据处理并将处理后的数据存储在mysql中。现在我终于面临一个问题,我不明白如何解决它。第一次当我处理数据时,我使用这种方法存储数据帧,第一次表是空的。使用Scala更新Spark数据库中的数据

 df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 

假设我处理的数据在数据库中看起来像这样。

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  24     2016-04-27 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

现在处理这个新的数据后,命名为“根”与EID“C111”访问办公室2倍“二零一六年八月三十日00:00:00”的人,现在我只需要更新的这个人纪录数据库。我将如何做到这一点。现在更新的表应该看起来像这样。

 id  name  eid  number_of_visitis last_visit_date 
     1  John  C110  12     2016-01-13 00:00:00 
     2  Root  C111  26     2016-08-30 00:00:00 
     3  Michel  C112  8     2016-07-123 00:00:00 
     4  Jonny  C113  45     2016-06-10 00:00:00 

我有数据万人在此表中,如果我加载全表火花数据帧和更新所需的记录,然后将需要更多的时间并且也没有意义,因为为什么我加载全表当我想更新只有一行。我试过这个代码,但它将新行添加到表,而不是更新行。

 df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 

有什么办法可以做到火花?

我在互联网上看到过这个,我可以这样做来更新。

val numParallelInserts = 10 
val batchSize = 1000 

new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) => 
    val db = connect() 

    val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)" 
    val stmt = db.prepareStatement(sql) 

    iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) => 
    batch foreach { session => 
     stmt.setString(1, session.id) 
     stmt.setString(2, TimestampFormat.print(session.ts)) 
     stmt.addBatch() 
    } 
    stmt.executeBatch() 
    db.commit(); 
    logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements") 
    } 

    db.close(); 
+0

你用 “覆盖” 模式试过吗? – dsr301

+1

覆盖重新创建具有不确切数据类型的表,并删除所有较旧的数据,并只插入新处理的数据。 –

回答

0

你可以尝试使用sql来做到这一点。将更新的(甚至是新的)数据存储在新的临时表中,然后将临时表合并到主表中。要做到这一点

的一种方式是 -

  1. 更新所有主表使用临时表

    update main_table set visits = main_table.visits + temp_table.visits from temp_table where main_table.eid = temp_table.eid;

  2. 删除临时表中的所有重复记录(记录叶色只有临时表中的新记录)

    delete from temp_table where main_table.eid = temp_table.eid;

  3. 插入从临时表中的所有记录到主表

    insert into main_table select * from temp_table;

  4. 删除临时表

    drop table temp_table;

+0

这是在db级别,并有更多的步骤,我想以最短的方式做到这一点。我有数以百万计的数据,因此复制,删除和插入需要时间。我更新了这个问题,请看看你能否理解我想要做的事情。 –

+0

我在红字数据库上用了几十亿行。 – Kakaji