本文共 6141 字,大约阅读时间需要 20 分钟。
之前讲的是如何从logstash从不同的数据源读取数据,以及如何使用grok,这样的filter来完成数据的正规化,json化,output配置将书数据输出何处, logstash是配置将数据如何输出到集群中
除了grok显示正规化,还可以定义每一个对应的名称
filter { grok { match => { “message” => “%{IPORHOST:clientip} [%{HTTPDATE:time}] “%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}” %{NUMBER:http_status_code} %{NUMBER:bytes} “(?<http_referer>\S+)” “(?<http_user_agent>\S+)” “(?<http_x_forwarded_for>\S+)”” } remote_field: message 可以把message移除,只保留分片以后的 } }nginx.remote.ip [nginx][remote][ip] 需要这么写来自定义
https://www.elastic.co/guide/en/logstash/current/logstash-config-for-filebeat-modules.html#parsing-apache2input {
beats { port => 5044 host => “0.0.0.0” } } filter { if [fileset][module] == “nginx” { if [fileset][name] == “access” { grok { match => { “message” => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} [%{HTTPDATE:[nginx][access][time]}] “%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}” %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} “%{DATA:[nginx][access][referrer]}” “%{DATA:[nginx][access][agent]}”"] } remove_field => “message” } mutate { add_field => { “read_timestamp” => “%{@timestamp}” } } date { filter过滤器 完成日期转换 match => [ “[nginx][access][time]”, “dd/MMM/YYYY:HⓂ️s Z” ] remove_field => “[nginx][access][time]” } useragent { source => “[nginx][access][agent]” target => “[nginx][access][user_agent]” remove_field => “[nginx][access][agent]” } geoip { source => “[nginx][access][remote_ip]” 把原来访问的地址, target => “[nginx][access][geoip]” 转换成geoip,不是转换,只是新增一个字段,可以下载ip地址库放到geoip } } else if [fileset][name] == “error” { grok { match => { “message” => ["%{DATA:[nginx][error][time]} [%{DATA:[nginx][error][level]}] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] } remove_field => “message” } mutate { rename => { “@timestamp” => “read_timestamp” } 完成字段名转换 } date { match => [ “[nginx][error][time]”, “YYYY/MM/dd HⓂ️s” ] remove_field => “[nginx][error][time]” } } } } output { elasticsearch { hosts => localhost manage_template => false index => “%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}” } }https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
会告诉你去哪里下载库 els1.x版本用1版本,els5.x版本用2版本https://dev.maxmind.com/geoip/geoip2/geolite2/ 有精确到国家和城市的,整个文件每隔一段时间就更新,所以要及时更新 数据库应该放在logstash主机上,被geoip来过滤使用 把logstash停止服务,把之前的数据删除 下面需要修改logstash的配置文件 将来可以输入多个源,然后输出的时候判定 哪个类型往哪里输出 filter { grok { match => { “message” => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} [%{HTTPDATE:[nginx ][access][time]}] “%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]} " %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} “%{DATA:[nginx][access][referrer]}” " %{DATA:[nginx][access][agent]}”"] } remove_field => “message” } date { match => [ “[nginx][access][time]”, “dd/MMM/YYYY:HⓂ️s Z” ] remove_field => “[nginx][access][time]” } useragent { source => “[nginx][access][agent]” 客户端浏览器 target => “[nginx][access][user_agent]” remove_field => “[nginx][access][agent]” } geoip { source => “[nginx][access][remote_ip]” target => “geoip” database => “/etc/logstash/GeoLite2-City.mmdb” }} output { elasticsearch { hosts => ["node1:9200","node2:9200","node3:9200"] index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}" } }
下载数据库放到目录中,解压展开,展开后是个目录
启动之后正常访问应该会导出到els集群中 基于kibana可以查看信息 试试用x-forward-for伪装地址 现在还都是固定的同一地址 直接写日志试试 有地址显示 现在就可以添加图 利用柱状图看下,删除 重新创建一次 可能是kibana对之前做了缓存,重启服务 再次生成几行 确实有数据,geoip的解析没有问题 先删除 重新加一个修改配置文件,重新生成索引 之前假入的数据都echo,时间一样,所以显示不出来15分内的 之前假入的数据都echo,时间一样,所以显示不出来15分内的filter { grok { match => { “message” => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} [%{HTTPDATE:[nginx ][access][time]}] “%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]} " %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} “%{DATA:[nginx][access][referrer]}” " %{DATA:[nginx][access][agent]}”"] } remove_field => “message” } date { match => [ “[nginx][access][time]”, “dd/MMM/YYYY:HⓂ️s Z” ] remove_field => “[nginx][access][time]” } useragent { source => “[nginx][access][agent]” target => “[nginx][access][user_agent]” remove_field => “[nginx][access][agent]” } geoip { source => “[nginx][access][remote_ip]” target => “geoip” database => “/etc/logstash/GeoLite2-City.mmdb” }} output { elasticsearch { hosts => ["node1:9200","node2:9200","node3:9200"] index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}" } } 注意: 1、输出的日志文件名必须以“logstash-”开头,方可将geoip.location的type自动设定为"geo_point"; 2、target => "geoip" 除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
修改下日志信息
暂时还是不行 geohash找不到是因为把经纬度的字符串转换成了浮点型,这两个类型需要是geo_point的类型,必须是这种类型,否则识别不了 对logstash创建一个可视化之前可以选择但是不显示,因为生成的索引必须以logstash开头 类型必须是geopoint才能识别 **对应的索引名必须以logstash开头2 2.对应的属性就是geoip_point ** 注意: 1、输出的日志文件名必须以“logstash-”开头,方可将geoip.location的type自动设定为"geo_point"; 2、target => "geoip"除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
调整下日志时间教近的,这些信息可以正常输出
保存一下 每一次日志输出都靠grok日志转换的,事实上可以在nginx直接让日志输出成json格式除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
可以让直接日志信息输出为json格式,就不用grok进行转换,apache,tomcat,nginx都可以转载地址:http://ickgn.baihongyu.com/