2017-10-08 67 views
0

我正在S3中构建数据湖。因此,我想将原始数据流存储到s3中,下面是我的代码片断,我尝试了本地存储。将数据流传输到S3

val tweets = TwitterUtils.createStream(ssc, None) 
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText()) 
    import sql.implicits._ 
engtweets.foreachRDD { rdd => 
    val df = rdd.toDF() 
    df.write.format("json").save("../Ramesh") 
} 

我想在s3中存储原始数据(整个JSON对象)。

回答

0

您可以简单地使用saveAsTextFile方法与路径前缀为

s3a://<file path> 

需要,您的Amazon S3是建立正确有或没有凭据。

https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html

+0

谢谢你的回复。我能够在本地存储数据。由于数据流式传输,我想将数据存储在firehose中并将其推送到S3。因此,我用Java编写了一个存储到kinesis firehose中的方法,并且工作正常。但是,我无法从Dstreams获取字符串。 val tweets = TwitterUtils.createStream(ssc,None) val engtweets = tweets.filter(status => status.getLang()==“en”) val statuses = engtweets.map(engtweets => engtweets.toString() )statuses.map(record => record.getBytes()。toString())。print() –

1

就设置在核心site.xml中的访问密钥和秘密密钥如下:

<property> 
    <name>fs.s3a.access.key</name> 
    <value>...</value> 
</property> 
<property> 
    <name>fs.s3a.secret.key</name> 
    <value>...</value> 
</property> 

一旦你做到了这一点,你应该能够使用S3协议写入到S3像:s3a:///

希望这有助于!