2017-04-06 73 views
3

我有一个使用来自Kafka Streams的KTable的单实例Java应用程序。直到最近,我可以使用KTable检索所有数据,突然间一些消息似乎消失了。那里应该有〜33k消息和唯一的密钥。为什么Kafka KTable缺少条目?

当我想通过密钥检索消息时,我没有收到一些消息。我使用ReadOnlyKeyValueStore检索消息:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore()); 
store.get(key); 

这些是我设置为KafkaStreams的配置设置。

final Properties config = new Properties(); 
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId); 
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); 
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 

卡夫卡:0.10.2.0-CP1
合流:3.2.0

调查给我带来了一些非常令人担忧的见解。使用REST代理我手动读取分区,发现有些偏移返回错误。

请求: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{ 
    "error_code": 50002, 
    "message": "Kafka error: Fetch response contains an error code: 1" 
} 

没有客户端,无论是Java和命令行然而返回任何错误。他们只是跳过错误的缺失消息导致KTables中的数据丢失。一切都很好,没有注意到,似乎某些消息被破坏了。

我有两个代理,所有主题的复制因子为2,并且完全复制。两家经纪商分别返还。重新启动经纪人没有任何区别。

  • 可能是什么原因?
  • 如何在客户端检测到这种情况?
+0

我不知道'StoreManager'是什么 - 这不是Kafka Streams的一部分。你使用窗式或非窗式KTable吗?你使用什么版本的Kafka Streams? –

+2

@ MatthiasJ.Sax对不起,我的错误让我的问题更加精确。 – Maciej

+0

感谢您的更新。这听起来很奇怪。 “他们只是跳过错误的消息导致数据丢失” - 这听起来也非常奇怪 - AFAIK,消费者没有内置的“跨越”消息机制。也许你应该问在Kafka用户列表http://kafka.apache.org/contact(这可能是一个错误...) - 虽然Kafka Streams问题似乎并不是Kafka Streams问题,因为Kafka Streams内部只是使用了Kafka Consumer,如果消费者表现得很奇怪,那么Kafka Streams就无法解决这个问题。 –

回答

1

通过default Kafka Broker配置关键cleanup.policy设置为delete。将其设置为compact以保留每个密钥的最新消息。 See compaction

删除旧消息不会更改最小偏移量,因此尝试检索下面的消息会导致错误。错误非常模糊。 Kafka Streams客户端将开始从最小偏移量读取消息,因此没有错误。唯一可见的影响是KTables中缺少数据。

当应用程序运行时,由于caches,即使从Kafka本身删除消息,所有数据仍然可用。清理后它们会消失。