2017-04-25 169 views
0

我需要一个任务来解析和组合exim_mainlog for ELK。Logstash多行日志日志

问题是下一:

我logstash多插件不会收集与唯一的消息ID日志文件的行成一个事件。 当我尝试按正确顺序发送4个字符串时,它的效果很好。 订购这样的:

2017-04-10 00:00:30 1cxKsn-0001GB-2t CTAS=IN RefID= (ISpam= IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=6HVp5djceeYjte4jJb6Ryw==:17 a=AzvcPWV-tVgA:10 a=uHJYF-HtSykr7tHsIToA:9 a=CTTii-5M3Z-LMe4tr8cA:9 a=QEXdDO2ut3YA:10 a=pyshpDcKeHPZtuIe0Z8A:9) 
2017-04-10 00:00:30 1cxKsn-0001GB-2t <= [email protected] H=m37s3-2-28db.ispgateway.com [176.221.47.15] P=smtp S=2567 [email protected] 
2017-04-10 00:00:30 1cxKsn-0001GB-2t => [email protected] R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKso-0002iK-Q7" 
2017-04-10 00:00:30 1cxKsn-0001GB-2t Completed 

如果otder是正确的 - 一切正常良好。 但是,当同一事件的行之间插入其他垃圾信息时,它会发生故障。

实际日志是这样的:

2017-04-10 00:00:30 1cxKsn-0001GB-2t CTAS=IN RefID= (ISpam= IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=6HVp5djceeYjte4jJb6Ryw==:17 a=AzvcPWV-tVgA:10 a=uHJYF-HtSykr7tHsIToA:9 a=CTTii-5M3Z-LMe4tr8cA:9 a=QEXdDO2ut3YA:10 a=pyshpDcKeHPZtuIe0Z8A:9) 
2017-04-10 00:00:30 1cxKsn-0001GB-2t <= [email protected] H=m37s3-2-28db.ispgateway.com [176.221.47.15] P=smtp S=2567 [email protected] 
2017-04-10 00:00:30 1cxKsn-0001GB-2t => [email protected] R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKso-0002iK-Q7" 
2017-04-10 00:00:30 1cxKsn-0001GB-2t Completed 
2017-04-10 00:00:30 fixed_login authenticator failed for (faYNpaLtF) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 fixed_login authenticator failed for (lkLmh6Lk) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 fixed_login authenticator failed for (dLKdHZ) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 H=mx4.rissoidupgrades.com [79.137.110.132] F=<[email protected]> rejected RCPT <[email protected]>: ICIR16 - unknown user 
2017-04-10 00:00:30 unexpected disconnection while reading SMTP command from ([111.111.111.111]) [117.241.112.188] (error: Connection reset by peer) 
2017-04-10 00:00:30 1cxKso-0001GQ-1R CTAS=IN RefID= (ISpam=Confirmed IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=LMNu0MzFDzFZvX0DaJwgIA==:17 a=AwJkFeBFn10A:10 a=AzvcPWV-tVgA:10 a=HFQ-CQzmNWWYERzML24A:9) 
2017-04-10 00:00:31 1cxKso-0001GQ-1R <= [email protected] H=abcdrfg.managed.com [62.138.219.130] P=esmtp S=671 [email protected]smanaged.com 
2017-04-10 00:00:30 fixed_login authenticator failed for (faYNpaLtF) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 fixed_login authenticator failed for (lkLmh6Lk) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 fixed_login authenticator failed for (dLKdHZ) [192.168.24.24]: 535 Incorrect authentication data 
2017-04-10 00:00:30 H=mx4.rissoidupgrades.com [79.137.110.132] F=<[email protected]> rejected RCPT <[email protected]>: ICIR16 - unknown user 
2017-04-10 00:00:30 unexpected disconnection while reading SMTP command from ([117.241.112.188]) [117.241.112.188] (error: Connection reset by peer) 
2017-04-10 00:00:31 1cxKso-0001GQ-1R => [email protected] R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKsp-0002iR-QJ" 
2017-04-10 00:00:31 1cxKso-0001GQ-1R Completed 

当时我想有ID 1cxKsn-0001GB-2t1cxKso-0001GQ-1R两个事件在我Kibana结束。

这里是我的模式:

EXIM_MSGID [0-9A-Za-z]{6}-[0-9A-Za-z]{6}-[0-9A-Za-z]{2} 
EXIM_FLAGS (<=|[-=>*]>|[*]{2}|==) 
EXIM_DATE %{YEAR:exim_year}-%{MONTHNUM:exim_month}-%{MONTHDAY:exim_day} %{TIME:exim_time} 
EXIM_DATE_EMPTY %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} 
EXIM_PID \[%{POSINT}\] 
EXIM_QT ((\d+y)?(\d+w)?(\d+d)?(\d+h)?(\d+m)?(\d+s)?) 
EXIM_EXCLUDE_TERMS (Message is frozen|(Start|End) queue run| Warning: | retry time not reached | no (IP address|host name) found for (IP address|host) | unexpected disconnection while reading SMTP command | no immediate delivery: |another process is handling this message) 
EXIM_REMOTE_HOST (H=(%{NOTSPACE:remote_hostname})?(\(%{NOTSPACE:remote_heloname}\))?\[%{IP:remote_host}\]) 
EXIM_INTERFACE (I=\[%{IP:exim_interface}\](:%{NUMBER:exim_interface_port})) 
EXIM_PROTOCOL (P=%{NOTSPACE:protocol}) 
EXIM_MSG_SIZE (S=%{NUMBER:exim_msg_size}) 
EXIM_HEADER_ID (id=%{NOTSPACE:exim_header_id}) 
EXIM_SUBJECT (T=%{QS:exim_subject}) 
NUM_EMAIL (%{HOSTNAME}\@%{HOSTNAME}) 
EXIM_RECEIVER (=>\s*%{EMAILADDRESS:receiver}(\s*<%{EMAILADDRESS:envelope_sndr}>)?|=>\s*%{NUM_EMAIL:receiver}(\s*<%{EMAILADDRESS:envelope_sndr}>)?) 
EXIM_ROUTER (R=%{WORD:router}) 
EXIM_TRANSPORT (T=%{WORD:transport}) 
EXIM_REMOTE_SMTP_CONFIRM (C="%{GREEDYDATA:smtp_remote_response}") 



EXIM_SPAM %{EXIM_DATE_EMPTY} %{EXIM_MSGID} CTAS=%{WORD:exim_spam_dest} RefID=(%{WORD:exim_refid})? \((I|O)Spam=(%{WORD:exim_spam})? ((I|O)Virus=%{WORD:exim_virus})?(I|O)Flags=(%{GREEDYDATA:exim_spam_flags})? cv=%{GREEDYDATA:exim_spam_other} \) 

EXIM_LEFT %{EXIM_DATE_EMPTY} %{EXIM_MSGID} %{EXIM_FLAGS:exim_flags} %{GREEDYDATA:exim_email} (%{EXIM_REMOTE_HOST})? %{EXIM_PROTOCOL} (?:X=%{GREEDYDATA:exim_auth_details})?(?:A=%{GREEDYDATA:exim_authenticator})?(?:%{EXIM_MSG_SIZE:exim_mes_size})? (?:id=%{NUM_EMAIL:exim_uid})? 

EXIM_RIGHT %{EXIM_DATE_EMPTY} %{EXIM_MSGID} %{EXIM_RECEIVER} %{EXIM_ROUTER} %{EXIM_TRANSPORT} %{EXIM_REMOTE_HOST} %{EXIM_REMOTE_SMTP_CONFIRM} 

EXIM_SPAM_CHECK_ST %{EXIM_DATE} %{EXIM_MSGID:exim_msgid} Completed 

这里是我的filter.conf:

filter { 
    if [type] == "exim" { 
     multiline { 
     patterns_dir => "/etc/logstash/patterns.d" 
     pattern => "%{EXIM_DATE} %{EXIM_MSGID:msgid}" 
     what => "previous" 
     } 
     grok { 
     patterns_dir => "/etc/logstash/patterns.d" 
     break_on_match => false 
     match   => [ "message", "^%{EXIM_SPAM}" ] 
     } 
     grok { 
     patterns_dir => "/etc/logstash/patterns.d" 
     break_on_match => false 
     match   => [ "message", "^%{EXIM_LEFT}" ] 
     } 

     grok { 
     patterns_dir => "/etc/logstash/patterns.d" 
     break_on_match => false 
     match   => [ "message", "^%{EXIM_RIGHT}" ] 
    } 
    grok { 
     patterns_dir => "/etc/logstash/patterns.d" 
     break_on_match => false 
     match   => [ "message", "^%{EXIM_SPAM_CHECK_ST}" ] 
    } 
    } 
} 

回答

0

有关多记录一个事件累计收集信息你有一些方法:

    使用“Aggregate filter plugin”的
  • 使用“ElasticS earch过滤器插件“

首先,您需要将所有事件信息收集到一个MessageId中。 但是这可能会带来很多问题 - 一些日志行没有MessageId,而且许多Exim工作人员按照混合顺序编写自己的行。 第二,如果您使用ElastisSearch存储事件信息,则可以提出任何其他请求来搜索先前保存的事件并更新其文件。

ES这样的一些例子https://gist.github.com/greem/6e02b57ff26eaacb01b2