2016-12-14 79 views
1

我正在尝试Ignite和Kafka集成,将kafka消息带入Ignite高速缓存。Ignite和Kafka集成

我的消息密钥是一个随机字符串(要使用Ignite工作,卡夫卡消息密钥不能为空),并且该值是用于人(java类)

当点燃接收JSON字符串表示这样的消息,看起来Ignite将使用消息的密钥(在我的情况下是随机字符串)作为缓存密钥。

是否可以将消息密钥更改为该人员的ID,以便我可以将其放入缓存中。

看起来是streamer.receiver(new StreamReceiver)是可行的

 streamer.receiver(new StreamReceiver<String, String>() { 
      public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException { 
       for (Map.Entry<String, String> entry : entries) { 
        Person p = fromJson(entry.getValue()); 
        //ignore the message key,and use person id as the cache key 
        cache.put(p.getId(), p); 
       } 
      } 
     }); 

这是推荐的方法是什么?我不确定在StreamReceiver中调用cache.put是否是一种正确的方法,因为它只是写入缓存之前的预处理步骤。

回答

2

数据流化器会将您的所有密钥映射到高速缓存亲和节点,创建批次条目并将批次发送到亲和节点。在它StreamReceiver将收到您的条目后,获得Person的ID并调用cache.put(K, V)。将条目引导为将您的密钥映射到相应的缓存关联节点并将更新请求发送到此节点。

一切看起来不错。但是从Kafka映射你的随机密钥的结果和映射Person的ID的结果将是不同的(最可能是不同的节点)。因此,由于冗余网络跳数,您的性能会变差。

不幸的是,当前的KafkaStreamer实现不支持流元组提取器(请参阅,例如StreamSingleTupleExtractor类)。但是您可以使用现有的卡夫卡流式传输实例轻松创建自己的卡夫卡流式传输实现。

您也可以尝试使用KafkaStreamerkeyDecodervalDecoder以便从Kafka消息中提取Person的ID。我不确定,但它可以帮助。

+0

感谢@a_gura的帮助! – Tom