我是新来的整个卡夫卡/火花的东西。我有Spark Streaming(PySpark)从卡夫卡制作人处获取数据。它运行良好一分钟,然后总是抛出一个kafka.common.OffsetOutOfRangeException
。卡夫卡消费者版本是0.8(显然,PySpark不支持0.10)。我在AWS Ubuntu 14.04上有一个拥有3名工作人员的主节点。我不知道这是否相关,但这里的卡夫卡日志相对较大(〜1-10kb),因此我调整了生产者/经纪人/消费者配置。数据正在通过罚款,虽然可能比我认为生产者可能产生的速度慢(这可能是问题的根源?)。卡夫卡+火花流:kafka.common.OffsetOutOfRangeException
类似的问题是由这里增加的滞留时间/尺寸解决:Kafka OffsetOutOfRangeException
但我的保留时间是一个小时,大小为1GB的每个节点的server.properties
,更重要的是,有一个在星火的时间没有变化失败和设置的保留时间/大小。
是否有任何其他可能的调整,也许在Spark Streaming配置?我在网上看到的所有答案都与Kafka配置有关,但对我的情况来说似乎没有什么不同。
编辑1:我试过a)有多个流从生产者读取和b)减慢生产者流本身与time.sleep(1.0)
。没有持久的效果。
即
n_secs = 1
ssc = StreamingContext(sc, n_secs)
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
'bootstrap.servers':'localhost:9092',
'group.id':'test-video-group',
'fetch.message.max.bytes':'15728640',
'auto.offset.reset':'largest'}) for _ in range(n_streams)]
stream = ssc.union(*kds)
看起来你正在使用0.8的新消费者吗?我猜测这是由引导服务器而不是zk连接。你如何做抵消? – dawsaw
@dawsaw它自动提交,但在周末我认为我已经确定这是Spark Streaming中的背压问题。 – thefourtheye