Elasticsearch:ES|QL 入门 - Python notebook

截止今天,ES|QL 已经在 8.14 中正式发布。请详细阅读文章 “Elasticsearch 管道查询语言 ES|QL 现已正式发布”。在今天的文章中,我们将使用 Jupyter notebook 来展示使用 Elastic 发布的 Elasticsearch Python 客户端来访问 Elasticsearch 并针对 ES|QL 来进行查询。你将学习如何:

  • 运行 ES|QL 查询
  • 使用处理命令
  • 对表格进行排序
  • 查询数据
  • 链式处理命令
  • 计算值
  • 计算统计数据
  • 访问列
  • 创建直方图
  • 丰富数据

安装

Elasticsearch 及 Kibana

 如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:

在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。特别值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下载 Elastic Stack 8.11 及以后得版本来进行安装。

在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:

我们需要记下 Elasticsearch 超级用户 elastic 的密码。我们可以在它的安装目录中找到相应的 Elasticsearch 证书:

$ pwd
/Users/liuxg/elastic/elasticsearch-8.14.0/config/certs
$ ls
http.p12      http_ca.crt   transport.p12

我们还需要安装 Elasticsearch 的 python 依赖包:

pip3 install elasticsearch==8.14.0 python-dotenv

我们可以通过如下的命令来查看它的版本:

$ pip3 list | grep elasticsearch
elasticsearch               8.14.0

下载代码

我们按照如下的方式来下载代码:

git clone https://github.com/liu-xiao-guo/elasticsearch-labs

有关 esql 的代码在 notebooks 下的 esql 目录下:

$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/esql
$ ls
esql-getting-started.ipynb

为了能够使得我们连接到自己部署的 Elasticsearch 集群中,我们可以拷贝 Elasticsearch 的证书:

$ pwd
/Users/liuxg/python/elasticsearch-labs/notebooks/esql
$ ls
esql-getting-started.ipynb
$ cp ~/elastic/elasticsearch-8.14.0/config/certs/http_ca.crt .
$ ls
esql-getting-started.ipynb http_ca.crt

创建环境变量

我们打开一个 terminal,并打入如下的命令:

export ES_ENDPOINT="localhost"
export ES_USER="elastic"
export ES_PASSWORD="Xw4_Nohry-LgaOum6oh-"

我们需要根据自己的 Elasticsearch 配置来对上面的配置进行修改。

代码演示

在我们进行下面的 notebook 展示之前,我们在当前的 terminal 中打入如下的命令:

jupyter notebook esql-getting-started.ipynb 

导入模块

# Import packages
from dotenv import load_dotenv
import os
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

初始化 Elasticsearch 客户端

load_dotenv()
 
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")
 
url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)
 
client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(client.info())

上面显示,我们的客户端连接到 Elasticsearch 是成功的。更多关于如何连接到 Elasticsearch 的知识,请详细参阅文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”。

添加示例数据到 Elasticsearch

在为示例数据集建立索引之前,让我们使用正确的映射创建一个名为 sample_data 的索引。

index_name = "sample_data"

mappings = {
    "mappings": {
        "properties": {"client_ip": {"type": "ip"}, "message": {"type": "keyword"}}
    }
}

# Create the index
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body=mappings)

在运行完上面的代码后,我们可以在 Kibana 中找到已经被创建的索引 sample_data:

现在我们可以使用 Elasticsearch Python 客户端的 bulk helpers 对数据进行索引。

# Documents to be indexed
documents = [
    {
        "@timestamp": "2023-10-23T12:15:03.360Z",
        "client_ip": "172.21.2.162",
        "message": "Connected to 10.1.0.3",
        "event_duration": 3450233,
    },
    {
        "@timestamp": "2023-10-23T12:27:28.948Z",
        "client_ip": "172.21.2.113",
        "message": "Connected to 10.1.0.2",
        "event_duration": 2764889,
    },
    {
        "@timestamp": "2023-10-23T13:33:34.937Z",
        "client_ip": "172.21.0.5",
        "message": "Disconnected",
        "event_duration": 1232382,
    },
    {
        "@timestamp": "2023-10-23T13:51:54.732Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 725448,
    },
    {
        "@timestamp": "2023-10-23T13:52:55.015Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 8268153,
    },
    {
        "@timestamp": "2023-10-23T13:53:55.832Z",
        "client_ip": "172.21.3.15",
        "message": "Connection error",
        "event_duration": 5033755,
    },
    {
        "@timestamp": "2023-10-23T13:55:01.543Z",
        "client_ip": "172.21.3.15",
        "message": "Connected to 10.1.0.1",
        "event_duration": 1756467,
    },
]

# Prepare the actions for the bulk API using list comprehension
actions = [{"_index": index_name, "_source": doc} for doc in documents]

# Perform the bulk index operation and capture the response
success, failed = bulk(client, actions)

if failed:
    print(f"Some documents failed to index: {failed}")
else:
    print(f"Successfully indexed {success} documents.")

上面的响应为:

Successfully indexed 7 documents.

我们可以在 Kibana 中进行查看:

上面显示,有 7 个文档已经被写入。

我们使用如下的代码来抑制 “default limit of [500] that pollute responses” 的警告:

# Suppress specific Elasticsearch warnings about default limit of [500] that pollute responses

import warnings
from elasticsearch import ElasticsearchWarning

warnings.filterwarnings("ignore", category=ElasticsearchWarning)

接下来,我们定义一个函数来格式化响应以便能更好的地阅读响应:

# Format response to return human-readable tables


def format_response(response_data):
    column_names = [col["name"] for col in response_data["columns"]]
    column_widths = [
        max(
            len(name),
            max(
                (
                    len(str(row[i]) if row[i] is not None else "None")
                    for row in response_data["values"]
                ),
                default=0,
            ),
        )
        for i, name in enumerate(column_names)
    ]
    row_format = " | ".join(["{:<" + str(width) + "}" for width in column_widths])
    print(row_format.format(*column_names))
    print("-" * sum(column_widths) + "-" * (len(column_widths) - 1) * 3)
    for row in response_data["values"]:
        # Convert None values in the row to "None" before formatting
        formatted_row = [(str(cell) if cell is not None else "None") for cell in row]
        print(row_format.format(*formatted_row))

第一个 ES|QL 查询

是时候我们操作第一个 ES|QL 的查询了。

每个 ES|QL 查询都以 source command (源命令)开头。源命令会生成一个表,通常包含来自 Elasticsearch 的数据。

FROM source 命令返回一个表,其中包含来自数据流、索引或别名的文档。结果表中的每一行代表一个文档。此查询从 sample_data 索引中返回最多 500 个文档:

esql_query = "FROM sample_data"

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message              
--------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected         
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error     
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error     
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error     
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1

每列对应一个字段,并且可以通过该字段的名称进行访问。

ℹ️ ES|QL 关键字不区分大小写。 FROM sample_data 与 froms ample_data 相同。

处理命令

源命令后面可以跟一个或多个处理命令,用竖线字符分隔:|。处理命令通过添加、删除或更改行和列来更改输入表。处理命令可以执行过滤、投影、聚合等。

例如,你可以使用 LIMIT 命令来限制返回的行数,最多为 10,000 行:

esql_query = """
FROM sample_data
| LIMIT 3
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message              
--------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected      

对表格进行排序

另一个处理命令是 SORT 命令。默认情况下,FROM 返回的行没有定义的排序顺序。使用 SORT 命令对一列或多列上的行进行排序:

esql_query = """
FROM sample_data
| SORT @timestamp DESC
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message              
--------------------------------------------------------------------------------
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error     
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error     
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error     
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected         
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3

查询数据

使用 WHERE 命令来查询数据。例如,要查找持续时间超过 5 毫秒的所有事件:

esql_query = """
FROM sample_data
| WHERE event_duration > 5000000
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip   | event_duration | message         
--------------------------------------------------------------------------
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153        | Connection error
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755        | Connection error

WHERE 支持多个运算符

例如,你可以使用 LIKE 对消息列运行通配符查询:

esql_query = """
FROM sample_data
| WHERE message LIKE "Connected*"
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message              
--------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1

更多处理命令

还有许多其他处理命令,例如用于保留或删除列的 KEEP 和 DROP、用于使用 Elasticsearch 中索引的数据丰富表的 ENRICH 以及用于处理数据的 DISSECT 和 GROK。有关概述,请参阅处理命令

链式处理命令

你可以链接处理命令,并用竖线字符分隔:|。每个处理命令都作用于前一个命令的输出表。查询的结果是最终处理命令生成的表。

以下示例首先根据 @timestamp 对表进行排序,然后将结果集限制为 3 行:

esql_query = """
FROM sample_data
| SORT @timestamp DESC
| LIMIT 3
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip   | event_duration | message              
-------------------------------------------------------------------------------
2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467        | Connected to 10.1.0.1
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755        | Connection error     
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153        | Connection error     

ℹ️ 处理命令的顺序很重要。首先将结果集限制为 3 行,然后再对这 3 行进行排序,很可能会返回与此示例不同的结果,其中排序在限制之前。

计算值

使用 EVAL 命令将包含计算值的列追加到表中。例如,以下查询附加到 duration_ms 列。该列中的值是通过将 event_duration 除以 1,000,000 计算得出的。换句话说: event_duration 从纳秒转换为毫秒。

esql_query = """
FROM sample_data
| EVAL duration_ms = event_duration/1000000.0
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message               | duration_ms
----------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3 | 3.450233   
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2 | 2.764889   
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected          | 1.232382   
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error      | 0.725448   
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error      | 8.268153   
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error      | 5.033755   
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1 | 1.756467

EVAL 支持多种函数。例如,要将数字四舍五入为最接近指定位数的数字,请使用 ROUND 函数:

esql_query = """
FROM sample_data
| EVAL duration_ms = ROUND(event_duration/1000000.0, 1)
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message               | duration_ms
----------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3 | 3.5        
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2 | 2.8        
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected          | 1.2        
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error      | 0.7        
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error      | 8.3        
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error      | 5.0        
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1 | 1.8        

计算统计数据

你还可以使用 ES|QL 来聚合数据。使用 STATS ... BY 命令计算统计数据。

例如,计算中位持续时间:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration)
"""

response = client.esql.query(query=esql_query)
format_response(response)
median_duration
---------------
2764889.0 

你可以使用一个命令计算多个统计数据:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration), max_duration = MAX(event_duration)
"""

response = client.esql.query(query=esql_query)
format_response(response)
median_duration | max_duration
------------------------------
2764889.0       | 8268153     

使用 BY 按一列或多列对计算的统计数据进行分组。例如,要计算每个客户端 IP 的中位持续时间:

esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration) BY client_ip
"""

response = client.esql.query(query=esql_query)
format_response(response)
median_duration | client_ip   
------------------------------
1232382.0       | 172.21.0.5  
2764889.0       | 172.21.2.113
3450233.0       | 172.21.2.162
3395111.0       | 172.21.3.15 

访问列

你可以通过名称访问列。如果名称包含特殊字符,则需要用反引号(`)引起来。

为 EVAL 或 STATS 创建的列分配显式名称是可选的。如果不提供名称,则新列名称等于函数表达式。例如:

esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
"""

response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message               | event_duration/1000000.0
-----------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3 | 3.450233                
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2 | 2.764889                
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected          | 1.232382                
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error      | 0.725448                
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error      | 8.268153                
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error      | 5.033755                
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1 | 1.756467   

在此查询中,EVAL 添加一个名为 event_duration/1000000.0 的新列。由于其名称包含特殊字符,因此要访问此列,请用反引号引用它:

esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
| STATS MEDIAN(`event_duration/1000000.0`)
"""
response = client.esql.query(query=esql_query)
format_response(response)
MEDIAN(`event_duration/1000000.0`)
----------------------------------
2.764889    

创建直方图

为了跟踪一段时间内的统计数据,ES|QL 允许你使用 AUTO_BUCKET 函数创建直方图。 AUTO_BUCKET 创建人性化的存储桶大小,并为每行返回一个与该行所属的结果存储桶相对应的值。

例如,要为 10 月 23 日的数据创建每小时存储桶:

esql_query = """
FROM sample_data
| KEEP @timestamp
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
"""
response = client.esql.query(query=esql_query)
format_response(response)
COUNT(*) | BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
---------------------------------------------------------------------------------
2        | 2023-10-23T12:00:00.000Z                                              
5        | 2023-10-23T13:00:00.000Z             

将 AUTO_BUCKET 与 STATS ... BY 结合起来创建直方图。例如,要计算每小时的事件数:

COUNT(*) | BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
----------------------------------------------------------------------------------
2        | 2023-10-23T12:00:00.000Z                                               
5        | 2023-10-23T13:00:00.000Z         

或每小时的中位持续时间:

median_duration | BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
-----------------------------------------------------------------------------------------
3107561.0       | 2023-10-23T12:00:00.000Z                                               
1756467.0       | 2023-10-23T13:00:00.000Z  

丰富数据

ES|QL 使你能够使用 ENRICH 命令使用 Elasticsearch 中索引的数据来丰富表。

ℹ️ 在使用 ENRICH 之前,你首先需要创建并执行丰富策略。,其中已经创建并执行了名为 clientip_policy 的丰富策略。

以下请求创建并执行名为 clientip_policy 的策略。该策略将 IP 地址链接到环境(“Development”、“QA” 或 “Production”)。

# Define the mapping
mapping = {
    "mappings": {
        "properties": {"client_ip": {"type": "keyword"}, "env": {"type": "keyword"}}
    }
}

# Create the index with the mapping
client.indices.create(index="clientips", body=mapping)

# Prepare bulk data
bulk_data = [
    {"index": {}},
    {"client_ip": "172.21.0.5", "env": "Development"},
    {"index": {}},
    {"client_ip": "172.21.2.113", "env": "QA"},
    {"index": {}},
    {"client_ip": "172.21.2.162", "env": "QA"},
    {"index": {}},
    {"client_ip": "172.21.3.15", "env": "Production"},
    {"index": {}},
    {"client_ip": "172.21.3.16", "env": "Production"},
]

# Bulk index the data
client.bulk(index="clientips", body=bulk_data)

# Define the enrich policy
policy = {
    "match": {
        "indices": "clientips",
        "match_field": "client_ip",
        "enrich_fields": ["env"],
    }
}

# Put the enrich policy
client.enrich.put_policy(name="clientip_policy", body=policy)

# Execute the enrich policy and wait for completion
client.enrich.execute_policy(name="clientip_policy", wait_for_completion=True)

创建并执行策略后,你可以将其与 ENRICH 命令一起使用:

esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
"""
response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | event_duration | client_ip    | env 
---------------------------------------------------------------
2023-10-23T12:15:03.360Z | 3450233        | 172.21.2.162 | None
2023-10-23T12:27:28.948Z | 2764889        | 172.21.2.113 | None
2023-10-23T13:33:34.937Z | 1232382        | 172.21.0.5   | None
2023-10-23T13:51:54.732Z | 725448         | 172.21.3.15  | None
2023-10-23T13:52:55.015Z | 8268153        | 172.21.3.15  | None
2023-10-23T13:53:55.832Z | 5033755        | 172.21.3.15  | None
2023-10-23T13:55:01.543Z | 1756467        | 172.21.3.15  | None

你可以在后续命令中使用 ENRICH 命令添加的新 env 列。例如,要计算每个环境的中位持续时间:

esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
| STATS median_duration = MEDIAN(event_duration) BY env
"""
response = client.esql.query(query=esql_query)
format_response(response)
median_duration | env 
----------------------
2764889.0       | None

有关使用 ES|QL 进行数据丰富的更多信息,请参阅数据丰富

处理数据

你的数据可能包含非结构化字符串,你希望将其结构化以便更轻松地分析数据。例如,示例数据包含如下日志消息:

"Connected to 10.1.0.3"

通过从这些消息中提取 IP 地址,你可以确定哪个 IP 接受了最多的客户端连接。

要在查询时构建非结构化字符串,你可以使用 ES|QL DISSECT 和 GROK 命令。 DISSECT 的工作原理是使用基于分隔符的模式分解字符串。 GROK 的工作原理类似,但使用正则表达式。这使得 GROK 更强大,但通常也更慢。

在这种情况下,不需要正则表达式,因为 message 很简单:“Connected to”,后跟服务器 IP。要匹配此字符串,你可以使用以下 DISSECT 命令:

esql_query = """
FROM sample_data
| DISSECT message "Connected to %{server_ip}"
"""
response = client.esql.query(query=esql_query)
format_response(response)
@timestamp               | client_ip    | event_duration | message               | server_ip
--------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233        | Connected to 10.1.0.3 | 10.1.0.3 
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889        | Connected to 10.1.0.2 | 10.1.0.2 
2023-10-23T13:33:34.937Z | 172.21.0.5   | 1232382        | Disconnected          | None     
2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448         | Connection error      | None     
2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153        | Connection error      | None     
2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755        | Connection error      | None     
2023-10-23T13:55:01.543Z | 172.21.3.15  | 1756467        | Connected to 10.1.0.1 | 10.1.0.1 

这会将 server_ip 列添加到具有与此模式匹配的消息的那些行。对于其他行,server_ip 的值为 null。

你可以在后续命令中使用 DISSECT 命令添加的新 server_ip 列。例如,要确定每个服务器已接受多少个连接:

esql_query = """
FROM sample_data
| WHERE STARTS_WITH(message, "Connected to")
| DISSECT message "Connected to %{server_ip}"
| STATS COUNT(*) BY server_ip
"""
response = client.esql.query(query=esql_query)
format_response(response)
COUNT(*) | server_ip
--------------------
1        | 10.1.0.3 
1        | 10.1.0.2 
1        | 10.1.0.1 

ℹ️ 要了解有关使用 ES|QL 进行数据处理的更多信息,请参阅使用 DISSECT 和 GROK 进行数据处理

如果你想了解更多关于 ES|QL 的知识,请详细阅读 “ES|QL

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值