2016-11-25 70 views
0

我有一个RDD(test_rdd),如下Pyspark保存RDD卡桑德拉

[ { 'user_lname':u'TEst1' , 'USER_ID':u'2aa8ae30-c0e5-48bb-AB16-a2ed2e78c8c3' , 'user_phone':u'1234567890','user_fname':u'TestingTesting2','amount':1222,'event_timestamp':u'2016-09-29T07:49:50.866 + 00:00'},

{'user_lname':u'TEst2','user_id':u'2aa8ae30-c0e5-48bb-ac16-a2ed2e78c8c3','user_phone':u'1234567891','user_fname':u'TestingTesting','amount':12 ,'event_timestamp':u'2016-10-27T07:49:50.866 + 00:00'},

{'user_lname':u'test3','u ser_id':u'2aa8ae30-c1e5-48bb-ab16-a2ed2e78c8c3','user_phone':u'1234567892','user_fname':u'TestingTesting3','amount':122,'event_timestamp':u'2016-09- 27T07:49:50.866 + 00:00'} ]

我想将上面的RDD保存到cassandra表中。
我得到下面的错误,当我使用
test_rdd.saveToCassandra( “keyspace1”, “表1”)

回溯(最近通话最后一个):
文件“/var/spark/test/k.py ”,线路179,在
parsed_data.saveToCassandra( “keyspace1”, “表1”)
AttributeError的: 'PipelinedRDD' 对象没有属性 'saveToCassandra'

回答

0

要么

  • 遵循官方spark-cassandra-connector
  • 指令转换为DataFrametoDF
  • Dataframe

    df.write.format("org.apache.spark.sql.cassandra").options(
        table=table, keyspace=keyspace 
    ).save() 
    
+1

谢谢。我使用第二种方法。 pyspark-cassandra和spark-cassandra-connector之间的区别是什么。 –