2016-08-09 90 views
2

我试图从多个主题过滤kafka事件,但是一旦来自一个主题的所有事件都被过滤,logstash无法从其他kafka主题获取事件。我使用的主题有3个分区和2次重复下面是我logstash配置文件具有多个kafka输入的Logstash

input { 
    kafka{    
     auto_offset_reset => "smallest" 
     consumer_id => "logstashConsumer1"   
     topic_id => "unprocessed_log1" 
     zk_connect=>"192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181" 
     type => "kafka_type_1" 
} 
kafka{    
    auto_offset_reset => "smallest" 
    consumer_id => "logstashConsumer1"   
    topic_id => "unprocessed_log2" 
    zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181" 
    type => "kafka_type_2" 
} 
} 
filter{ 
    if [type] == "kafka_type_1"{ 
    csv { 
     separator=>" " 
     source => "data"   
    } 
} 
if [type] == "kafka_type_2"{  
    csv { 
     separator => " "   
     source => "data" 
    } 
} 
} 
output{ 
    stdout{ codec=>rubydebug{metadata => true }} 
} 
+2

尝试在你的第二个'kafka'输入 – Val

+0

@val非常感谢使用不同的消费(例如'logstashConsumer2')!它的工作 – Abhijeet

+0

你设法使它工作? – Val

回答

0

它是一种非常晚的答复,但如果你想利用输入多个主题,并输出到另一个卡夫卡多输出,你可以这样做这个:

input { 
    enter code herekafka { 
    topics => ["topic1", "topic2"] 
    codec => "json" 
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092" 
    decorate_events => true 
    group_id => "logstash-multi-topic-consumers" 
    consumer_threads => 5 
    } 
} 

output { 
    if [kafka][topic] == "topic1" { 
    kafka { 
     codec => "json" 
     topic_id => "new_topic1" 
     bootstrap_servers => "output-kafka-1:9092" 
    } 
    } 
    else if [kafka][topic] == "topic2" { 
     kafka { 
     codec => "json" 
     topic_id => "new_topic2" 
     bootstrap_servers => "output-kafka-1:9092" 
     } 
    } 
} 

请注意,详细介绍您的引导服务器,给你的kafka广告监听器的名称。

REF-1:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id

REF-2:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events