I have a Spark Streaming job in PySpark with a batch interval of 30 seconds:
ssc = StreamingContext(sc, 30)
I then want to use the window() function to get the last hour of data, sliding over it every 30 seconds:
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
counts = kvs.map(lambda (k, v): json.loads(v))\
.map(TransformInData).window(108000)
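For reference, PySpark's `DStream.window(windowDuration, slideDuration)` takes durations in seconds, and both must be integer multiples of the parent stream's batch interval. With a 30-second batch interval, a one-hour window sliding every 30 seconds would be `.window(3600, 30)`. A quick sanity check of those constraints (a sketch of the arithmetic, not my actual job code):

```python
batch_interval = 30     # seconds, from StreamingContext(sc, 30)
window_duration = 3600  # one hour of data
slide_duration = 30     # emit a windowed batch every 30 seconds

# Spark requires both window durations to be integer multiples
# of the batch interval, or the stream fails at graph validation
assert window_duration % batch_interval == 0
assert slide_duration % batch_interval == 0
```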
but I get this error:
16/02/18 10:23:01 INFO JobScheduler: Added jobs for time 1455790980000 ms
16/02/18 10:23:30 INFO PythonTransformedDStream: Slicing from 1455683040000 ms to 1455791010000 ms (aligned to 1455683040000 ms and 1455791010000 ms)
16/02/18 10:23:30 INFO PythonTransformedDStream: Time 1455790650000 ms is invalid as zeroTime is 1455790650000 ms and slideDuration is 30000 ms and difference is 0 ms
16/02/18 10:23:31 INFO JobScheduler: Added jobs for time 1455791010000 ms
I have read this thread: https://groups.google.com/forum/#!topic/spark-users/GQoxJHAAtX4, but I don't understand why it doesn't work.
Which version of Spark are you using? – sgvd
Spark version 1.6.0 –
Same issue here: I think it throws "Time ... is invalid" because in the DStream class's isTimeValid() method, 'time - zeroTime' (the difference) is 0, meaning time == zeroTime, so the 'time <= zeroTime' check returns true, which makes isTimeValid() return false and produces the message above. –
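The validity check described in the comment above can be sketched in plain Python. This mirrors the logic of DStream.isTimeValid (a batch time is valid only if it is strictly after zeroTime and its offset is a multiple of slideDuration); the function name and millisecond arguments are my own illustration, not Spark's API:

```python
def is_time_valid(time_ms, zero_time_ms, slide_duration_ms):
    # A batch time equal to (or before) zeroTime is rejected,
    # which is exactly the case in the log: difference is 0 ms
    if time_ms <= zero_time_ms:
        return False
    # Otherwise the offset from zeroTime must align with the slide duration
    return (time_ms - zero_time_ms) % slide_duration_ms == 0

# Values from the log line: time == zeroTime == 1455790650000, slide 30000 ms
print(is_time_valid(1455790650000, 1455790650000, 30000))  # False
# The next 30-second batch after zeroTime would be valid
print(is_time_valid(1455790680000, 1455790650000, 30000))  # True
```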