Logstash来自ES家族,是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。
Logstash的数据处理流水线有三个主要角色完成:inputs –> filters –> outputs:
logstash安装启动都非常简单,这里就简单提一下关键步骤:
1、部署jdk环境(这部分就不做介绍了)
2、安装 logstash
①、下载:
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.1.tar.gz
②、解压:
tar -xvf logstash-5.5.1.tar.gz
③、启动
nohup ./bin/logstash -f config/logstash.conf >/dev/null 2>&1 &
1、读日志文件:
2、监听beat数据
3、读取redis数据
1、MySQL慢日志:
filter {
#区分字段:
if [@metadata][type] == "mysql_slow_log" {
grok {
# 正则匹配(不同MySQL版本慢日志格式可能不通,此处适用于MySQL 5.7+版本)
match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time
:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<a
ction>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
# 慢日志里面的主机IP为主机名,因此这里变相处理下,加入server_ip字段,值为beatname指定的值
add_field => [ "server_ip", "%{[beat][name]}" ]
# 匹配到了就加入标签
add_tag => [ "matched" ]
}
# 未匹配的数据直接drop
if ("matched" not in [tags]) {
drop {}
}
date {
# 这里对慢日志的时间戳进行格式转换
match => [ "timestamp", "UNIX","YYYY-MM-dd HH:mm:ss"]
remove_field => [ "timestamp" ]
}
# 此处对SQL进行MD5运算,并存到fingerprint字段,用于区分同一条SQL
mutate {
add_field => {"sql_hash" => "%{query}"}
gsub => [
"sql_hash", "'.+?'", "",
"sql_hash", "-?\d*\.{0,1}\d+", ""
]
}
fingerprint {
method => "MD5"
key => ["sql_hash"]
}
# 移除不需要的字段
mutate {
remove_field => "sql_hash"
remove_field => "[beat][hostname]"
remove_field => "[beat][name]"
remove_field => "@version"
remove_field => "[beat][version]"
remove_field => "input_type"
remove_field => "offset"
remove_field => "tags"
remove_field => "type"
remove_field => "message"
}
}
}2、WEB访问日志
filter {
# 只处理标签为web_access_log的数据
if [@metadata][type] == "web_access_log" {
# 为了兼容中文路径,这里做了下数据替换
mutate {
gsub => ["message", "\\x", "\\\x"]
}
# 排除HEAD请求
if ( 'method":"HEAD' in [message] ) {
drop {}
}
# Nginx、Apache已经将日志格式定制为json,所以简单处理即可
json {
# 从数据中取出message
source => "message"
# 删除多余字段
remove_field => "message"
remove_field => "[beat][hostname]"
remove_field => "[beat][name]"
remove_field => "@version"
remove_field => "[beat][version]"
remove_field => "input_type"
remove_field => "offset"
remove_field => "tags"
remove_field => "type"
remove_field => "host"
}
}
}3、系统日志
大同小异,就不做注释了。
filter {
if [@metadata][type] == "messages" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
add_field => [ "ip", "%{[beat][name]}" ]
add_tag => [ "matched" ]
}
if ("matched" not in [tags]) {
drop {}
}
date {
locale => "en_US"
timezone => "Asia/Shanghai"
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
target => "@timestamp"
}
ruby {
code => "event['@timestamp'] = event['@timestamp'].getlocal"
}
mutate {
remove_field => "[beat][hostname]"
remove_field => "[beat][name]"
remove_field => "@version"
remove_field => "[beat][version]"
remove_field => "input_type"
remove_field => "offset"
remove_field => "tags"
remove_field => "type"
remove_field => "host"
}
}1、直接打屏(DEBUG)
2、上报ES