引言

在维护ELK (Elasticsearch, Logstash, Kibana) 日志系统时,一个常见的线上事故是下游的Elasticsearch因磁盘空间耗尽而无法接收新的日志。这会导致上游的Filebeat和Logstash出现数据积压。当磁盘问题解决后,我们通常不希望重新处理这些积压的、可能已经失去时效性的日志,而是希望从当前时间点开始恢复日志采集。本文将详细介绍如何处理此类事故,核心在于让Filebeat跳过积压的日志。

事故场景:Elasticsearch磁盘写满导致日志积压

问题描述: 日志系统链路为 Filebeat -> Logstash -> Elasticsearch。Elasticsearch因磁盘满了,导致服务中断两小时。现在磁盘空间已清理,服务恢复,但Filebeat开始从两小时前中断的位置重新发送日志。我们希望丢弃这两个小时的积压日志,直接从当前时间开始采集。

核心解决方案:重置Filebeat的读取位置

问题的关键在于Filebeat会持久化记录每个日志文件的读取进度(offset)。即使下游服务中断,Filebeat也会在原地等待,一旦连接恢复,它会从上次记录的位置继续发送。要跳过积压的日志,我们必须重置这个进度记录。

第一步:暂停日志采集服务

在进行任何修改之前,务必先停止Filebeat和Logstash,防止在操作过程中产生新的状态文件或数据冲突。

1
2
3
4
5
# 停止Logstash
systemctl stop logstash

# 停止Filebeat
systemctl stop filebeat

第二步:清理Filebeat状态文件

Filebeat将其状态(包括每个文件的读取偏移量)记录在一个名为 registry 的目录中。删除这个目录是让Filebeat“忘记”旧进度的最直接方法。

  1. 定位 registry 目录
    该目录通常位于Filebeat的数据路径下,例如 /var/lib/filebeat/registry/usr/share/filebeat/data/registry。请根据你的实际安装情况确认路径。

  2. 删除状态文件

    1
    2
    3
    # 警告:此操作会使Filebeat丢失所有文件的读取记录
    # 请确认路径正确后再执行
    rm -rf /var/lib/filebeat/registry

效果:删除 registry 目录后,下次启动Filebeat时,它会认为所有日志文件都是第一次处理,并默认从文件的末尾开始读取新内容。这恰好满足了我们“跳过旧日志”的需求。

第三步:重启服务并验证

完成状态清理后,按顺序重启服务。

1
2
3
4
5
# 先启动Filebeat
systemctl start filebeat

# 再启动Logstash
systemctl start logstash

验证效果

  • 检查Filebeat日志:观察Filebeat的日志,确认它是否已经开始监控文件的新增内容,而不是从头读取。
    1
    tail -f /var/log/filebeat/filebeat.log
  • 检查Elasticsearch索引:在Kibana或通过API检查Elasticsearch,确认只有最新的日志被写入,没有旧的积压日志。
    1
    curl -X GET "http://localhost:9200/_cat/indices?v"

其他可选方案与注意事项

方案二:在Logstash中过滤旧日志

如果不想或不能直接删除Filebeat的 registry 文件,也可以在Logstash层面添加一个过滤器,丢弃时间戳早于某个时间点的日志。

在Logstash的 filter 配置中添加以下ruby过滤器:

1
2
3
4
5
6
7
8
9
10
11
12
filter {
ruby {
# 丢弃2小时(7200秒)前的日志
code => "
now = Time.now.to_i
event_time = event.get('[@timestamp]').to_i
if event_time < (now - 7200)
event.cancel
end
"
}
}

缺点:这种方法会增加Logstash的处理负担,因为它需要接收并判断每一条积压的日志,然后才能丢弃。相比之下,直接在Filebeat层面解决效率更高。

注意事项

  • 备份:在执行 rm 等破坏性操作前,最好先备份 registry 目录,以防操作失误需要回滚。
  • 测试日志:如果Filebeat重启后没有立即发送新日志,可能是因为被监控的日志文件没有新内容写入。你可以手动追加一行测试日志来触发采集。
    1
    echo "test log entry" >> /path/to/your/app.log
  • 磁盘监控与ILM:为避免类似事故再次发生,务必为Elasticsearch配置磁盘使用率告警,并设置索引生命周期管理(ILM)策略来自动管理旧索引,防止磁盘被写满。

总结

处理因下游服务中断导致的Filebeat日志积压问题,最有效的方法是停止Filebeat,删除其registry状态目录,然后重启服务。这能让Filebeat直接从日志文件的末尾开始采集,从而跳过所有积压的日志。虽然也可以在Logstash层面过滤,但从源头(Filebeat)解决是更推荐的最佳实践。

1
2
systemctl stop logstash
systemctl stop filebeat

修改 Filebeat 读取状态

Filebeat 会记录日志文件的读取偏移量(通过 .filestate.filebeat.yaml 文件),需要手动更新或删除这些状态以跳过旧日志。

方法 1:删除状态文件(推荐)

找到 Filebeat 的数据目录,通常位于 /var/lib/filebeat,然后删除状态文件:

1
2
3
# 路径需要复查下
# /usr/share/filebeat/data/registry or /var/lib/filebeat/registry
rm -rf /var/lib/filebeat/registry

注意:删除状态文件后,Filebeat 会从当前日志文件的末尾开始读取。

方法 2:手动修改状态文件

如果不想删除状态文件,可以手动修改文件中的偏移量,使 Filebeat 从日志文件末尾开始读取:

1
2
3
# 路径需要复查下
# /usr/share/filebeat/data/registry or /var/lib/filebeat/registry
nano /var/lib/filebeat/registry/filebeat

找到相关日志文件的偏移量,将其更新为文件的最后偏移值。

配置 Logstash 丢弃旧日志(可选)

在 Logstash 的配置中,可以基于 [@timestamp] 丢弃旧日志,确保两个小时前的数据不被处理。

修改 Logstash 配置文件

在 Logstash 的 filter 部分添加以下逻辑:

1
2
3
4
5
6
7
8
9
10
11
filter {
ruby {
code => "
now = Time.now.to_i
event_time = event.get('[@timestamp]').to_i
if event_time < (now - 7200) # 7200秒 = 2小时
event.cancel
end
"
}
}

这样,Logstash 会自动丢弃两个小时以前的日志。

重启服务

完成上述修改后,重启服务:

1
2
systemctl start filebeat
systemctl start logstash

验证效果

  • 查看 Filebeat 读取状态
    确保 Filebeat 从日志文件末尾开始读取:
1
tail -f /var/log/filebeat/filebeat.log

检查日志文件中是否有偏移量信息,确保偏移量为最新值。

  • 检查 Logstash 是否丢弃旧日志
    在 Logstash 的日志中检查是否有丢弃的日志记录:
1
tail -f /var/log/logstash/logstash-plain.log
  • 验证 Elasticsearch 索引
    通过以下命令检查是否有新的日志写入到 Elasticsearch:
1
curl -X GET "http://localhost:9200/_cat/indices?v"

注意事项

  • 日志文件的偏移量 如果日志文件未追加新内容,可能导致 Filebeat 卡住。可以手动追加一条日志到文件,确保 Filebeat 开始工作:
1
2
# /path/to/logfile.log为具体抓取日志路径
echo "Dummy log for testing" >> /path/to/logfile.log
  • 备份数据 修改 Filebeat 或 Logstash 配置前,建议备份相关文件,以便恢复。

  • 清理策略 确保 Elasticsearch 磁盘空间充足,必要时配置索引生命周期管理(ILM)以定期清理旧数据。