ELK: Collecting Java Application Logs with Filebeat & Logstash and Parsing Them with Grok

2019/04/15 ELK

Because of the way Java applications write their logs, once the logs are collected and shipped to ES every entry shows up as a single unparsed line, which makes searching and later graphical dashboards inconvenient. This post uses the logstash grok plugin (together with a few other filters) to split Java application logs into separate fields.

Recommendation: push the developers to write logs with a single, consistent delimiter and process them with the ruby filter plugin instead; that approach is fast and efficient, whereas grok parsing is inefficient and makes it very easy to lose data.
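For reference, a minimal sketch of that delimiter-based approach (assuming, purely for illustration, that each line is written as time|level|class|msg with | as the separator; the delimiter and field names here are my assumptions, not the original format):

filter {
  ruby {
    code => "
      # split the raw line on the agreed delimiter and map the pieces to fields
      parts = event.get('message').to_s.split('|')
      if parts.length >= 4
        event.set('time',  parts[0].strip)
        event.set('level', parts[1].strip)
        event.set('class', parts[2].strip)
        event.set('msg',   parts[3].strip)
      end
    "
  }
}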

filebeat

filebeat.prospectors:
- input_type: log
  fields:
    vpc_ip: LOCAL_IP
    app_name: APP_NAME
    index_name: INDEX_NAME
  paths:
    - 'LOG_PATH'
  ignore_older: 1m
  close_inactive: 50s
  tail_files: true
  multiline:
    pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}'
    negate: true
    match: after
output.kafka:
  hosts: ["broker1:9092","broker2:9092","broker3:9092"]
  topic: testlog
  required_acks: 1
  compression: gzip
  compression_level: 1
  max_message_bytes: 100000000
logging.level: info
max_procs: 2
  • /usr/bin/filebeat -c /etc/filebeat/filebeat.yml
  • tail_files: true means that on the first start filebeat reads from the end of the file
  • If you want it to read from the end of the file again after a restart, delete the offset registry file under /var/lib/filebeat/:
  • rm -rf /var/lib/filebeat/*
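  • The multiline settings above are what keep stack traces together: any line that does not start with a yyyy-MM-dd HH:mm:ss.SSS timestamp (negate: true) is appended to the previous timestamped line (match: after). As an illustration (the class names below are made up), these four physical lines are shipped as two events, the first three merged into one:

    2019-04-01 11:55:35.038 ERROR --- request failed
    java.lang.NullPointerException: null
        at com.example.demo.UserService.get(UserService.java:42)
    2019-04-01 11:55:36.120 INFO --- next request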

logstash

  • By default, before any of this processing, a document written to ES looks like the following:
{
  "_index": "xxxx",
  "_type": "doc",
  "_id": "t_avD2oBBaewsVVMu4Zz",
  "_version": 1,
  "_score": null,
  "_source": {
    "@version": "1",
    "source": "xxx.log",
    "message": "xxxxx",
    "offset": 1680860846,
    "@timestamp": "2019-04-01T03:55:35.038Z",
    "fields": {
      "namespace": "x-xx",
      "service-name": "xxx",
      "pod-ip": "xxx"
    },
    "beat": {
      "name": "xxx",
      "hostname": "xxxx",
      "version": "6.0.0"
    }
  },
  "fields": {
    "@timestamp": [
      "2019-04-01T03:55:35.038Z"
    ]
  },
  "sort": [
    1555041335038
  ]
}
input {
      kafka {
          bootstrap_servers => ["xxx:9092"]
          group_id => "logstash_xxxx"
          topics => ["xxx"]
          auto_offset_reset => "latest"
          consumer_threads => 6
          decorate_events => true
          codec => "json"
      }
}


filter {
      if "exception.Exception" in [message] or "exception.ApiException" in [message] {
          drop { }
      }
      grok {
           match => [ "message", "(?<time>^\d+-\d+-\d+\s\d+:\d+:\d+.\d+)" ]
         }
      grok {
           match => [ "message", "(?<level>[I|E]\w+)" ]
         }
      grok {
           match => [ "message", "(?<tid>[nio|o|io].*-\d+)" ]
         }
      grok {
           match => [ "message", "(?<class>[c]\w+.\w+.\w+.\w+.\w+.\w+.\w+.)" ]
         }
      grok {
           match => [ "message", "(?<method>\[\s\s+.*\d\]{1}\s)" ]
         }
      grok {
           match => [ "message", "(?<ms>[0-9]+[m][s])" ]
           #match => [ "message", "(?<ms>\d+)+ms" ]
         }
      grok {
           match => [ "message", "(?<msg>\s:+.*)" ]
         }

      date {
            match => [ "time" , "yyyy-MM-dd HH:mm:ss.SSS" ]
            target => "@timestamp"
      }
     ## Parse the nested json data (the filebeat fields block)
      mutate {
         add_field => { "@fields" => "%{fields}" }
      }
      
      json {
         source => "@fields"
         ## Remove filebeat metadata fields that are no longer needed
         remove_field => [ "beat","@fields","fields","index_name","offset","source","message","time","tags"]
      }
      mutate {
        add_field => { "message" => "%{msg}" }
        remove_field => [ "msg" ]
      }
}

output {
        elasticsearch {
            ## ES is protected by Search Guard, so connect over https with SSL verification
            hosts => ["https://es:9200"]
            index => "xxxxx_%{+YYYYMMdd}"
            user => "admin"
            password => "xxxxxxxxx"
            ssl => true
            ssl_certificate_verification => true
            truststore => "/opt/logstash/ssl/truststore.jks"
            truststore_password => "xxxxxxxxxx"
        }

        #stdout { codec => rubydebug }
}
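The truststore referenced above must contain the CA certificate that signed the ES node certificates. If you still need to build it, one way is with keytool (the alias, file name, and password below are placeholders, not taken from the original setup):

keytool -importcert -alias es-root-ca \
  -file /opt/logstash/ssl/root-ca.pem \
  -keystore /opt/logstash/ssl/truststore.jks \
  -storepass xxxxxxxxxx -noprompt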
  • Start logstash:
/opt/logstash/bin/logstash -f /opt/logstash/etc/test.conf --path.data=/opt/logstash/data/testlog
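
Before switching the output to ES, it can be worth validating the grok patterns against a few real log lines. A small test pipeline like the one below (my own sketch, not part of the original setup) reads from stdin, applies the same style of grok matches, and prints the parsed event with rubydebug, so you can paste in a line copied straight from the application log:

# grok-test.conf: paste a real log line on stdin and inspect the extracted fields
input  { stdin { } }
filter {
  grok { match => [ "message", "(?<time>^\d+-\d+-\d+\s\d+:\d+:\d+.\d+)" ] }
  grok { match => [ "message", "(?<level>[I|E]\w+)" ] }
  grok { match => [ "message", "(?<msg>\s:+.*)" ] }
}
output { stdout { codec => rubydebug } }

Run it with /opt/logstash/bin/logstash -f grok-test.conf and check that the time, level, and msg fields come out as expected.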
