Beats: Filebeat 和 pipeline processors

本文介绍如何使用Elasticsearch的Pipeline功能对Filebeat收集的日志数据进行预处理,包括删除和添加字段,展示了如何定义和应用Pipeline。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

我们知道我们可以使用 Filebeat 很方便地把我们的 log 数据收集进来并直接写入到我们的 Elasticsearch 之中。

 

就像我们上面的这个图显示的一样。这样我们就不需要另外一个 Logstash 的部署了。Logstash 可以很方便地帮我们对数据进行处理,比如对数据进行转换, 丰富数据等等。有一种情况,我们不想部署自己的 Logstash,但是我们还是像对我们的数据进行一些处理,那么我们该怎么办?我们其实可以利用 ingest node 所提供的 Pipeline 帮我们对数据进行处理。

Ingest node

如果大家还不知道如何配置我们的 node 为一个 ingest node 的话,可以参阅我之前的文章 “Elasticsarch中的一些重要概念:cluster, node, index, document, shards及replica”。我们需要在 Elasticsearch 中的配置文件 elasticsearch.yml 文件中配置:

node.ingest: true

ingest node 提供了在对文档建立索引之前对其进行预处理的功能:

  • 接收节点拦截索引或批量 API 请求
  • 运用转换(transformation)
  • 将文档传递回索引或批量 API

 

什么是pipeline呢?

如果大家想对pipleline有更多的了解,请参阅我的文章“如何在Elasticsearch中使用pipeline API来对事件进行处理”。

一个pipleline就是一套处理器:

  • 一个processor就像是Logstash里的一个filter
  • 拥有对通过管道(pipeline)的文档的读写权限

 

那么 Elastic 到底提供了哪些 processor 呢?我们可以参阅 Elastic 的官方文档,我们可以看到许多的 pocessors 可以被利用。

 

 

大家如果有兴趣的话,请查阅我们的官方文档做更一步的阅读。

定义一个 Pipleline

定义一个 Pipeline 也是非常直接的,你可以使用 PUT 命令配合 Ingest API 来操作。它是存在于 cluster state 里的。

PUT _ingest/pipeline/my-pipeline-id
{
  "description": "DESCRIPTION",
  "processors": [
    {
      ...
    }
  ],
  "on_failure": [
    {
      ...
    }
  ]
}

这里my-pipleline-id是我们自己命令的在该cluster唯一标识是的pipleline ID。在里面,我们可以定义我们喜欢的processors数组。在处理失败后,我们也可以定义相应的processors来完成。

 

例子:

在今天的这个例子里,我们来使用 Filebeat 来读取一个 log 文件,并使用 processors 对这个 log 的数据进行处理。

准备数据

我们在网址 https://logz.io/sample-data 下载一个叫做 apache-daily-access.lo g的 log 文件。我们用 Atom 编辑器打开这个文件,显示有 17279 条数据:

每一条的数据是这样的格式:

20.168.183.41 - - [11/Sep/2019:00:00:05 +0000] "GET /category/health HTTP/1.1" 200 132 "/item/software/623" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"

 

配置 Filebeat

如果大家还不知如何安装自己的 Filebeat 的话,请参阅我之前的文章 “Beats:通过 Filebeat 把日志传入到 Elasticsearch”。

 

为了能够 filebeat 把这个 log 数据传输到 Elasticsearch,我们可以使用如下的配置文件。我们创建一个叫做 filebeat_processor.yml 文件:

filebeat.inputs:
- type: log
  enabled: true
  fields:
    apache: true
  paths:
    - /Users/liuxg/data/apache-daily-access.log
 
output.elasticsearch:
    hosts: ["localhost:9200"]
    pipeline: "my_pipeline_id"

在这里请注意,我把上面下载的 apache-daily-access.log 文件置于我自己电脑的 /User/liuxg/data 目录下。在你自己做练习的时候,需要根据自己的文件路径进行调整。在这里,我们使用了一个叫做 my_pipleline_id 的 pipeline。它的定义如下:

PUT _ingest/pipeline/my_pipeline_id
{
  "description": "Drop ECS field and add one new field",
  "processors": [
    {
      "remove": {
        "field": "ecs"
      },
      "set": {
        "field": "added_field",
        "value": 0
      }
    }
  ]
}

在上面,我们定义了两个 processor: remove 及 set。一个是删除一个叫做 ecs 的项,另外一个是添加一个叫做 added_field 的项,并把它的值设置为0。

在正常的情况下,如果在我们的配置文件中没有定义那个 pipleline 的情况下,那么他们的结果是:

      {
        "_index" : "filebeat-7.3.0-2019.09.11-000001",
        "_type" : "_doc",
        "_id" : "637VIG0BJD_DqHjgqvC5",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2019-09-11T14:58:55.902Z",
          "message" : """144.228.123.71 - - [11/Sep/2019:01:52:35 +0000] "GET /category/games HTTP/1.1" 200 117 "/search/?c=Books+Software" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
          "input" : {
            "type" : "log"
          },
          "fields" : {
            "apache" : true
          },
          "ecs" : {
            "version" : "1.0.1"
          },
          "host" : {
            "name" : "localhost"
          },
          "agent" : {
            "hostname" : "localhost",
            "id" : "c88813ba-fdea-4a98-a0be-468fb53566f3",
            "version" : "7.3.0",
            "type" : "filebeat",
            "ephemeral_id" : "ec3328d6-f7f0-4134-a2b6-8ff0c5141cc5"
          },
          "log" : {
            "offset" : 300352,
            "file" : {
              "path" : "/Users/liuxg/data/apache-daily-access.log"
            }
          }
        }
      }

你可以参考我的文章 “Beats:通过Filebeat把日志传入到Elasticsearch” 来查看这个结果。显然这里是有 ecs 这个 field 的。

 

运行 Filebeat

接下来,我们在 Filebeat 的安装目录,运行如下的命令:

./filebeat -c filebeat_processor.yml

我们在 Kibana 中可以通过如下的命令来查看:

GET _cat/indices?v

显然我们已经看到了一个已经生产的以 filebeat 为开头的文件名。我们可以通过如下的命令来查看它的数据:

GET filebeat-7.4.2/_search

那么其中的一个文档的 soure 是这样的:

       "_source" : {
          "agent" : {
            "hostname" : "localhost",
            "id" : "45832d40-b664-466b-a523-3bc58890ea50",
            "type" : "filebeat",
            "ephemeral_id" : "dbbba131-9c33-4e82-a00a-9e8e09d3e799",
            "version" : "7.4.2"
          },
          "log" : {
            "file" : {
              "path" : "/Users/liuxg/data/apache-daily-access.log"
            },
            "offset" : 11497
          },
          "message" : """164.51.31.185 - - [11/Sep/2019:00:04:15 +0000] "GET /item/giftcards/232 HTTP/1.1" 200 130 "/category/electronics" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
          "input" : {
            "type" : "log"
          },
          "@timestamp" : "2019-11-23T13:11:57.478Z",
          "host" : {
            "name" : "localhost"
          },
          "fields" : {
            "apache" : true
          },
          "added_field" : 0
        }

显然 ecs 这个 field 已经不见了,而另外一个叫做 added_field 新的 field 被成功添加进来了。这个说明我们的 pipleline 是起作用的。

评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值