Kafka/Redis + ELFK Distributed Log Collection

Filebeat --> Kafka --> Logstash --> ES Cluster --> Kibana/Grafana

Filebeat --> Redis --> Logstash --> ES Cluster --> Kibana/Grafana

Background

Given the limitations of ELFK, as the per-second volume of data collected by Beats keeps growing, Logstash may no longer be able to keep up with processing that many log events. Adding more Logstash nodes can raise processing throughput, but Elasticsearch itself may still be unable to absorb such a heavy write load. In this case, we can introduce a message queue (e.g. Kafka) as a buffer.

Filebeat --> Kafka/Redis

# vim /usr/local/filebeat-7.7.1-linux-x86_64/filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /usr/local/apache-tomcat-9.0.34/logs/tomcat_access_log.*.log
  #fields:
  #  log_source: messages
  #fields_under_root: true
  
output.kafka:
  hosts: ["192.168.0.108:9092"]
  topic: tomcat
  partition.round_robin:
    reachable_only: false
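
Before wiring up the rest of the pipeline, you can sanity-check this configuration with Filebeat's built-in test subcommands and then run it in the foreground. A minimal sketch, assuming Filebeat was unpacked to the path used above:

cd /usr/local/filebeat-7.7.1-linux-x86_64
./filebeat test config -c filebeat.yml   # validate the YAML syntax
./filebeat test output -c filebeat.yml   # check connectivity to the Kafka broker
./filebeat -e -c filebeat.yml            # run in the foreground, logging to stderr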

Output to Redis:

# the filebeat.inputs section is the same as above...
# simple output example
output.redis:
  hosts: ["localhost"]   # for a Redis cluster, list multiple hosts here
  # port: 6379
  password: "my_password"
  key: "filebeat"
  db: 0
  timeout: 5
  
# output.redis:
#   hosts: ["localhost"]
#   key: "default_list"
#   keys:
#     - key: "info_list"   # send to info_list if `message` field contains INFO
#       when.contains:
#         message: "INFO"
#     - key: "debug_list"  # send to debug_list if `message` field contains DEBUG
#       when.contains:
#         message: "DEBUG"
#     - key: "%{[fields.list]}"
#       mappings:
#         http: "frontend_list"
#         nginx: "frontend_list"
#         mysql: "backend_list"
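
The %{[fields.list]} key in the commented example expands a custom field attached on the input side. A minimal sketch of a matching input (the log path and the list value here are hypothetical):

filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log   # hypothetical path
  fields:
    list: nginx                   # expands %{[fields.list]}; mapped to "frontend_list" above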

Start Kafka/Redis

For convenience, I use a docker-compose script here for quick deployment. For the script itself, see [使用 docker-compose 部署 Kafka](../Kafka/使用 docker-compose 部署 Kafka.md).

If you already have kafka-manager installed, you can see in its panel that the topic has been created, and that the partition offsets advance once data is written (the original screenshot is omitted here).

Check whether Filebeat is delivering logs to Kafka:

# Kafka versions before 2.0:
bin/kafka-console-consumer.sh --zookeeper 192.168.0.108:2181 --topic tomcat --from-beginning
# Kafka 2.0+ removed the --zookeeper option; use --bootstrap-server instead:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.108:9092 --topic tomcat --from-beginning
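
If the consumer prints nothing, generate some access-log entries first. The sample entries shown later in this section were produced by requests like these (assuming Tomcat runs on the Filebeat host and listens on the default port 8080):

curl http://192.168.0.108:8080/
curl http://192.168.0.108:8080/test/index.html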

Start Redis:

systemctl start redis
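
A quick connectivity check, using the password set in the Filebeat output config above:

redis-cli -a my_password ping   # should print PONG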

Open the Redis interactive client to inspect the data (the entries are long, so they are shown abbreviated here):

127.0.0.1:6379> keys *
1) "filebeat"
127.0.0.1:6379> LPOP filebeat
"{\"@timestamp\":\"2020-07-15T14:52:07.315Z\",\"@metadata\":
...
\\\"GET / HTTP/1.1\\\",\\\"status\\\":\\\"200\\\",\\\"SendBytes\\\":\\\"11216\\\",\\\"Query?string\\\":\\\"\\\",\\\"partner\\\":\\\"-\\\",\\\"AgentVersion\\\":\\\"curl/7.29.0\\\"}\",\"input\":{\"type\":\"log\"},\"host\":
...
127.0.0.1:6379> LPOP filebeat
"{\"@timestamp\":\"2020-07-15T14:52:08.339Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.7.1\"},\"agent\":
...
\\\"GET /test/index.html HTTP/1.1\\\",\\\"status\\\":\\\"200\\\",\\\"SendBytes\\\":\\\"19\\\",\\\"Query?string\\\":\\\"\\\",\\\"partner\\\":\\\"-\\\",\\\"AgentVersion\\\":\\\"curl/7.29.0\\\"}\",\"log\":{\"offset\":234,\"file\":{\"path\":\"/usr/local/apache-tomcat-9.0.34/logs/tomcat_access_log.2020-07-15.log\"
...

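
Note that LPOP removes the entry it returns, so each call above consumed one event from the queue. To peek at the head of the list without consuming it, use LRANGE instead:

127.0.0.1:6379> LRANGE filebeat 0 0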

Kafka/Redis --> Logstash --> Elasticsearch

Kafka input:

# vim /usr/local/logstash-7.7.1/config/kafka2es.conf
input {
    kafka {
        codec => "json"
        topics => ["tomcat"]
        bootstrap_servers => "192.168.0.108:9092"
        auto_offset_reset => "latest"
        group_id => "logstash-g1"
    }
}
output {
    elasticsearch {
        hosts => "http://192.168.0.108:9200"
        index => "tomcat-%{+YYYY.MM.dd}"
    }
}
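
Start Logstash with this pipeline. A sketch, assuming the install path used above (the first command only validates the config and exits):

/usr/local/logstash-7.7.1/bin/logstash -f /usr/local/logstash-7.7.1/config/kafka2es.conf --config.test_and_exit
/usr/local/logstash-7.7.1/bin/logstash -f /usr/local/logstash-7.7.1/config/kafka2es.conf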

Redis input:

input {
    redis {
        host => "localhost"
        port => "6379"
        db => "0"
        key => "filebeat"
        data_type => "list"
        password => ""   # set to match the password used in the Filebeat output
        codec => "json"
    }
}

# the output section is the same as in the Kafka example above...
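With data_type => "list", Logstash pops events off the Redis list as it consumes them, so the filebeat key should drain (and eventually disappear) once the pipeline is running. You can watch the backlog, using the password from the Filebeat output config:

redis-cli -a my_password LLEN filebeat   # shrinks toward 0 as Logstash catches up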

After starting Elasticsearch and Logstash, open a browser and use a plugin (such as elasticsearch-head) to access Elasticsearch's web management page and check whether the Tomcat log entries have been collected.
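
You can also confirm from the command line that the daily index has been created (the index name follows the tomcat-%{+YYYY.MM.dd} pattern configured above):

curl "http://192.168.0.108:9200/_cat/indices?v" | grep tomcat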

Open the Kibana management UI, add an index pattern for the tomcat-* indices, and view the logs from Logstash in a Dashboard.
