0

我是新来的整个卡夫卡/火花的东西。我有Spark Streaming(PySpark)从卡夫卡制作人处获取数据。它运行良好一分钟,然后总是抛出一个kafka.common.OffsetOutOfRangeException。卡夫卡消费者版本是0.8(显然,PySpark不支持0.10)。我在AWS Ubuntu 14.04上有一个拥有3名工作人员的主节点。我不知道这是否相关,但这里的卡夫卡日志相对较大(〜1-10kb),因此我调整了生产者/经纪人/消费者配置。数据正在通过罚款,虽然可能比我认为生产者可能产生的速度慢(这可能是问题的根源?)。卡夫卡+火花流:kafka.common.OffsetOutOfRangeException

类似的问题是由这里增加的滞留时间/尺寸解决:Kafka OffsetOutOfRangeException

但我的保留时间是一个小时,大小为1GB的每个节点的server.properties,更重要的是,有一个在星火的时间没有变化失败和设置的保留时间/大小。

是否有任何其他可能的调整,也许在Spark Streaming配置?我在网上看到的所有答案都与Kafka配置有关,但对我的情况来说似乎没有什么不同。

编辑1:我试过a)有多个流从生产者读取和b)减慢生产者流本身与time.sleep(1.0)。没有持久的效果。

n_secs = 1 
ssc = StreamingContext(sc, n_secs) 
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], { 
        'bootstrap.servers':'localhost:9092', 
        'group.id':'test-video-group', 
        'fetch.message.max.bytes':'15728640', 
        'auto.offset.reset':'largest'}) for _ in range(n_streams)] 

stream = ssc.union(*kds) 
+0

看起来你正在使用0.8的新消费者吗?我猜测这是由引导服务器而不是zk连接。你如何做抵消? – dawsaw

+0

@dawsaw它自动提交,但在周末我认为我已经确定这是Spark Streaming中的背压问题。 – thefourtheye

回答

0

有没有可能是你的生产产生了太多的信息太快,这样1G是不够的每个经纪人? 1G在所有现实中似乎都很低。在Spark Streaming决定了需要在微批处理中处理的偏移量范围并尝试根据偏移量从代理中检索消息后,消息由于大小限制而消失。请增加经纪人的大小,使其像100G一样大,看看是否能解决你的问题。

+0

我同意(经过一些进一步的测试),它几乎可以肯定是Spark Streaming无法跟上的生成器生成的消息。但我想我需要改变我正在制作的和/或Spark配置,而不是更改Kafka配置。 – thefourtheye

相关问题