问题1

日志系统链路:filebeat -> logdash -> elasticsearch,然后es两个小时前因磁盘满了,导致无法正常传输。现在将磁盘清理出空间后,日志可以正常导入了,但是是从两个小时前开始的,现希望这两个小时的日志不要了,直接从当前时间导

在你的日志系统中,如果你希望丢弃 Elasticsearch 停止工作的这两个小时的日志,而从当前时间开始继续导入日志,可以按照以下步骤操作:

暂停当前日志处理

在修改配置前,暂停 Logstash 和 Filebeat 服务,防止新的日志继续处理:

1
2
systemctl stop logstash
systemctl stop filebeat

修改 Filebeat 读取状态

Filebeat 会记录日志文件的读取偏移量(通过 .filestate.filebeat.yaml 文件),需要手动更新或删除这些状态以跳过旧日志。

方法 1:删除状态文件(推荐)

找到 Filebeat 的数据目录,通常位于 /var/lib/filebeat,然后删除状态文件:

1
2
3
# 路径需要复查下
# /usr/share/filebeat/data/registry or /var/lib/filebeat/registry
rm -rf /var/lib/filebeat/registry

注意:删除状态文件后,Filebeat 会从当前日志文件的末尾开始读取。

方法 2:手动修改状态文件

如果不想删除状态文件,可以手动修改文件中的偏移量,使 Filebeat 从日志文件末尾开始读取:

1
2
3
# 路径需要复查下
# /usr/share/filebeat/data/registry or /var/lib/filebeat/registry
nano /var/lib/filebeat/registry/filebeat

找到相关日志文件的偏移量,将其更新为文件的最后偏移值。

配置 Logstash 丢弃旧日志(可选)

在 Logstash 的配置中,可以基于 [@timestamp] 丢弃旧日志,确保两个小时前的数据不被处理。

修改 Logstash 配置文件

在 Logstash 的 filter 部分添加以下逻辑:

1
2
3
4
5
6
7
8
9
10
11
filter {
ruby {
code => "
now = Time.now.to_i
event_time = event.get('[@timestamp]').to_i
if event_time < (now - 7200) # 7200秒 = 2小时
event.cancel
end
"
}
}

这样,Logstash 会自动丢弃两个小时以前的日志。

重启服务

完成上述修改后,重启服务:

1
2
systemctl start filebeat
systemctl start logstash

验证效果

  • 查看 Filebeat 读取状态
    确保 Filebeat 从日志文件末尾开始读取:
1
tail -f /var/log/filebeat/filebeat.log

检查日志文件中是否有偏移量信息,确保偏移量为最新值。

  • 检查 Logstash 是否丢弃旧日志
    在 Logstash 的日志中检查是否有丢弃的日志记录:
1
tail -f /var/log/logstash/logstash-plain.log
  • 验证 Elasticsearch 索引
    通过以下命令检查是否有新的日志写入到 Elasticsearch:
1
curl -X GET "http://localhost:9200/_cat/indices?v"

注意事项

  • 日志文件的偏移量 如果日志文件未追加新内容,可能导致 Filebeat 卡住。可以手动追加一条日志到文件,确保 Filebeat 开始工作:
1
2
# /path/to/logfile.log为具体抓取日志路径
echo "Dummy log for testing" >> /path/to/logfile.log
  • 备份数据 修改 Filebeat 或 Logstash 配置前,建议备份相关文件,以便恢复。

  • 清理策略 确保 Elasticsearch 磁盘空间充足,必要时配置索引生命周期管理(ILM)以定期清理旧数据。