
I'm using ELK with logstash-logback-encoder to push logs to Logstash. Now I want to use the same stack, i.e. ELK and logstash-logback-encoder, for analytics. Logstash: is there a way to invoke grok templates based on field names?

Flow:

API (Create User) ----> Commit data to RDBMS ---->
Callback Listener (on post persist and post update) --->
Logger.info("IndexName: {} . DocId: {} .User json: {}", "Customer", user.getID(), user.getJson());

On Logger.info(), logstash-logback-encoder pushes the data to Logstash, which in turn pushes it to ES.
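The logging side can be wired up with the TCP appender that ships with logstash-logback-encoder; a minimal sketch (the destination host/port are assumptions to match the Logstash input below):

```xml
<!-- logback.xml: ship log events to Logstash over TCP as JSON.
     Destination is a placeholder; point it at your Logstash tcp input. -->
<configuration>
  <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>localhost:5044</destination>
    <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
  </appender>
  <root level="INFO">
    <appender-ref ref="LOGSTASH"/>
  </root>
</configuration>
```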

logstash.conf is as follows:

input {
  tcp {
    port => 5044
    codec => multiline {
      what => "previous"
    }
  }
}
filter {
  grok {
    match => ["message", "(?<index_name>(?<=IndexName:).*?(?=\s))"]
    match => ["message", "(?<doc_id>(?<=DocId:).*?(?=\s))"]
    break_on_match => false
    remove_tag => ["_grokparsefailure", "multiline"]
  }
  mutate {
    gsub => ['message', "\t", " "]
    gsub => ['message', "\e\[(\d*;)*(\d*)m", " "]
  }
}
output {
  if [index_name] == "Customer" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "analytics-customers"
      document_id => "%{doc_id}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
    }
  }
  stdout { codec => rubydebug }
}

My problem is that if I want to use Logstash for analytics, I have to parse the JSON with grok. With the number of tables and fields I have, logstash.conf will become huge.

Is there a way to define grok templates in logstash.conf so that I can invoke a template based on the index name? Something like:

grok {
  match => ["message", "(?<index_name>(?<=IndexName:).*?(?=\s))"]
  if (index_name == "User") {
    // Invoke the User template, which will fetch/create fields from the passed JSON.
  }
  if (index_name == "Order") {
    // Invoke the Order template, which will fetch/create fields from the passed JSON.
  }
}

Answer


It would be best if you can manage to get your logs onto single lines, because then you can change the codec to "json_lines" and everything will be parsed automatically!
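With single-line JSON events, the input section shrinks to something like this (a sketch; the port is assumed to match the one in the question):

```
input {
  tcp {
    port => 5044
    codec => json_lines
  }
}
```

The json_lines codec parses each newline-delimited JSON event into fields directly, so no grok is needed for the payload.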

Otherwise you can use an if conditional (described here).

Example:

if [subsystem] == "http" {
  mutate { ... }
  grok { ... }
}
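Applied to the question's setup, the same conditional pattern might look like the sketch below. The User branch's grok pattern and the use of the json filter are assumptions for illustration, not the asker's actual templates; the Order branch is left as a placeholder.

```
filter {
  grok {
    match => ["message", "(?<index_name>(?<=IndexName:).*?(?=\s))"]
  }
  if [index_name] == "User" {
    # Hypothetical: capture the logged JSON payload, then parse it into fields.
    grok { match => ["message", "(?<user_json>(?<=User json:).*)"] }
    json { source => "user_json" }
  }
  if [index_name] == "Order" {
    # Order-specific parsing goes here.
    grok { ... }
  }
}
```

This keeps one small conditional block per index instead of one monolithic grok, which addresses the "huge logstash.conf" concern.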