2016-11-28 64 views
3

我在Spark中有一个ETL工作,它也连接到MySQL以获取一些数据。从历史上看,我已经做了如下:Spark ETL作业只执行一次mysql

hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()).registerTempTable("tmp_users"); 

Row[] res = hiveContext.sql("SELECT " 
    + " u.name, " 
    + " SUM(s.revenue) AS revenue " 
    + "FROM " 
    + " stats s " 
    + " INNER JOIN tmp_users u " 
    + "  ON u.id = s.user_id 
    + "GROUP BY " 
    + " u.name " 
    + "ORDER BY " 
    + " revenue DESC 
    + "LIMIT 10").collect(); 

String ids = ""; 
// now grab me some info for users that are in tmp_user_stats 
for (i = 0; i < res.length; i++) { 
    s += (!s.equals("") ? "," : "") + res[i](0); 
} 

hiveContext.jdbc(
dbProperties.getProperty("myDbInfo"), 
"(SELECT name, surname, home_address FROM users WHERE id IN ("+ids+")) r", 
new Properties()).registerTempTable("tmp_users_prises"); 

然而,这个比例到多个工作节点,每当我用tmp_users表时,它运行的查询,并使它运行(至少)一次,每个节点,这归结于我们的db管理员用刀在办公室跑来跑去。

处理这个问题的最佳方法是什么?我可以像3台机器一样运行作业,将其限制为3个查询,然后将数据写入Hadoop以供其他节点使用它或什么?

本质上 - 正如评论中所建议的那样 - 我可以在ETL作业之外运行一个查询,它可以从MySQL端准备数据并将其导入Hadoop。但是,可能会有后续查询,建议通过Spark和JDBC连接设置更多的解决方案。

我会接受Sqoop解决方案,因为它至少提供了一个更简化的解决方案,尽管我仍然不确定它会完成这项工作。如果我找到了一些东西,我会再次编辑这个问题。

回答

1

您可以缓存数据:

val initialDF = hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()) 
initialDF.cache(); 
initialDF.registerTempTable("tmp_users"); 

第一次读取后,数据将被缓存在内存

替代(即不伤DBA;))是使用Sqoop与参数--num-mappers=3然后将结果文件导入到Spark