2016-10-07 137 views
0

我有一个需求,即需要从多个源系统(Mysql实例)每隔5分钟获取一次数据,并使用其他一些数据(S3中可以说)加入和丰富它们。Spark中的Mysql数据处理

我想在Spark中进行这个处理来将我的执行分配给多个执行者。

主要的问题是每次我在Mysql中进行查找时,我只想获取最新记录(可以用lastModifiedOn> timestamp来说)。 如何有效地选择性地获取MySql行? 这是我曾尝试:

val filmDf = sqlContext.read.format("jdbc") 
    .option("url", "jdbc:mysql://localhost/sakila") 
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "") 
    .load() 
+0

你可以更新你已经尝试了什么你的问题? – eliasah

+0

@eliasah是的将更新帖子。 – Karshit

回答

0

您应该使用SQL火花与JDBC数据源。我向你展示一个例子。

val res = spark.read.jdbc(
     url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb", 
     table = "TEST.table", 
     columnName = "lastModifiedOn", 
     lowerBound = lowerTimestamp, 
     upperBound = upperTimestamp, 
     numPartitions = 20, 
     connectionProperties = new Properties() 
    ) 

有Apache中的星火测试套件例子:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

+0

谢谢,你能告诉我numPartition在这里意味着什么吗? 和connectionProperties以及? – Karshit

+0

结果将是一个带有numPartitions分区的DataFrame。 Spark将并行执行numPartitions查询以检索结果。示例: lowerBound = 1,upperBound = 10,numPartitions = 2,Spark将执行两个查询,第一个位于1和5之间,第二个位于6和10之间。 – gasparms

+0

connectionProperties是一个将某些属性传递给db的映射。取决于你的数据库你可以使用它或不。 – gasparms