我正在S3中构建数据湖。因此,我想将原始数据流存储到s3中,下面是我的代码片断,我尝试了本地存储。将数据流传输到S3
val tweets = TwitterUtils.createStream(ssc, None)
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText())
import sql.implicits._
engtweets.foreachRDD { rdd =>
val df = rdd.toDF()
df.write.format("json").save("../Ramesh")
}
我想在s3中存储原始数据(整个JSON对象)。
谢谢你的回复。我能够在本地存储数据。由于数据流式传输,我想将数据存储在firehose中并将其推送到S3。因此,我用Java编写了一个存储到kinesis firehose中的方法,并且工作正常。但是,我无法从Dstreams获取字符串。 val tweets = TwitterUtils.createStream(ssc,None) val engtweets = tweets.filter(status => status.getLang()==“en”) val statuses = engtweets.map(engtweets => engtweets.toString() )statuses.map(record => record.getBytes()。toString())。print() –