2017-10-05 86 views
-1

我有3个数据帧:星火据帧更新值

1. Item dataframe: 

+-------+---------+ 
|id_item|item_code| 
+-------+---------+ 
| 991| A0049| 
| 992| C1248| 
| 993| C0860| 
| 994| C0757| 
| 995| C0682| 
+-------+---------+ 

2. User dataframe: 

+------+--------+ 
|id_usn|  usn| 
+------+--------+ 
|417567|39063291| 
|417568|39063294| 
|417569|39063334| 
|417570|39063353| 
|417571|39063376| 
+------+--------+ 

3. Summary dataframe 

+-------+--------------------+ 
|id_item|  summary  | 
+-------+--------------------+ 
| 991|[[417567,0.579901...| 
| 992|[[417567,0.001029...| 
| 443|[[417585,0.219624...| 
+-------+--------------------+ 

and schema of this dataFrame: 

root 
|-- id_item: integer (nullable = true) 
|-- summary: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id_usn: long (nullable = true) 
| | |-- rating: double (nullable = true) 

现在,在StructType id_usn,我想更换id_usn总结DataFrame由usn in User DataFrame,

我正在使用Spark!

请帮我解决这个问题!

+1

**是否尝试使用'join'然后选择'select'?我担心这个问题太简单了,无法证明它的正确性,并试图找到一些值得拥有的东西。 –

+0

我可以试试,但我的问题是替换id_usn,因为它在struct –

+0

@JacekLaskowski:你对这个问题有什么想法吗? –

回答

1

希望它有帮助。

from pyspark.sql import functions as F 

sdf1 = summarydf.select('id_item','summary',F.explode('summary').alias('col_summary')).select('*',F.col('col_summary').id_usn.alias('id_usn'),F.col('col_summary').rating.alias('rating')).drop('col_summary') 
df = sdf1.join(itemdf,'id_item').join(userdf,'id_usn').select('item_code',F.struct('usn','rating').alias('tmpcol')).groupby('item_code').agg(F.collect_list('tmpcol').alias('summary')) 
+---------+--------------------+ 
|item_code|    summary| 
+---------+--------------------+ 
| C1248|[[39063291,0.0010...| 
| A0049|[[39063291,0.5799...| 
+---------+--------------------+