2017-06-11 104 views
1

这个问题出现以及踏过我开始寻求帮助,但我还没有找到一个解决办法。事实上,你可能会发现你看到的可能的重复数,但我想我把它们都试过在最后几个小时。据我所知,sqlContext将在这里做的伎俩,但我接受任何有效的答案。我使用的Spark 2.1从RDD管道DF Pyspark

我开始与我从MongoDB的向下拉动的ID列表。 输出示例:

[u'182028', u'161936', u'12333', u'120677'] 
'rated_game_ids_lst type:' <type 'list'> 

我然后继续前进,试图创造,我要变成一个DF的RDD:

user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: (19, x[1], x[2])) 

输出示例:

'user_unrated_games:' [(19, u'174430', 3.4), (19, u'169786', 3.4)] 
'user_unrated_games type:' <class 'pyspark.rdd.PipelinedRDD'> 

和样本我以上使用urg_rdd(第一行):

'ugr_rdd:'[Row(user_id=5, game_id=u'182028', rating=9.15)] 
'ugr_rdd_type:' pyspark.rdd.RDD 

我再试试这个:

df = sqlContext.createDataFrame(user_unrated_games, ['user_id', 'game_id', 'rating']) 

这种方法失败,所以我尝试这样做:

user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: Row(user_id=19, game_id=x[1], rating= x[2])) 

输出示例:

('user_unrated_games type:', <class 'pyspark.rdd.PipelinedRDD'>) 
('user_unrated_games:', [Row(game_id=u'174430', rating=3.4, user_id=19), Row(game_id=u'169786', rating=3.4, user_id=19)]) 

,然后这样的:

df = sqlContext.createDataFrame(user_unrated_games) 

这两个尝试分给这个错误:

IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':" 

从那里,我开始尝试改变类型的“USER_ID”的组合等,试图传递RDD原样,试图在我的管道转换为RDD ......坦率地说我尝试了很多东西,但是上面的两个看起来最接近似乎为其他人工作的东西。

回答

1

的问题是,你需要指定架构包括数据类型调用createDataFrame方法时。像这样的应该做的伎俩:

from pyspark.sql.types import * 

rdd = sc.parallelize([(19, 174430, 3.4), (19, 169786, 3.4)]) 

schema = StructType([ 
    StructField('user_id', IntegerType()), 
    StructField('game_id', IntegerType()), 
    StructField('rating', FloatType()) 
    ]) 

df = spark.createDataFrame(rdd, schema) 

df.show() 

注意:我已经测试了这个使用火花2.1.0。在这种情况下sparkSparkSession对象。

+0

请注意这里重要的一点:要转换的rdd中的变量类型需要与您传递给createDataFrame的模式相匹配 – Jomonsugi