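Joining more than 2 tables in Spark SQL

2017-02-10

I'm trying to write a query in Spark SQL that joins three tables, but the query output is empty. It works fine on a single table. My join query is correct, since I have already run it against an Oracle database. What do I need to change to fix this? The Spark version is 2.0.0.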

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/Users/Hadoop_IPFile/purchase") 
lines2 = sc.textFile("/Users/Hadoop_IPFile/customer") 
lines3 = sc.textFile("/Users/Hadoop_IPFile/book") 

parts = lines.map(lambda l: l.split("\t")) 
purchase = parts.map(lambda p: Row(year=p[0],cid=p[1],isbn=p[2],seller=p[3],price=int(p[4]))) 
schemapurchase = sqlContext.createDataFrame(purchase) 
schemapurchase.registerTempTable("purchase") 


parts2 = lines.map(lambda l: l.split("\t")) 
customer = parts2.map(lambda p: Row(cid=p[0],name=p[1],age=p[2],city=p[3],sex=p[4])) 
schemacustomer = sqlContext.createDataFrame(customer) 
schemacustomer.registerTempTable("customer") 

parts3 = lines.map(lambda l: l.split("\t")) 
book = parts3.map(lambda p: Row(isbn=p[0],name=p[1])) 
schemabook = sqlContext.createDataFrame(book) 
schemabook.registerTempTable("book") 

result_purchase = sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn JOIN customer ON customer.cid = purchase.cid WHERE customer.name != 'Harry Smith' AND purchase.isbn IN (SELECT purchase.isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""") 

result = result_purchase.rdd.map(lambda p: "name: " + p.name).collect() 
for name in result: 
    print(name) 
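One detail in the code above is a likely cause of the empty result: parts2 and parts3 are both built from lines (the purchase file) rather than from lines2 and lines3. The customer table's cid column then holds purchase years, and the book table's isbn column holds years as well, so neither join condition can match. The following plain-Python sketch (no Spark involved) reproduces the effect on the sample data. The in-memory lists and the helpers parse_customers and customers_with_purchases are hypothetical stand-ins for the input files and the SQL join; the tab separator is taken from the question's split("\t").

```python
from collections import namedtuple

# Hypothetical in-memory copies of two of the tab-separated input files;
# the question reads them from disk with sc.textFile.
purchase_lines = [
    "1999\tC1\tB1\tAmazon\t90",
    "2001\tC1\tB2\tAmazon\t20",
    "2008\tC2\tB2\tBarnes Noble\t30",
    "2008\tC3\tB3\tAmazon\t28",
    "2009\tC2\tB1\tBorders\t90",
    "2010\tC4\tB3\tBarnes Noble\t26",
]
customer_lines = [
    "C1\tJackie Chan\t50\tDayton\tM",
    "C2\tHarry Smith\t30\tBeavercreek\tM",
    "C3\tEllen Smith\t28\tBeavercreek\tF",
    "C4\tJohn Chan\t20\tDayton\tM",
]

# Same field order as the customer Row(...) in the question.
Customer = namedtuple("Customer", ["cid", "name", "age", "city", "sex"])


def parse_customers(lines):
    """Split each line on tabs and map the fields positionally."""
    return [Customer(*line.split("\t")) for line in lines]


def customers_with_purchases(customers, purchase_lines):
    """Inner join on cid: names of customers that appear in the purchase data."""
    purchase_cids = {line.split("\t")[1] for line in purchase_lines}
    return [c.name for c in customers if c.cid in purchase_cids]


# What parts2 = lines.map(...) does: it parses the *purchase* file,
# so "cid" ends up holding the year and the join key never matches.
buggy = parse_customers(purchase_lines)
print(buggy[0].cid)                                     # 1999
print(customers_with_purchases(buggy, purchase_lines))  # []

# Parsing the customer file instead (lines2 in the question) lets the join match.
fixed = parse_customers(customer_lines)
print(customers_with_purchases(fixed, purchase_lines))
# ['Jackie Chan', 'Harry Smith', 'Ellen Smith', 'John Chan']
```

In the PySpark script the corresponding change would be parts2 = lines2.map(...) and parts3 = lines3.map(...).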


DataSet 
--------- 
Purchase 
1999 C1 B1 Amazon 90 
2001 C1 B2 Amazon 20 
2008 C2 B2 Barnes Noble 30 
2008 C3 B3 Amazon 28 
2009 C2 B1 Borders 90 
2010 C4 B3 Barnes Noble 26 


Customer 
C1 Jackie Chan 50 Dayton M 
C2 Harry Smith 30 Beavercreek M 
C3 Ellen Smith 28 Beavercreek F 
C4 John Chan 20 Dayton M 

Book 
B1 Novel 
B2 Drama 
B3 Poem 
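As a cross-check of the expected output, the following plain-Python sketch evaluates the same logic as the SQL query over this dataset: the subquery collects the ISBNs Harry Smith bought, and the outer query keeps the other customers who bought any of them. The tuple layout and the function name other_buyers are illustrative assumptions, not part of the original code.

```python
# The question's sample data as tuples.
# purchase: (year, cid, isbn, seller, price); customer: (cid, name, age, city, sex); book: (isbn, name)
purchases = [
    ("1999", "C1", "B1", "Amazon", 90),
    ("2001", "C1", "B2", "Amazon", 20),
    ("2008", "C2", "B2", "Barnes Noble", 30),
    ("2008", "C3", "B3", "Amazon", 28),
    ("2009", "C2", "B1", "Borders", 90),
    ("2010", "C4", "B3", "Barnes Noble", 26),
]
customers = [
    ("C1", "Jackie Chan", 50, "Dayton", "M"),
    ("C2", "Harry Smith", 30, "Beavercreek", "M"),
    ("C3", "Ellen Smith", 28, "Beavercreek", "F"),
    ("C4", "John Chan", 20, "Dayton", "M"),
]
books = [("B1", "Novel"), ("B2", "Drama"), ("B3", "Poem")]


def other_buyers(target):
    """Names of other customers who bought at least one book that `target` bought."""
    name_by_cid = {c[0]: c[1] for c in customers}
    book_isbns = {b[0] for b in books}
    # Subquery: SELECT purchase.isbn ... WHERE customer.name = target
    target_isbns = {p[2] for p in purchases if name_by_cid.get(p[1]) == target}
    # Outer query: purchase JOIN book JOIN customer, WHERE name != target AND isbn IN (...)
    names = set()  # a set plays the role of DISTINCT
    for _year, cid, isbn, _seller, _price in purchases:
        name = name_by_cid.get(cid)
        if isbn in book_isbns and name is not None and name != target and isbn in target_isbns:
            names.add(name)
    return sorted(names)


print(other_buyers("Harry Smith"))  # ['Jackie Chan']
```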

I found the following suggestion on some web pages, but it still didn't work:

schemapurchase.join(schemabook, schemapurchase.isbn == schemabook.isbn)
schemapurchase.join(schemacustomer, schemapurchase.cid == schemacustomer.cid)
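Each of the two snippets above starts again from schemapurchase, so each produces a separate two-table result and neither contains columns from all three tables. Joining three tables with the DataFrame API means applying the second join to the output of the first, for example schemapurchase.join(schemabook, schemapurchase.isbn == schemabook.isbn).join(schemacustomer, schemapurchase.cid == schemacustomer.cid) in PySpark. The plain-Python hash join below only sketches that chaining on a trimmed copy of the sample data; the helpers table and hash_join are hypothetical and are not Spark APIs.

```python
def table(name, columns, rows):
    """Turn tuples into dicts keyed by qualified column names such as 'book.isbn'."""
    return [{f"{name}.{col}": val for col, val in zip(columns, row)} for row in rows]


def hash_join(left, right, left_key, right_key):
    """Inner equi-join: index the right side by its key, then probe it with each left row."""
    index = {}
    for rrow in right:
        index.setdefault(rrow[right_key], []).append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[left_key], [])]


# Trimmed sample data (only the columns the joins need).
purchase = table("purchase", ["cid", "isbn"],
                 [("C1", "B1"), ("C1", "B2"), ("C2", "B2"),
                  ("C3", "B3"), ("C2", "B1"), ("C4", "B3")])
book = table("book", ["isbn", "name"], [("B1", "Novel"), ("B2", "Drama"), ("B3", "Poem")])
customer = table("customer", ["cid", "name"],
                 [("C1", "Jackie Chan"), ("C2", "Harry Smith"),
                  ("C3", "Ellen Smith"), ("C4", "John Chan")])

# Two independent joins, as in the snippets: each result covers only two tables.
purchase_book = hash_join(purchase, book, "purchase.isbn", "book.isbn")
purchase_customer = hash_join(purchase, customer, "purchase.cid", "customer.cid")

# Chained joins: the second join takes the first join's output as its left side.
all_three = hash_join(purchase_book, customer, "purchase.cid", "customer.cid")

print(len(all_three))                                            # 6
print(all_three[0]["customer.name"], all_three[0]["book.name"])  # Jackie Chan Novel
```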

What output do you expect? – pheeleeppoo

"Jackie Chan" is the output I'm looking for. – SPram

Answer

Given input DataFrames like the ones in your example (sorry if some of the column names are wrong, I guessed them):

Purchase:

+----+---+----+------------+-----+
|year|cid|isbn|        shop|price|
+----+---+----+------------+-----+
|1999| C1|  B1|      Amazon|   90|
|2001| C1|  B2|      Amazon|   20|
|2008| C2|  B2|Barnes Noble|   30|
|2008| C3|  B3|      Amazon|   28|
|2009| C2|  B1|     Borders|   90|
|2010| C4|  B3|Barnes Noble|   26|
+----+---+----+------------+-----+

Customer:

+---+-----------+---+-----------+-----+
|cid|       name|age|       city|genre|
+---+-----------+---+-----------+-----+
| C1|Jackie Chan| 50|     Dayton|    M|
| C2|Harry Smith| 30|Beavercreek|    M|
| C3|Ellen Smith| 28|Beavercreek|    F|
| C4|  John Chan| 20|     Dayton|    M|
+---+-----------+---+-----------+-----+

Book:

+----+-----+
|isbn|genre|
+----+-----+
|  B1|Novel|
|  B2|Drama|
|  B3| Poem|
+----+-----+

You can translate that SQL query into DataFrame operations, like this:

val result = purchase.join(book, purchase("isbn")===book("isbn")) 
        .join(customer, customer("cid")===purchase("cid")) 
        .where(customer("name") =!= "Harry Smith")
        .join(temp, purchase("isbn")===temp("purchase_isbn")) 
        .select(customer("name").as("NAME")).distinct() 

where "temp" is the result of the "SELECT ... IN" subquery, which can be computed as another join:

val temp = customer.join(purchase, customer("cid")===purchase("cid")) 
        .where(customer("name")==="Harry Smith") 
        .select(purchase("isbn").as("purchase_isbn"))  


+-------------+
|purchase_isbn|
+-------------+
|           B2|
|           B1|
+-------------+

So the final result is:

+-----------+
|       NAME|
+-----------+
|Jackie Chan|
+-----------+
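A note on translating IN into a join: an inner join emits a left row once for every matching right row, whereas IN (subquery) keeps each left row at most once (a semi-join). In this answer the difference is masked by .distinct(), and temp contains no duplicate ISBNs anyway. The plain-Python sketch below shows the difference using a hypothetical temp list in which B1 appears twice; inner_join and semi_join are illustrative helpers, not Spark functions.

```python
# (cid, isbn) pairs from the purchase table.
purchases = [("C1", "B1"), ("C1", "B2"), ("C2", "B2"),
             ("C3", "B3"), ("C2", "B1"), ("C4", "B3")]
# Hypothetical subquery result with a duplicate ISBN (e.g. if Harry Smith had bought B1 twice).
temp = ["B2", "B1", "B1"]


def inner_join(rows, keys):
    """Like JOIN temp ON isbn = purchase_isbn: one output row per matching key."""
    return [row for row in rows for key in keys if row[1] == key]


def semi_join(rows, keys):
    """Like isbn IN (subquery): each row kept at most once if its key is present."""
    key_set = set(keys)
    return [row for row in rows if row[1] in key_set]


joined = inner_join(purchases, temp)
semi = semi_join(purchases, temp)
print(len(joined), len(semi))    # 6 4
print(set(joined) == set(semi))  # True: deduplicating the join gives the IN result
```

Spark's join also takes a join type string as a third argument; passing "leftsemi", e.g. .join(temp, purchase("isbn") === temp("purchase_isbn"), "leftsemi"), should express the IN semantics directly, keeping only the left side's columns.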

Treat this answer as a starting point for further thought (too many joins can hurt performance, for example).

Thanks for the solution. – SPram