2016-06-07 72 views
1

我知道有相当多的node.js模块实现卡夫卡消费者获取消息和写入弹性。但我只需要每个msg的一些字段,而不是全部。有没有我不知道的现有解决方案?卡夫卡到Elasticsearch消费与node.js

+2

我犯了与@pickypg相同的错误,并在一般情况下回答(回答因为脱离主题而被删除),但它可能有助于添加“node.js”标记。 – JDP10101

回答

2

问题是要求node.js中的一个例子。该kafka-node module provides a very nice mechanism for getting a Consumer,您可以用elasticsearch-js模块组合:

// configure Elasticsearch client 
var elasticsearch = require('elasticsearch'); 
var esClient = new elasticsearch.Client({ 
    // ... connection details ... 
}); 
// configure Kafka Consumer 
var kafka = require('kafka-node'); 
var Consumer = kafka.Consumer; 
var client = new kafka.Client(); 
var consumer = new Consumer(
    client, 
    [ 
    // ... topics/partitions ... 
    ], 
    { autoCommit: false } 
); 

consumer.on('message', function(message) { 
    if (message.some_special_field === "drop") { 
    return; // skip it 
    } 

    // drop fields (you can use delete message['field1'] syntax if you need 
    // to parse a more dynamic structure) 
    delete message.field1; 
    delete message.field2; 
    delete message.field3; 

    esClient.index({ 
    index: 'index-name', 
    type: 'type-name', 
    id: message.id_field, // ID will be auto generated if none/unset 
    body: message 
    }, function(err, res) { 
    if (err) { 
     throw err; 
    } 
    }); 
}); 

consumer.on('error', function(err) { 
    console.log(err); 
}); 

注意:使用索引API是不是一个很好的做法,当你有,因为它需要Elasticsearch创造每一个线程发送吨的消息通过操作,这显然是浪费的,并且如果线程池耗尽,它最终会导致拒绝的请求。在任何大量摄入的情况下,更好的解决方案是使用诸如Elasticsearch Streams(或构建于其之上的Elasticsearch Bulk Index Stream)之类的东西来研究,它建立在官方elasticsearch-js客户端之上。然而,我从来没有使用这些客户端扩展,所以我不知道他们做了甚么或者不能工作,但是用法会简单地替换我显示索引发生的部分。

我不相信node.js方法在维护和复杂性方面实际上比下面的Logstash方法好,所以我已经把它们留在这里作为参考。


更好的方法可能是从Logstash中消费Kafka,然后将其发送给Elasticsearch。

您应该可以使用Logstash以简单的方式使用Kafka inputElasticsearch output执行此操作。

Logstash管道中的每个文档都称为“事件”。 Kafka输入假定它将接收到JSON(可由其编解码器配置),该JSON将填充来自该消息的所有字段的单个事件。

然后,您可以删除那些您无意处理的字段,或者有条件地删除整个事件。当然,您需要配置Kafka输入(来自第一个链接)和Elasticsearch输出(以及第二个链接)。当然,您需要配置Kafka输入(来自第一个链接)和Elasticsearch输出(以及第二个链接)。

+0

这看起来像一个很好的答案,但不适用于节点 –

+1

哦,woops。我误解了'节点'就像一个ES节点。不是node.js. :)我没有意识到node.js中有任何等价物,但'kafka-node'使它变得非常简单,它应该与'elasticsearch-js'客户端结合使用来完成同样的事情。 – pickypg

+0

我会添加一个简短的例子。 – pickypg