0

我使用SparkSQL 2.2.0从Cassandra加载数据并将其索引到Elasticsearch。我拥有的数据由客户(第一张表people)和订单(第二张表orders)组成。
表格订单具有指向相应客户的列person_id
我的需求是查询(并在Elasticsearch稍后索引)people表和orders,因此我可以为每个客户购买她购买的订单数量。
我想出的最简单的方法是在person_id列中将两个表读入org.apache.spark.sql.Dataset<Row> s并在加入。然后我groupBy(person_id)
这给了我一个数据集有两列:person_idcount,我不得不加入people表,所以我可以计数与其他人的数据。SparkSQL加入父/子数据集

Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer"); 

Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId"); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

people表具有1_000_000行和orders一个2_500_000。每个客户有2或3个订单。
我正在使用MAC Book专业版,配备2,2 GHz Intel Core i7处理器和16 GB 1600 MHz DDR3内存。所有Cassandra,Spark 2.2 master和(single)worker都在同一台机器上。
这3个连接需要15到20秒。
我的问题是:是否有性能提升的空间。做窗口聚合函数有益处,因为我在日志中看到ShuffleMapTask。

在此先感谢

回答

0

我认为第一步是不必要的。你可以这样做:

Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count(); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

我希望这会有所帮助。

+0

是的,的确如此。我的错。但这仍然“相对缓慢”(ab 16s)。我想知道“窗口聚合函数”是否会有帮助,或者这是正常的方法 –

+0

据我所知,这是实现它的方法。特别是在“group by”的情况下。您可以查看用户定义的聚合函数(UDAF),但即使这些函数也适用于特定情况。有没有其他的操作可能会减慢速度? – Nikhil