
I'm very new to logstash. I can run the logstash jar file and see the kibana web page. That's cool~~ Now, how can I get numbers out of a log message in logstash?

Now, I want to turn the following line (a syslog message) into the structure below it.

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40 
==> 
{ 'timestamp': 'Feb 19 18:45:29', 
    'host': 'SD550', 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40 } 

In the log message, "0x1000" is a starting register address, "4" is the number of register values, and the values that follow are just the values. So it means 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40. The important point is that the number of register values can vary, so the length of the log message is variable. Whatever the length, I want to get the correct result. (For example, 0x2000,2,12,22 ==> 0x2000:12, 0x2001:22)
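
To make the expansion rule concrete, here is a tiny plain-Ruby sketch of the mapping I have in mind (just an illustration with a made-up function name, not a logstash filter):

def expand_registers(payload)
  fields = payload.split(",")
  start  = fields[0].hex             # "0x1000" -> 4096
  count  = fields[1].to_i            # "4"
  values = fields[2, count]          # ["10", "20", "30", "40"]
  result = {}
  values.each_with_index do |value, i|
    key = format("0x%X", start + i)  # 4096 + i back to "0x1000", "0x1001", ...
    result[key] = value
  end
  result
end

p expand_registers("0x1000,4,10,20,30,40")
# {"0x1000"=>"10", "0x1001"=>"20", "0x1002"=>"30", "0x1003"=>"40"}
p expand_registers("0x2000,2,12,22")
# {"0x2000"=>"12", "0x2001"=>"22"}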

Here is my incomplete logstash config file. I have found some filters such as grok, mutate, and extractnumbers, but I don't know how to do what I want.

input { 
    file { 
     path => "/var/log/syslog" 
     type => "syslog" 
    } 
} 

filter { 
    ??? 
} 

output { 
    elasticsearch { } 
} 

I know I'm asking for a lot; sorry, guys. Also, my final goal is to draw a TIME(x)/VALUE(y) chart in kibana for a specific register. Is that possible? Could you give me some advice?

Thanks, Youngmin Kim

Answers


Thank you to everyone who answered my question.. especially Ben Lim.

With your help, I got this result:

{ 
     "@version" => "1", 
    "@timestamp" => "2014-02-20T11:07:28.125Z", 
      "type" => "syslog", 
      "host" => "ymkim-SD550", 
      "path" => "/var/log/syslog", 
      "ts" => "Feb 20 21:07:27", 
      "user" => "ymkim", 
      "func" => "REG", 
      "8192" => 16, 
      "8193" => 32, 
      "8194" => 17, 
      "8195" => 109 
} 

$ logger REG,2000,4,10,20,11,6d
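
(The logger command above writes a test line along the lines of "Feb 20 21:07:27 ymkim-SD550 ymkim: REG,2000,4,10,20,11,6d" to /var/log/syslog. The addresses and values come out as decimal numbers in the result because the filter parses them as hex: 0x2000 = 8192, 0x10 = 16, 0x20 = 32, 0x11 = 17, 0x6d = 109.)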

Here is my config file:

input { 
    file { 
     path => "/var/log/syslog" 
     type => "syslog" 
    } 
} 

filter { 
    grok { 
     match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"] 
    } 

    if [func] == "REG" { 
     modbus_csv { 
      start_address => "address" 
      num_register => "regNumber" 
      source => "regValue" 
      remove_field => ["regValue", "hostname", "message", 
       "address", "regNumber"] 
     } 
    } 

} 

output { 
    stdout { debug => true } 
    elasticsearch { } 
} 
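
For reference, with this grok pattern the test line from the logger command above should come out with roughly these fields before modbus_csv runs: ts = "Feb 20 21:07:27", hostname = "ymkim-SD550", user = "ymkim", func = "REG", address = "2000", regNumber = "4", and regValue = "10,20,11,6d". The modbus_csv filter then expands regValue into the numbered register fields shown in the result above and removes the intermediate fields.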

And the modified csv filter, named modbus_csv.rb:

# encoding: utf-8 
require "logstash/filters/base" 
require "logstash/namespace" 

require "csv" 

# CSV filter. Takes an event field containing CSV data, parses it, 
# and stores it as individual fields (can optionally specify the names). 
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base 
    config_name "modbus_csv" 
    milestone 2 

    # The CSV data in the value of the source field will be expanded into a 
    # datastructure. 
    config :source, :validate => :string, :default => "message" 

    # Define a list of column names (in the order they appear in the CSV, 
    # as if it were a header line). If this is not specified or there 
    # are not enough columns specified, the default column name is "columnX" 
    # (where X is the field number, starting from 1). 
    config :columns, :validate => :array, :default => [] 
    config :start_address, :validate => :string, :default => "0" 
    config :num_register, :validate => :string, :default => "0" 

    # Define the column separator value. If this is not specified the default 
    # is a comma ','. 
    # Optional. 
    config :separator, :validate => :string, :default => "," 

    # Define the character used to quote CSV fields. If this is not specified 
    # the default is a double quote '"'. 
    # Optional. 
    config :quote_char, :validate => :string, :default => '"' 

    # Define target for placing the data. 
    # Defaults to writing to the root of the event. 
    config :target, :validate => :string 

    public 
    def register 

    # Nothing to do here 

    end # def register 

    public 
    def filter(event) 
    return unless filter?(event) 

    @logger.debug("Running modbus_csv filter", :event => event) 

    matches = 0 

    # Build the column names from the start address: one register address per
    # value (e.g. start "2000" hex -> 8192, 8193, ... for regNumber values).
    @logger.debug(event[@num_register].hex)
    for i in 0...(event[@num_register].hex)
     @columns[i] = event[@start_address].hex + i
    end
    if event[@source] 
     if event[@source].is_a?(String) 
     event[@source] = [event[@source]] 
     end 

     if event[@source].length > 1 
     @logger.warn("modbus_csv filter only works on fields of length 1", 
        :source => @source, :value => event[@source], 
        :event => event) 
     return 
     end 

     raw = event[@source].first 
     begin 
     values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char) 

     if @target.nil? 
      # Default is to write to the root of the event. 
      dest = event 
     else 
      dest = event[@target] ||= {} 
     end 

     values.each_index do |i| 
      # Use the computed register address as the field name, falling back to
      # the generic columnN name if no address was computed for this index.
      field_name = (@columns[i] || "column#{i+1}").to_s 
      dest[field_name] = values[i].hex 
     end 

     filter_matched(event) 
     rescue => e 
     event.tag "_modbus_csvparsefailure" 
     @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw, 
         :exception => e) 
     return 
     end # begin 
    end # if event 

    @logger.debug("Event after modbus_csv filter", :event => event) 

    end # def filter 

end # class LogStash::Filters::MODBUS_CSV 
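
One note on running it (this is my assumption about how logstash 1.x loads custom plugins, so please double-check for your version): the file usually has to live under a plugin directory as logstash/filters/modbus_csv.rb, and the agent is started with --pluginpath pointing at that directory, for example: java -jar logstash.jar agent -f logstash.conf --pluginpath /path/to/plugins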

Finally, I got the chart I wanted. (* func = REG (13), mean of 4096 per 10m | (13 hits))


You want to use grok to match the various fields. There are many built-in grok patterns that will help you with this. %{SYSLOGBASE} will get the timestamp and host for you, and then you can probably grab the rest with patterns such as %{NUMBER} and the others found at https://github.com/logstash/logstash/blob/v1.3.3/patterns/grok-patterns
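
For example (an untested sketch, with field names I made up), a pattern along the lines of %{SYSLOGBASE} %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValues} would let %{SYSLOGBASE} handle the timestamp, host and program name, and leave the variable-length list of values in a single regValues field to be split in a later step.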

Because of your variable log length, your pattern could easily get a bit complicated, but I think you can get around that by matching all the numbers into an array and then mapping them to register values in a mutate.

As for generating the chart in kibana, that won't be difficult once the data is formatted correctly. There is a built-in time-series graph type that is easy to populate.


Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40 

If you use the config file below with data that looks like the line above and open kibana, it will do the job. It splits the fields into different categories that you can search on. I'm quite new to all of this too, but this is how I would do it. A screenshot is below, along with a simple time pie chart, taken after I put in about 8 of the lines above with different time and address values.

input { 
    tcp { 
    type => "test" 
    port => 3333 
    } 
} 


filter { 
    grok { 
     match => ["message", "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{WORD:sd550} %{WORD:name}: %{WORD:asmThing},%{WORD:address},%{NUMBER:firstno},%{NUMBER:2no},%{NUMBER:3no},%{NUMBER:4no},%{NUMBER:5no}"] 
    } 
} 
output { 
    elasticsearch { 
    # Setting 'embedded' will run a real elasticsearch server inside logstash. 
    # This option below saves you from having to run a separate process just 
    # for ElasticSearch, so you can get started quicker! 
    embedded => true 
    } 
} 

(screenshot: Test Kibana)


The register log length is variable, but your grok filter is fixed! The filter will fail when there are more than 4 registers. –


I didn't realize it varied. You're right, my filter won't work when the length changes. – GPPK


I have an idea. To handle the variable log length with multiple register address:value pairs, you can use a grok filter to parse the message first, and then use a csv filter to separate each register value.

Filter:

filter { 
    grok { 
      match => ["message", "%{MONTH:month} %{NUMBER:day} %{TIME:time} %{WORD:host} %{WORD:user}: %{WORD:unit},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"] 
      add_field => ["logdate","%{month} %{day} %{time}"] 
      remove_field => ["month","day", "time"] 
    } 

    csv { 
      source => "regValue" 
      remove_field => ["regValue"] 
    } 
} 

Output:

{ 
       "message" => "Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40", 
      "@version" => "1", 
    "@timestamp" => "2014-02-20T02:05:53.608Z", 
          "host" => "SD550", 
          "user" => "Jack", 
          "unit" => "REG", 
       "address" => "0x1000", 
     "regNumber" => "4", 
       "logdate" => "Feb 19 18:45:29", 
       "column1" => "10", 
       "column2" => "20", 
       "column3" => "30", 
       "column4" => "40" 
} 

However, the field names for the addresses are given by the csv filter (you cannot give the csv filter column names here, because the number of fields is variable). If you want it to match your requirement, you will need to modify the csv filter.
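
(The modbus_csv.rb filter in the first answer above is one example of such a modification: it derives the field names from the start address instead of using column1, column2, and so on.)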
