初探ELK和集中式日志管理系统
字数 1914 2025-08-22 18:37:21
ELK和集中式日志管理系统详解
前言
在规模较大的IT环境中,传统的日志分析方法(如grep、awk)效率低下,面临以下问题:
- 日志量太大难以归档
- 文本搜索速度慢
- 缺乏多维度查询能力
集中式日志管理系统通过将所有节点上的日志统一收集、管理和访问来解决这些问题。ELK提供了一整套开源解决方案,各组件完美衔接。
ELK架构概述
ELK是三个开源项目的首字母缩写:
- Elasticsearch:搜索和分析引擎
- Logstash:服务器端数据处理管道,负责采集、转换和发送数据
- Kibana:数据可视化平台
Elasticsearch详解
核心特性
- 实时分布式搜索分析引擎
- 高速、大规模数据检索能力
关键配置项
| 配置项 | 描述 | 默认值 |
|---|---|---|
| cluster.name | 集群名称 | elasticsearch |
| node.master | 是否可作为主节点 | true |
| node.name | 节点名称 | 自动获取 |
| path.data | 索引数据存储路径 | es根目录下的data |
| http.port | HTTP端口 | 9200 |
| transport.tcp.port | 节点交互端口 | 9300 |
| network.host | 绑定地址 | 0.0.0.0 |
| index.number_of_replicas | 索引副本数 | 1 |
| http.cors.enabled | 是否支持跨域 | false |
安装与测试
- 示例配置:
cluster.name: master
node.name: elk-1
path.data: /data/es
path.logs: /var/log/es
bootstrap.memory_lock: true
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
- 启动服务:
bin/elasticsearch
- 验证服务:
curl -i -XGET 'localhost:9200/_count?pretty'
可视化工具
安装elasticsearch-head插件:
git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start
访问地址:http://localhost:9100/
Logstash详解
核心概念
- 管道结构:必须包含input和output,可选filter
- 数据处理流程:输入→过滤→输出
目录结构
| 目录 | 描述 | 默认路径 |
|---|---|---|
| bin | 二进制脚本和插件 | {extract.path}/bin |
| config | 配置文件 | {extract.path}/config |
| logs | 日志文件 | {extract.path}/logs |
| plugins | 插件目录 | {extract.path}/plugins |
| data | 持久化数据 | {extract.path}/data |
输入插件(Input)
- 标准输入:
input {
stdin { }
}
- 文件输入:
input {
file {
path => "/var/log/secure"
type => "logstash_log"
start_position => "beginning"
}
}
- Syslog输入:
input {
syslog {
port => "514"
type => "syslog"
}
}
- Beats输入:
input {
beats {
port => 5044
}
}
- Redis输入:
input {
redis {
host => "127.0.0.1"
port => "6379"
password => "passwd"
db => "1"
data_type => "list"
key => "redis_key"
}
}
输出插件(Output)
- 标准输出:
output {
stdout {
codec => "rubydebug"
}
}
- 文件输出:
output {
file {
path => "/var/log/logstash/%{host}/{application}"
codec => line { format => "%{message}" }
}
}
- Elasticsearch输出:
output {
elasticsearch {
hosts => "localhost:9200"
index => "logstash-%{+YYYY.MM.dd}"
}
}
- Redis输出:
output {
redis {
data_type => "list"
host => "127.0.0.1"
port => "6379"
db => "1"
password => "passwd"
key => "redis_key"
}
}
过滤器插件(Filter)
Grok过滤器:
- 语法:
%{SYNTAX:SEMANTIC} - 示例:
filter {
grok {
match => {
"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:bytes} %{NUMBER:duration}"
}
}
}
测试方法
- 命令行快速测试:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
- 文件输入测试:
bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["127.0.0.1:9200"] } stdout { codec => rubydebug }}'
Kibana详解
关键配置项
| 配置项 | 描述 | 默认值 |
|---|---|---|
| server.port | Kibana端口 | 5601 |
| server.host | 后端服务器地址 | 0.0.0.0 |
| elasticsearch.url | Elasticsearch地址 | http://localhost:9200 |
| kibana.index | Kibana索引名称 | .kibana |
| logging.dest | 日志输出目标 | stdout |
安装与配置
- 示例配置:
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://127.0.0.1:9200"]
kibana.index: ".kibana"
- 启动服务:
bin/kibana
- 访问地址:http://IP:5601
实战案例
1. 收集Apache日志
修改Apache配置:
LogFormat "{ \"@timestamp\": \"%{%Y-%m-%dT%H:%M:%S%z}t\", \"@version\": \"1\", \"tags\":[\"apache\"], \"message\": \"%h %l %u %t \\\"%r\\\" %>s %b\", \"clientip\": \"%a\", \"duration\": %D, \"status\": %>s, \"request\": \"%U%q\", \"urlpath\": \"%U\", \"urlquery\": \"%q\", \"bytes\": %B, \"method\": \"%m\", \"site\": \"%{Host}i\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-agent}i\" }" json
CustomLog "logs/access_log" json
2. 收集Nginx日志
修改Nginx配置:
http {
log_format json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"client":"$remote_addr",'
'"url":"$uri",'
'"status":"$status",'
'"domain":"$host",'
'"host":"$server_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"referer": "$http_referer",'
'"ua": "$http_user_agent"'
'}';
server {
access_log /var/log/nginx/access_json.log json;
}
}
3. 收集用户历史命令
修改/etc/bashrc:
HISTDIR='/var/log/command.log'
if [ ! -f $HISTDIR ];then
touch $HISTDIR
chmod 666 $HISTDIR
fi
export HISTTIMEFORMAT="{\"TIME\":\"%F %T\",\"HOSTNAME\":\"$HOSTNAME\",\"LOGIN_IP\":\"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')\",\"LOGIN_USER\":\"$(who am i|awk '{print $1}')\",\"CURRENT_USER\":\"${USER}\",\"CMD\":\""
export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]\+[0-9]\+ //"|sed "s/^[0-9]\+ //" | sed "s/var/log/command.log'
4. 收集SSH日志
修改SSH配置:
SyslogFacility local6
修改rsyslog配置:
local6.* /var/log/sshd.log
综合配置示例
input {
file {
path => "/var/log/nginx/access.log"
type => "nginx"
codec => "json"
}
file {
path => "/var/log/httpd/access_log"
type => "apache"
codec => "json"
}
file {
path => "/var/log/command.log"
type => "history"
codec => "json"
}
file {
path => "/var/log/sshd.log"
type => "ssh"
}
}
filter {
grok {
match => [
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)? %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)?: message repeated 2 times:
$$
%{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:\[%{POSINT:pid}
$$
)? %{WORD:login} password for invalid user %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)? %{WORD:login} %{WORD:auth_method} for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}"
]
}
}
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
if [type] == "apache" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "apache-%{+YYYY.MM.dd}"
}
}
if [type] == "history" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "history-%{+YYYY.MM.dd}"
}
}
if [type] == "ssh" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "ssh-%{+YYYY.MM.dd}"
}
}
}
Windows日志收集
使用Winlogbeat收集Windows事件日志:
关键配置
winlogbeat.event_logs:
- name: Application
provider:
- Application Error
- Application Hang
- Windows Error Reporting
- EMET
- name: Security
level: critical, error, warning
event_id: 4624, 4625, 4634, 4672, 4720
- name: System
ignore_older: 168h
setup.kibana:
host: "http://100.2.170.124:5601"
output.elasticsearch:
hosts: ["100.2.170.124:9200"]
安装与启动
- 安装服务:
.\install-service-winlogbeat.ps1
.\winlogbeat.exe setup
- 启动服务:
net start winlogbeat
总结
ELK栈提供了完整的日志管理解决方案:
- Elasticsearch:高效存储和检索日志数据
- Logstash:灵活的数据收集和处理管道
- Kibana:强大的数据可视化和分析界面
通过合理配置,可以收集各种类型的日志(Apache、Nginx、系统日志、Windows事件日志等),并进行统一管理和分析。