我知道有相当多的node.js模块实现卡夫卡消费者获取消息和写入弹性。但我只需要每个msg的一些字段,而不是全部。有没有我不知道的现有解决方案?卡夫卡到Elasticsearch消费与node.js
回答
问题是要求node.js中的一个例子。该kafka-node
module provides a very nice mechanism for getting a Consumer
,您可以用elasticsearch-js模块组合:
// configure Elasticsearch client
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
// ... connection details ...
});
// configure Kafka Consumer
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(
client,
[
// ... topics/partitions ...
],
{ autoCommit: false }
);
consumer.on('message', function(message) {
if (message.some_special_field === "drop") {
return; // skip it
}
// drop fields (you can use delete message['field1'] syntax if you need
// to parse a more dynamic structure)
delete message.field1;
delete message.field2;
delete message.field3;
esClient.index({
index: 'index-name',
type: 'type-name',
id: message.id_field, // ID will be auto generated if none/unset
body: message
}, function(err, res) {
if (err) {
throw err;
}
});
});
consumer.on('error', function(err) {
console.log(err);
});
注意:使用索引API是不是一个很好的做法,当你有,因为它需要Elasticsearch创造每一个线程发送吨的消息通过操作,这显然是浪费的,并且如果线程池耗尽,它最终会导致拒绝的请求。在任何大量摄入的情况下,更好的解决方案是使用诸如Elasticsearch Streams(或构建于其之上的Elasticsearch Bulk Index Stream)之类的东西来研究,它建立在官方elasticsearch-js客户端之上。然而,我从来没有使用这些客户端扩展,所以我不知道他们做了甚么或者不能工作,但是用法会简单地替换我显示索引发生的部分。
我不相信node.js方法在维护和复杂性方面实际上比下面的Logstash方法好,所以我已经把它们留在这里作为参考。
更好的方法可能是从Logstash中消费Kafka,然后将其发送给Elasticsearch。
您应该可以使用Logstash以简单的方式使用Kafka input和Elasticsearch output执行此操作。
Logstash管道中的每个文档都称为“事件”。 Kafka输入假定它将接收到JSON(可由其编解码器配置),该JSON将填充来自该消息的所有字段的单个事件。
然后,您可以删除那些您无意处理的字段,或者有条件地删除整个事件。当然,您需要配置Kafka输入(来自第一个链接)和Elasticsearch输出(以及第二个链接)。当然,您需要配置Kafka输入(来自第一个链接)和Elasticsearch输出(以及第二个链接)。
- 1. 卡夫卡消费者不消费
- 2. 卡夫卡gruop消费者
- 3. 卡夫卡消费10节
- 4. 卡夫卡消费群挂
- 5. 卡夫卡消费者与JAVA
- 6. 卡夫卡的NodeJS与消费群PM2
- 7. 卡夫卡10.2新消费者与旧消费者
- 8. 在卡夫卡消费活的消息
- 9. 消费消费使用卡夫卡消费者 - Java
- 10. 卡夫卡0.9:消费从最早的卡夫卡偏移
- 11. 监测卡夫卡消费滞后
- 12. 复位消费者在卡夫卡0.10
- 13. 无法创建卡夫卡消费者
- 14. Spark Streaming中的卡夫卡消费者
- 15. 卡夫卡消费者同步行为
- 16. 问题在消费者卡夫卡
- 17. 暂停高级卡夫卡消费者
- 18. 卡夫卡消费者不是从
- 19. 卡夫卡消费者行为
- 20. 卡夫卡消费群体和分区
- 21. 关闭卡夫卡消费者
- 22. 卡夫卡消费者行为
- 23. 卡夫卡多个主题消费
- 24. 春季集成卡夫卡消费者
- 25. 如何暂停卡夫卡消费者?
- 26. 卡夫卡 - 同步消费群体
- 27. 卡夫卡消费者/生产者API
- 28. 卡夫卡消费者 - Java客户端
- 29. 卡夫卡0.90消费者坚持
- 30. 云中的卡夫卡消费者
我犯了与@pickypg相同的错误,并在一般情况下回答(回答因为脱离主题而被删除),但它可能有助于添加“node.js”标记。 – JDP10101