初探ELK和集中式日志管理系统
字数 1914 2025-08-22 18:37:21

ELK和集中式日志管理系统详解

前言

在规模较大的IT环境中,传统的日志分析方法(如grep、awk)效率低下,面临以下问题:

  • 日志量太大难以归档
  • 文本搜索速度慢
  • 缺乏多维度查询能力

集中式日志管理系统通过将所有节点上的日志统一收集、管理和访问来解决这些问题。ELK提供了一整套开源解决方案,各组件完美衔接。

ELK架构概述

ELK是三个开源项目的首字母缩写:

  • Elasticsearch:搜索和分析引擎
  • Logstash:服务器端数据处理管道,负责采集、转换和发送数据
  • Kibana:数据可视化平台

Elasticsearch详解

核心特性

  • 实时分布式搜索分析引擎
  • 高速、大规模数据检索能力

关键配置项

配置项 描述 默认值
cluster.name 集群名称 elasticsearch
node.master 是否可作为主节点 true
node.name 节点名称 自动获取
path.data 索引数据存储路径 es根目录下的data
http.port HTTP端口 9200
transport.tcp.port 节点交互端口 9300
network.host 绑定地址 0.0.0.0
index.number_of_replicas 索引副本数 1
http.cors.enabled 是否支持跨域 false

安装与测试

  1. 示例配置:
cluster.name: master
node.name: elk-1
path.data: /data/es
path.logs: /var/log/es
bootstrap.memory_lock: true
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
  1. 启动服务:
bin/elasticsearch
  1. 验证服务:
curl -i -XGET 'localhost:9200/_count?pretty'

可视化工具

安装elasticsearch-head插件:

git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start

访问地址:http://localhost:9100/

Logstash详解

核心概念

  • 管道结构:必须包含input和output,可选filter
  • 数据处理流程:输入→过滤→输出

目录结构

目录 描述 默认路径
bin 二进制脚本和插件 {extract.path}/bin
config 配置文件 {extract.path}/config
logs 日志文件 {extract.path}/logs
plugins 插件目录 {extract.path}/plugins
data 持久化数据 {extract.path}/data

输入插件(Input)

  1. 标准输入
input {
  stdin { }
}
  1. 文件输入
input {
  file {
    path => "/var/log/secure"
    type => "logstash_log"
    start_position => "beginning"
  }
}
  1. Syslog输入
input {
  syslog {
    port => "514"
    type => "syslog"
  }
}
  1. Beats输入
input {
  beats {
    port => 5044
  }
}
  1. Redis输入
input {
  redis {
    host => "127.0.0.1"
    port => "6379"
    password => "passwd"
    db => "1"
    data_type => "list"
    key => "redis_key"
  }
}

输出插件(Output)

  1. 标准输出
output {
  stdout {
    codec => "rubydebug"
  }
}
  1. 文件输出
output {
  file {
    path => "/var/log/logstash/%{host}/{application}"
    codec => line { format => "%{message}" }
  }
}
  1. Elasticsearch输出
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
  1. Redis输出
output {
  redis {
    data_type => "list"
    host => "127.0.0.1"
    port => "6379"
    db => "1"
    password => "passwd"
    key => "redis_key"
  }
}

过滤器插件(Filter)

Grok过滤器

  • 语法:%{SYNTAX:SEMANTIC}
  • 示例:
filter {
  grok {
    match => {
      "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:bytes} %{NUMBER:duration}"
    }
  }
}

测试方法

  1. 命令行快速测试:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
  1. 文件输入测试:
bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["127.0.0.1:9200"] } stdout { codec => rubydebug }}'

Kibana详解

关键配置项

配置项 描述 默认值
server.port Kibana端口 5601
server.host 后端服务器地址 0.0.0.0
elasticsearch.url Elasticsearch地址 http://localhost:9200
kibana.index Kibana索引名称 .kibana
logging.dest 日志输出目标 stdout

安装与配置

  1. 示例配置:
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://127.0.0.1:9200"]
kibana.index: ".kibana"
  1. 启动服务:
bin/kibana
  1. 访问地址:http://IP:5601

实战案例

1. 收集Apache日志

修改Apache配置:

LogFormat "{ \"@timestamp\": \"%{%Y-%m-%dT%H:%M:%S%z}t\", \"@version\": \"1\", \"tags\":[\"apache\"], \"message\": \"%h %l %u %t \\\"%r\\\" %>s %b\", \"clientip\": \"%a\", \"duration\": %D, \"status\": %>s, \"request\": \"%U%q\", \"urlpath\": \"%U\", \"urlquery\": \"%q\", \"bytes\": %B, \"method\": \"%m\", \"site\": \"%{Host}i\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-agent}i\" }" json
CustomLog "logs/access_log" json

2. 收集Nginx日志

修改Nginx配置:

http {
    log_format json '{"@timestamp":"$time_iso8601",'
    '"@version":"1",'
    '"client":"$remote_addr",'
    '"url":"$uri",'
    '"status":"$status",'
    '"domain":"$host",'
    '"host":"$server_addr",'
    '"size":$body_bytes_sent,'
    '"responsetime":$request_time,'
    '"referer": "$http_referer",'
    '"ua": "$http_user_agent"'
    '}';
    
    server {
        access_log /var/log/nginx/access_json.log json;
    }
}

3. 收集用户历史命令

修改/etc/bashrc:

HISTDIR='/var/log/command.log'
if [ ! -f $HISTDIR ];then
    touch $HISTDIR
    chmod 666 $HISTDIR
fi
export HISTTIMEFORMAT="{\"TIME\":\"%F %T\",\"HOSTNAME\":\"$HOSTNAME\",\"LOGIN_IP\":\"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')\",\"LOGIN_USER\":\"$(who am i|awk '{print $1}')\",\"CURRENT_USER\":\"${USER}\",\"CMD\":\""
export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]\+[0-9]\+ //"|sed "s/^[0-9]\+ //" | sed "s/var/log/command.log'

4. 收集SSH日志

修改SSH配置:

SyslogFacility local6

修改rsyslog配置:

local6.* /var/log/sshd.log

综合配置示例

input {
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx"
    codec => "json"
  }
  file {
    path => "/var/log/httpd/access_log"
    type => "apache"
    codec => "json"
  }
  file {
    path => "/var/log/command.log"
    type => "history"
    codec => "json"
  }
  file {
    path => "/var/log/sshd.log"
    type => "ssh"
  }
}

filter {
  grok {
    match => [
      "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)? %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
      "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)?: message repeated 2 times: 
$$
 %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
      "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:\[%{POSINT:pid}
$$
)? %{WORD:login} password for invalid user %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}",
      "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}(?:
$$
%{POSINT:pid}
$$
)? %{WORD:login} %{WORD:auth_method} for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA:message}"
    ]
  }
}

output {
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "nginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "apache" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "apache-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "history" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "history-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "ssh" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "ssh-%{+YYYY.MM.dd}"
    }
  }
}

Windows日志收集

使用Winlogbeat收集Windows事件日志:

关键配置

winlogbeat.event_logs:
  - name: Application
    provider: 
      - Application Error
      - Application Hang
      - Windows Error Reporting
      - EMET
  - name: Security
    level: critical, error, warning
    event_id: 4624, 4625, 4634, 4672, 4720
  - name: System
    ignore_older: 168h

setup.kibana:
  host: "http://100.2.170.124:5601"

output.elasticsearch:
  hosts: ["100.2.170.124:9200"]

安装与启动

  1. 安装服务:
.\install-service-winlogbeat.ps1
.\winlogbeat.exe setup
  1. 启动服务:
net start winlogbeat

总结

ELK栈提供了完整的日志管理解决方案:

  1. Elasticsearch:高效存储和检索日志数据
  2. Logstash:灵活的数据收集和处理管道
  3. Kibana:强大的数据可视化和分析界面

通过合理配置,可以收集各种类型的日志(Apache、Nginx、系统日志、Windows事件日志等),并进行统一管理和分析。

ELK和集中式日志管理系统详解 前言 在规模较大的IT环境中,传统的日志分析方法(如grep、awk)效率低下,面临以下问题: 日志量太大难以归档 文本搜索速度慢 缺乏多维度查询能力 集中式日志管理系统通过将所有节点上的日志统一收集、管理和访问来解决这些问题。ELK提供了一整套开源解决方案,各组件完美衔接。 ELK架构概述 ELK是三个开源项目的首字母缩写: Elasticsearch :搜索和分析引擎 Logstash :服务器端数据处理管道,负责采集、转换和发送数据 Kibana :数据可视化平台 Elasticsearch详解 核心特性 实时分布式搜索分析引擎 高速、大规模数据检索能力 关键配置项 | 配置项 | 描述 | 默认值 | |--------|------|--------| | cluster.name | 集群名称 | elasticsearch | | node.master | 是否可作为主节点 | true | | node.name | 节点名称 | 自动获取 | | path.data | 索引数据存储路径 | es根目录下的data | | http.port | HTTP端口 | 9200 | | transport.tcp.port | 节点交互端口 | 9300 | | network.host | 绑定地址 | 0.0.0.0 | | index.number_ of_ replicas | 索引副本数 | 1 | | http.cors.enabled | 是否支持跨域 | false | 安装与测试 示例配置: 启动服务: 验证服务: 可视化工具 安装elasticsearch-head插件: 访问地址:http://localhost:9100/ Logstash详解 核心概念 管道结构 :必须包含input和output,可选filter 数据处理流程 :输入→过滤→输出 目录结构 | 目录 | 描述 | 默认路径 | |------|------|----------| | bin | 二进制脚本和插件 | {extract.path}/bin | | config | 配置文件 | {extract.path}/config | | logs | 日志文件 | {extract.path}/logs | | plugins | 插件目录 | {extract.path}/plugins | | data | 持久化数据 | {extract.path}/data | 输入插件(Input) 标准输入 : 文件输入 : Syslog输入 : Beats输入 : Redis输入 : 输出插件(Output) 标准输出 : 文件输出 : Elasticsearch输出 : Redis输出 : 过滤器插件(Filter) Grok过滤器 : 语法: %{SYNTAX:SEMANTIC} 示例: 测试方法 命令行快速测试: 文件输入测试: Kibana详解 关键配置项 | 配置项 | 描述 | 默认值 | |--------|------|--------| | server.port | Kibana端口 | 5601 | | server.host | 后端服务器地址 | 0.0.0.0 | | elasticsearch.url | Elasticsearch地址 | http://localhost:9200 | | kibana.index | Kibana索引名称 | .kibana | | logging.dest | 日志输出目标 | stdout | 安装与配置 示例配置: 启动服务: 访问地址:http://IP:5601 实战案例 1. 收集Apache日志 修改Apache配置: 2. 收集Nginx日志 修改Nginx配置: 3. 收集用户历史命令 修改/etc/bashrc: 4. 收集SSH日志 修改SSH配置: 修改rsyslog配置: 综合配置示例 Windows日志收集 使用Winlogbeat收集Windows事件日志: 关键配置 安装与启动 安装服务: 启动服务: 总结 ELK栈提供了完整的日志管理解决方案: Elasticsearch :高效存储和检索日志数据 Logstash :灵活的数据收集和处理管道 Kibana :强大的数据可视化和分析界面 通过合理配置,可以收集各种类型的日志(Apache、Nginx、系统日志、Windows事件日志等),并进行统一管理和分析。