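Elasticsearch: RBAC and RAG - Best Friends (Part 2)

In the previous article, “Elasticsearch: RBAC and RAG - Best Friends (Part 1)”, we described in detail how to use RBAC to control access in RAG. In this article, we walk through a Jupyter notebook that shows how to implement it.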

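Installation

If you have not yet installed Elasticsearch and Kibana, please refer to the following links to install them:

For this walkthrough we install Elastic Stack 8.x. Note that ES|QL is only available in Elastic Stack 8.11 and later, so you need to download and install version 8.11 or later.

When Elasticsearch starts for the first time, we can see output like the following:

We need to note down the password of the Elasticsearch superuser elastic.

We can also find the Elasticsearch access certificates in the Elasticsearch installation directory: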

$ pwd
/Users/liuxg/elastic/elasticsearch-8.13.2/config/certs
$ ls 
http.p12      http_ca.crt   transport.p12

Above, http_ca.crt is the certificate we need in order to access Elasticsearch.

First, we clone the code that has already been written:

git clone https://github.com/liu-xiao-guo/elasticsearch-labs

Then we change into the directory of this project:

$ pwd
/Users/liuxg/python/elasticsearch-labs/supporting-blog-content/rbac-and-rag-best-friends
$ cp ~/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt .
$ ls
http_ca.crt                     rbac-and-rag-best-friends.ipynb

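Above, we copy the Elasticsearch certificate into the current directory. The rbac-and-rag-best-friends.ipynb file is the notebook we walk through below.

Demonstration

Before running the Jupyter notebook, we first set the following environment variables on the command line: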

export ES_USER="elastic"
export ES_PASSWORD="VDMlz5QnM_0g-349fFq7"
export ES_ENDPOINT="localhost"

Adjust these values to match your own configuration. Then, in the same terminal, run the following command:

jupyter notebook
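
Because the notebook reads its settings from the environment, a quick check in the first cell can catch a variable that was not exported in the terminal that launched Jupyter. The following is a minimal sketch, not part of the original notebook; the helper name missing_env_vars is hypothetical.

```python
import os

# The three variables exported in the terminal before starting Jupyter.
REQUIRED_VARS = ("ES_USER", "ES_PASSWORD", "ES_ENDPOINT")


def missing_env_vars(names=REQUIRED_VARS, environ=None):
    """Return the names from `names` that are unset or empty in `environ`."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Report which variables, if any, still need to be exported.
print("Missing variables:", missing_env_vars() or "none")
```

If the list is not empty, export the missing variables and restart Jupyter from that same terminal.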

Install and import the required Python libraries

!pip install elasticsearch python-dotenv
from elasticsearch import Elasticsearch
from IPython.display import HTML, display
from pprint import pprint
from dotenv import load_dotenv
import os, json

After running the commands above, we can check the version of the installed elasticsearch package:

$ pip list | grep elasticsearch
elasticsearch                 8.13.0

Connect the client to Elasticsearch

Create the Elasticsearch connection

load_dotenv()

ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")

url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)

es = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(es.info())

For more on connecting to Elasticsearch from Python, see the article “Elasticsearch: Everything you need to know about using Elasticsearch in Python - 8.x”.

Delete the demo indices (if they already exist)
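
Embedding the user name and password in the URL works for simple passwords, but a password containing characters such as @ or / would have to be URL-encoded first, and printing the URL also prints the password. As an alternative sketch (not the notebook's code), the credentials can be passed through the client's basic_auth parameter instead. The helper names connection_kwargs and connect are hypothetical; the port 9200 and the certificate path are taken from the walkthrough above.

```python
def connection_kwargs(user, password, endpoint, port=9200, ca_certs="./http_ca.crt"):
    """Build keyword arguments for Elasticsearch() without putting credentials in the URL."""
    return {
        "hosts": f"https://{endpoint}:{port}",
        "basic_auth": (user, password),  # sent as HTTP basic authentication
        "ca_certs": ca_certs,            # CA certificate copied from config/certs
        "verify_certs": True,
    }


def connect(user, password, endpoint):
    """Create the client; imported lazily so the helper above has no dependencies."""
    from elasticsearch import Elasticsearch

    return Elasticsearch(**connection_kwargs(user, password, endpoint))


# The host URL contains no credentials, so it is safe to print.
print(connection_kwargs("elastic", "p@ss/word", "localhost")["hosts"])
```

In the notebook, es = connect(ES_USER, ES_PASSWORD, ES_ENDPOINT) would then take the place of the URL-based constructor call.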

# Delete indices
def delete_indices():
    try:
        es.indices.delete(index="rbac_rag_demo-data_public")
        print("Deleted index: rbac_rag_demo-data_public")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_public: {str(e)}")

    try:
        es.indices.delete(index="rbac_rag_demo-data_sensitive")
        print("Deleted index: rbac_rag_demo-data_sensitive")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_sensitive: {str(e)}")


delete_indices()
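
On a first run, the function above prints an error for each index because neither index exists yet. A possible variant, sketched here under the assumption of an 8.x Python client, uses the client's options(ignore_status=404) so that a missing index is not treated as an error. The names delete_demo_indices and DEMO_INDICES are hypothetical.

```python
DEMO_INDICES = ("rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive")


def delete_demo_indices(client, names=DEMO_INDICES):
    """Delete each index, treating 'index not found' (HTTP 404) as acceptable."""
    # options() returns a client whose requests do not raise on the given status.
    quiet = client.options(ignore_status=404)
    for name in names:
        quiet.indices.delete(index=name)
        print(f"Deleted index (or it did not exist): {name}")
    return list(names)
```

Calling delete_demo_indices(es) in the notebook would replace the two try/except blocks; errors other than 404 would still raise.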

Create the indices and load data into them

# Create indices
def create_indices():
    # Create data_public index
    es.indices.create(
        index="rbac_rag_demo-data_public",
        ignore=400,
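        body={
            "settings": {"number_of_shards": 1},
            # Map the fields that populate_data() actually indexes
            "mappings": {
                "properties": {
                    "title": {"type": "text"},
                    "confidentiality_level": {"type": "keyword"},
                }
            },
        },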
    )

    # Create data_sensitive index
    es.indices.create(
        index="rbac_rag_demo-data_sensitive",
        ignore=400,
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    # populate_data() indexes a "title" field
                    "title": {"type": "text"},
                    "confidentiality_level": {"type": "keyword"},
                }
            },
        },
    )


# Populate sample data
def populate_data():
    # Public HR information
    public_docs = [
        {"title": "Annual leave policies updated.", "confidentiality_level": "low"},
        {"title": "Remote work guidelines available.", "confidentiality_level": "low"},
        {
            "title": "Health benefits registration period starts next month.",
            "confidentiality_level": "low",
        },
    ]
    for doc in public_docs:
        es.index(index="rbac_rag_demo-data_public", document=doc)

    # Sensitive HR information
    sensitive_docs = [
        {
            "title": "Executive compensation details Q2 2024.",
            "confidentiality_level": "high",
        },
        {
            "title": "Bonus payout structure for all levels.",
            "confidentiality_level": "high",
        },
        {
            "title": "Employee stock options plan details.",
            "confidentiality_level": "high",
        },
    ]
    for doc in sensitive_docs:
        es.index(index="rbac_rag_demo-data_sensitive", document=doc)


create_indices()
populate_data()
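
populate_data() sends one request per document, which is fine for six documents. For larger data sets, the documents could be sent in one request through the client's bulk helper. The sketch below only builds the bulk actions; the function name bulk_actions is hypothetical, and the helper calls are shown as comments because they need a running cluster.

```python
def bulk_actions(index_name, docs):
    """Yield one bulk 'index' action per document for the given index."""
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


sample_docs = [
    {"title": "Annual leave policies updated.", "confidentiality_level": "low"},
    {"title": "Remote work guidelines available.", "confidentiality_level": "low"},
]
actions = list(bulk_actions("rbac_rag_demo-data_public", sample_docs))
print(len(actions), "actions prepared")

# With a live connection `es`, the actions could be sent like this:
# from elasticsearch import helpers
# helpers.bulk(es, bulk_actions("rbac_rag_demo-data_public", sample_docs))
# es.indices.refresh(index="rbac_rag_demo-data*")  # make documents searchable immediately
```

The explicit refresh matters when a search follows right after indexing, since newly indexed documents only become searchable after a refresh.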

We can use the following command in Kibana to view the indices:

Create two users with different access levels

# Create roles
def create_roles():
    # Role for the engineer
    es.security.put_role(
        name="engineer_role",
        body={
            "indices": [
                {"names": ["rbac_rag_demo-data_public"], "privileges": ["read"]}
            ]
        },
    )

    # Role for the manager
    es.security.put_role(
        name="manager_role",
        body={
            "indices": [
                {
                    "names": [
                        "rbac_rag_demo-data_public",
                        "rbac_rag_demo-data_sensitive",
                    ],
                    "privileges": ["read"],
                }
            ]
        },
    )


# Create users with respective roles
def create_users():
    # User 'engineer'
    es.security.put_user(
        username="engineer",
        body={
            "password": "password123",
            "roles": ["engineer_role"],
            "full_name": "Engineer User",
        },
    )

    # User 'manager'
    es.security.put_user(
        username="manager",
        body={
            "password": "password123",
            "roles": ["manager_role"],
            "full_name": "Manager User",
        },
    )


create_roles()
create_users()

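After running the code above, we can verify the result in Kibana:

We can also create these users and roles through the Kibana UI. For details, you can read the article “Elasticsearch: User security settings”.

Test how security roles affect the ability to query data

Create helper functions

Helper functions for querying as each user, plus some output formatting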

"""
def get_es_connection(cid, username, password):
    return Elasticsearch(cloud_id=cid, basic_auth=(username, password))
"""

def get_es_connection(username, password):
    url = f"https://{username}:{password}@{ES_ENDPOINT}:9200"
    print(url)
    return Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)


def query_index(es, index_name, username):
    try:
        response = es.search(index=index_name, body={"query": {"match_all": {}}})

        # Prepare the message
        results_message = f'Results from querying as <span style="color: orange;">{username}:</span><br>'
        for hit in response["hits"]["hits"]:
            confidentiality_level = hit["_source"].get("confidentiality_level", "N/A")
            # Use a separate name so the index_name argument is not overwritten
            hit_index = hit.get("_index", "N/A")
            title = hit["_source"].get("title", "No title")

            # Set color based on confidentiality level
            if confidentiality_level == "low":
                conf_color = "lightgreen"
            elif confidentiality_level == "high":
                conf_color = "red"
            else:
                conf_color = "black"

            # Set color based on index name
            if hit_index == "rbac_rag_demo-data_public":
                index_color = "lightgreen"
            elif hit_index == "rbac_rag_demo-data_sensitive":
                index_color = "red"
            else:
                index_color = "black"  # Default color

            results_message += (
                f'Index: <span style="color: {index_color};">{hit_index}</span>\t '
                f'confidentiality level: <span style="color: {conf_color};">{confidentiality_level}</span> '
                f'title: <span style="color: lightblue;">{title}</span><br>'
            )

        display(HTML(results_message))

    except Exception as e:
        print(f"Error accessing {index_name}: {str(e)}")
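
In a RAG pipeline, the hits returned for a user would typically be turned into context text for the LLM prompt rather than rendered as HTML. Because Elasticsearch applies the user's role before returning hits, such a context can only contain documents that user is allowed to read. The sketch below is an assumption about how that step might look; build_context and its output format are hypothetical and not part of the notebook.

```python
def build_context(response, field="title"):
    """Join the `field` of each search hit into a newline-separated context string."""
    lines = []
    for hit in response["hits"]["hits"]:
        text = hit.get("_source", {}).get(field)
        if text:
            # Prefix each line with its source index so the origin stays visible.
            lines.append(f"[{hit.get('_index', 'unknown')}] {text}")
    return "\n".join(lines)


# A hand-written response in the same shape as an es.search() result.
sample_response = {
    "hits": {
        "hits": [
            {
                "_index": "rbac_rag_demo-data_public",
                "_source": {"title": "Annual leave policies updated."},
            },
            {"_index": "rbac_rag_demo-data_public", "_source": {}},
        ]
    }
}
print(build_context(sample_response))
```

With a live connection, build_context(es_conn.search(index=index_pattern, query={"match_all": {}})) would produce the context for whichever user es_conn is authenticated as.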

Simulate queries by the “engineer” and the “manager”

index_pattern = "rbac_rag_demo-data*"
print(
    f"Each user will log in with their credentials and query the same index pattern: {index_pattern}\n\n"
)

for user in ["engineer", "manager"]:
    print(f"Logged in as {user}:")

    es_conn = get_es_connection(user, "password123")
    results = query_index(es_conn, index_pattern, user)
    print("\n\n")

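From the output above, we can see that the manager can access the data in both indices, while the engineer can only access the data in the public index (rbac_rag_demo-data_public), even though both users query the same index pattern.

The final source code is available at elasticsearch-labs/supporting-blog-content/rbac-and-rag-best-friends/rbac-and-rag-best-friends.ipynb in the liu-xiao-guo/elasticsearch-labs repository on GitHub.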
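
The result can be reasoned about with a small model: a wildcard pattern is resolved only against the indices that the user's role grants. The sketch below is a toy simulation in plain Python, not how Elasticsearch implements authorization; the names ROLE_INDICES, ALL_INDICES, and visible_indices are hypothetical, and the role definitions mirror the ones created earlier.

```python
from fnmatch import fnmatchcase

# Index grants per role, mirroring create_roles() above.
ROLE_INDICES = {
    "engineer_role": ["rbac_rag_demo-data_public"],
    "manager_role": ["rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive"],
}
ALL_INDICES = ["rbac_rag_demo-data_public", "rbac_rag_demo-data_sensitive"]


def visible_indices(role, pattern, all_indices=ALL_INDICES, role_indices=ROLE_INDICES):
    """Return the indices matching `pattern` that `role` has been granted."""
    grants = role_indices.get(role, [])
    return [
        name
        for name in all_indices
        if fnmatchcase(name, pattern)
        and any(fnmatchcase(name, grant) for grant in grants)
    ]


for role in ("engineer_role", "manager_role"):
    print(role, "->", visible_indices(role, "rbac_rag_demo-data*"))
```

In this model the engineer role resolves the pattern to the public index only, and the manager role resolves it to both indices, which matches the notebook output.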
