我正在尝试Ignite和Kafka集成,将kafka消息带入Ignite高速缓存。Ignite和Kafka集成
我的消息密钥是一个随机字符串(要使用Ignite工作,卡夫卡消息密钥不能为空),并且该值是用于人(java类)
当点燃接收JSON字符串表示这样的消息,看起来Ignite将使用消息的密钥(在我的情况下是随机字符串)作为缓存密钥。
是否可以将消息密钥更改为该人员的ID,以便我可以将其放入缓存中。
看起来是streamer.receiver(new StreamReceiver)
是可行的
streamer.receiver(new StreamReceiver<String, String>() {
public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException {
for (Map.Entry<String, String> entry : entries) {
Person p = fromJson(entry.getValue());
//ignore the message key,and use person id as the cache key
cache.put(p.getId(), p);
}
}
});
这是推荐的方法是什么?我不确定在StreamReceiver中调用cache.put是否是一种正确的方法,因为它只是写入缓存之前的预处理步骤。
感谢@a_gura的帮助! – Tom