2

我已经开始从Spark引擎学习Spark流,并且很新的数据分析和火花。我只是想创建一个小IOT应用程序,我想要预测未来的数据。Java Spark Streaming JSON解析

我有TIVA硬件,它发送实时传感器JSON数据如下,

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}] 

在此T是其中数据被张贴UNIX时间戳。 传感器是一组传感器,每个传感器('s')的数据都是'd'。

我想要做的就是消费这些数据并创建对象,然后使用Spark的Mlib(机器学习)或等效库来预测未来数据。

我希望有一个总体思路,这是否将有可能与所有的技术选择

  1. 我已经决定使用?
  2. 如何使用嵌套的JSON?我尝试使用SQLContext但没有成功。
  3. 一般准则,以实现我在这里要做的。

这是我用来使用来自KAFKA的消息的代码。 PS:我想在Java中这样做,以保持线性和良好的性能。

+0

你能后的代码,你尝试过什么到目前为止?它可以使用Spark SQL和Streaming。 – Shankar

+0

发布代码有问题。 –

+0

当您尝试'sqlContext'来读取json字符串时,您遇到了什么问题?该任务不是可序列化的问题? – Shankar

回答

2

回答您的问题:

1)这是否将有可能与所有的技术选择,我决定使用?

`Ans: Yes it can be done and quiet a normal use-case for spark.` 

2)我该如何使用嵌套的JSON?我尝试使用SQLContext但没有成功。

`Ans: Nested JSON with SQLContext is little tricky. You may want to use Jackson or some other JSON library.` 

3)一般准则,以实现我在这里要做的。

Ans: Consuming messages through kafka seems fine, but only a limited machine learning algorithms are supported through streaming.

如果你想使用其他机器学习算法或第三方库,也许你应该考虑的模型创建作为批处理作业emmiting出在最后的模型。流式作业应该加载模型并获取数据流并仅预测。

+0

你能指导我正确使用这种用例吗?这将是非常有帮助 –

4

由于您使用SPARK 2.0,从SparkSession,就可以读取JSON

json.foreachRDD(rdd -> { 

     DataFrame df= spark.read.json(rdd) 
     //process json with this DF. 
} 

也可以将RDD转换成排的RDD,那么你可以使用createDataFrame方法。

json.foreachRDD(rdd -> { 

      DataFrame df= spark.createDataFrame(rdd); 
      //process json with this DF. 
    } 

嵌套JSON处理可能从DF,您可以按照this文章。

而且,一旦您将您的JSON来DF,您可以在任何火花模块使用它(像火花SQL,ML)

+0

在我的情况下,我尝试使用的SQLContext构造函数已被弃用。我没有得到如何使用'JavaSparkContext'获得'sc'(SparkContext) –

+0

@RahulBorkar:您可以将'JavaSparkContext'传递给SQLContext(javasparkContext) – Shankar

+0

它在Spark 2.11中不赞成使用。另外,在尝试代码时,我得到“方法转换(函数,JavaRDD >)对于JavaDStream类型不明确” –