2017-02-15 70 views
0

环境Logstash过滤和解析模输出

  • 的Ubuntu 16.04
  • Logstash 5.2.1
  • ElasticSearch 5.1

我已经配置好了DEIS平台将日志发送到我们的没有问题的Logstack节点。不过,我对Ruby还是个新手,Regexes并不是我的强项。

日志示例

2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n 

Logstash配置:

input { 
    tcp { 
     port => 5000 
     type => syslog 
     codec => plain 
    } 
    udp { 
     port => 5000 
     type => syslog 
     codec => plain 
    } 
} 

filter { 
    json { 
     source => "syslog_message" 
    } 
} 

output { 
    elasticsearch { hosts => ["foo.somehost"] } 
} 

Elasticsearch输出:

"@timestamp" => 2017-02-15T14:55:24.408Z, 
"@version" => "1", 
"host" => "x.x.x.x", 
"message" => "2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n", 
"type" => "json" 

期望的结果:

"@timestamp" => 2017-02-15T14:55:24.408Z, 
"@version" => "1", 
"host" => "x.x.x.x", 
"type" => "json" 
"container" => "deis-logspout" 
"severity level" => "Info" 
"message" => "routing all to udp://x.x.x.x:xxxx\n" 

我怎样才能提取信息从消息中为它们单独的领域?

+0

你的意思是你的提取*消息*为四个不同的领域* *(部件如你所提到的)?您是否已经从上面的ES **输出**中看到了这一点。那么你在这里有什么问题?关于方法? – Kulasangar

+0

对不起,如果这不简明,我更新了这个问题。 – user7565843

回答

0

不幸的是,你对你想要做什么的假设略有偏差,但我们可以解决这个问题!

您为JSON创建了一个正则表达式,但是您没有解析JSON。你只是简单地解析一个混杂的syslog日志(参见source中的syslogStreamer),但实际上并不是syslog格式(RFC 5424或3164)。 Logstash随后提供JSON输出。

让我们分解消息,它将成为您解析的源代码。关键是你必须正面解析消息。

消息:

2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n 
  • 2017-02-15T14:55:24UTC:时间戳是一种常见的神交图案。这主要遵循TIMESTAMP_ISO8601但不完全。
  • deis-logspout[1]:这将是您的日志源,您可以命名容器。您可以使用grok模式URIHOST
  • routing all to udp://x.x.x.x:xxxx\n:由于对于大多数日志的消息被包含在该消息的末尾,可以只然后使用神交图案GREEDYDATA这是.*在正则表达式的等效。
  • 2017/02/15 14:55:24:另一个时间戳(为什么?)与常见的grok模式不匹配。

使用grok过滤器,可以将语法(从正则表达式抽象)映射到语义(您提取的值的名称)。例如%{URIHOST:container}

你会看到我做了一些hack grock过滤器来完成格式化工作。即使您不打算捕获结果,也可以匹配文本的部分内容。如果您无法更改时间戳的格式以符合标准,请创建一个自定义模式。

配置:

input { 
    tcp { 
     port => 5000 
     type => deis 
    } 
    udp { 
     port => 5000 
     type => deis 
    } 
} 

filter { 
    grok { 
     match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}(UTC|CST|EST|PST) %{URIHOST:container}\[%{NUMBER}\]: %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME} %{GREEDYDATA:msg}" } 
    } 
} 

output { 
    elasticsearch { hosts => ["foo.somehost"] } 
} 

输出:

{ 
    "container" => "deis-logspout", 
    "msg" => "routing all to udp://x.x.x.x:xxxx", 
    "@timestamp" => 2017-02-22T23:55:28.319Z, 
    "port" => 62886, 
    "@version" => "1", 
    "host" => "10.0.2.2", 
    "message" => "2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx", 
    "timestamp" => "2017-02-15T14:55:24" 
    "type" => "deis" 
} 

你还可以变异的项目下降@timestamp,@host等,因为这些是由Logstash提供默认。另一个建议是使用date filter将发现的任何时间戳转换为可用格式(更适合搜索)。

根据日志格式的不同,您可能必须稍微改变模式。我只有一个例子可以解决。这也保留了原始的完整信息,因为在Logstash中完成的任何字段操作都具有破坏性(它们用相同名称的字段覆盖值)。

资源:

+0

感谢您的意见。我将如何配置一个Grok过滤器来解析消息? – user7565843

+0

再次感谢Signus,我已经更新了上述问题以获得更好的上下文。 – user7565843