【Dify(v1.x) 核心源码深入解析】moderation 模块

重磅推荐专栏:
《大模型AIGC》
《课程大纲》
《知识星球》

本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展

模块简介

Dify 的 moderation 模块是一套功能强大的内容审核系统,旨在对用户输入和模型输出进行敏感内容检测与处理,以确保内容的安全性和合规性。它提供了灵活多样的审核策略,可帮助开发者根据实际需求定制审核流程,广泛应用于各种需要内容管控的应用场景,如聊天机器人、内容生成平台等。

模块架构

整体架构图

调用者
ModerationFactory
具体审核类
审核算法或规则
审核结果处理逻辑
审核配置验证逻辑

类图

使用
创建
返回
返回
返回
返回
返回
返回
调用者
ModerationFactory
Moderation
OpenAIModeration
KeywordsModeration
ApiModeration
ModerationInputsResult
ModerationOutputsResult

核心组件详解

ModerationFactory - 审核工厂类

这是 moderation 模块的核心入口,负责根据配置创建具体的审核实例。

代码片段及注释:

class ModerationFactory:
    __extension_instance: Moderation

    def __init__(self, name: str, app_id: str, tenant_id: str, config: dict) -> None:
        # 根据审核类型名称获取对应的审核类
        extension_class = code_based_extension.extension_class(ExtensionModule.MODERATION, name)
        # 创建具体审核实例
        self.__extension_instance = extension_class(app_id, tenant_id, config)

    @classmethod
    def validate_config(cls, name: str, tenant_id: str, config: dict) -> None:
        # 验证审核配置
        code_based_extension.validate_form_schema(ExtensionModule.MODERATION, name, config)
        extension_class = code_based_extension.extension_class(ExtensionModule.MODERATION, name)
        # 调用具体审核类的配置验证方法
        extension_class.validate_config(tenant_id, config)  # type: ignore

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        # 执行输入内容审核
        return self.__extension_instance.moderation_for_inputs(inputs, query)

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        # 执行输出内容审核
        return self.__extension_instance.moderation_for_outputs(text)

Moderation - 审核基类

定义了审核的基本规范和通用逻辑,所有具体审核类都需继承自该基类。

代码片段及注释:

class Moderation(Extensible, ABC):
    """
    审核基类,定义审核的基本规范和通用逻辑
    """

    module: ExtensionModule = ExtensionModule.MODERATION

    def __init__(self, app_id: str, tenant_id: str, config: Optional[dict] = None) -> None:
        super().__init__(tenant_id, config)
        self.app_id = app_id

    @classmethod
    @abstractmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        """
        验证审核配置
        """
        raise NotImplementedError

    @abstractmethod
    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        """
        执行输入内容审核
        """
        raise NotImplementedError

    @abstractmethod
    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        """
        执行输出内容审核
        """
        raise NotImplementedError

    @classmethod
    def _validate_inputs_and_outputs_config(cls, config: dict, is_preset_response_required: bool) -> None:
        # 验证输入输出配置的通用逻辑
        inputs_config = config.get("inputs_config")
        if not isinstance(inputs_config, dict):
            raise ValueError("inputs_config must be a dict")

        outputs_config = config.get("outputs_config")
        if not isinstance(outputs_config, dict):
            raise ValueError("outputs_config must be a dict")

        inputs_config_enabled = inputs_config.get("enabled")
        outputs_config_enabled = outputs_config.get("enabled")
        if not inputs_config_enabled and not outputs_config_enabled:
            raise ValueError("At least one of inputs_config or outputs_config must be enabled")

        if not is_preset_response_required:
            return

        if inputs_config_enabled:
            if not inputs_config.get("preset_response"):
                raise ValueError("inputs_config.preset_response is required")

            if len(inputs_config.get("preset_response", 0)) > 100:
                raise ValueError("inputs_config.preset_response must be less than 100 characters")

        if outputs_config_enabled:
            if not outputs_config.get("preset_response"):
                raise ValueError("outputs_config.preset_response is required")

            if len(outputs_config.get("preset_response", 0)) > 100:
                raise ValueError("outputs_config.preset_response must be less than 100 characters")

OpenAIModeration - OpenAI 审核类

利用 OpenAI 的审核模型对内容进行敏感性检测。

代码片段及注释:

class OpenAIModeration(Moderation):
    name: str = "openai_moderation"

    @classmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        # 验证 OpenAI 审核配置
        cls._validate_inputs_and_outputs_config(config, True)

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        # 执行输入内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["inputs_config"]["enabled"]:
            preset_response = self.config["inputs_config"]["preset_response"]

            if query:
                inputs["query__"] = query
            flagged = self._is_violated(inputs)

        return ModerationInputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        # 执行输出内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["outputs_config"]["enabled"]:
            flagged = self._is_violated({"text": text})
            preset_response = self.config["outputs_config"]["preset_response"]

        return ModerationOutputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def _is_violated(self, inputs: dict):
        # 判断内容是否违规
        text = "\n".join(str(inputs.values()))
        model_manager = ModelManager()
        # 获取 OpenAI 审核模型实例
        model_instance = model_manager.get_model_instance(
            tenant_id=self.tenant_id, provider="openai", model_type=ModelType.MODERATION, model="text-moderation-stable"
        )

        # 调用 OpenAI 审核模型进行检测
        openai_moderation = model_instance.invoke_moderation(text=text)

        return openai_moderation

KeywordsModeration - 关键词审核类

通过匹配预设的敏感关键词来检测内容是否违规。

代码片段及注释:

class KeywordsModeration(Moderation):
    name: str = "keywords"

    @classmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        # 验证关键词审核配置
        cls._validate_inputs_and_outputs_config(config, True)

        if not config.get("keywords"):
            raise ValueError("keywords is required")

        if len(config.get("keywords", [])) > 10000:
            raise ValueError("keywords length must be less than 10000")

        keywords_row_len = config["keywords"].split("\n")
        if len(keywords_row_len) > 100:
            raise ValueError("the number of rows for the keywords must be less than 100")

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        # 执行输入内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["inputs_config"]["enabled"]:
            preset_response = self.config["inputs_config"]["preset_response"]

            if query:
                inputs["query__"] = query

            # 获取过滤后的关键词列表
            keywords_list = [keyword for keyword in self.config["keywords"].split("\n") if keyword]

            flagged = self._is_violated(inputs, keywords_list)

        return ModerationInputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        # 执行输出内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["outputs_config"]["enabled"]:
            # 获取过滤后的关键词列表
            keywords_list = [keyword for keyword in self.config["keywords"].split("\n") if keyword]

            flagged = self._is_violated({"text": text}, keywords_list)
            preset_response = self.config["outputs_config"]["preset_response"]

        return ModerationOutputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
        # 判断内容是否包含敏感关键词
        return any(self._check_keywords_in_value(keywords_list, value) for value in inputs.values())

    def _check_keywords_in_value(self, keywords_list: Sequence[str], value: Any) -> bool:
        # 检查单个值是否包含敏感关键词
        return any(keyword.lower() in str(value).lower() for keyword in keywords_list)

ApiModeration - API 审核类

通过调用外部 API 进行内容审核。

代码片段及注释:

class ApiModeration(Moderation):
    name: str = "api"

    @classmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        # 验证 API 审核配置
        cls._validate_inputs_and_outputs_config(config, False)

        api_based_extension_id = config.get("api_based_extension_id")
        if not api_based_extension_id:
            raise ValueError("api_based_extension_id is required")

        extension = cls._get_api_based_extension(tenant_id, api_based_extension_id)
        if not extension:
            raise ValueError("API-based Extension not found. Please check it again.")

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        # 执行输入内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["inputs_config"]["enabled"]:
            params = ModerationInputParams(app_id=self.app_id, inputs=inputs, query=query)

            # 调用外部 API 进行审核
            result = self._get_config_by_requestor(APIBasedExtensionPoint.APP_MODERATION_INPUT, params.model_dump())
            return ModerationInputsResult(**result)

        return ModerationInputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        # 执行输出内容审核
        flagged = False
        preset_response = ""
        if self.config is None:
            raise ValueError("The config is not set.")

        if self.config["outputs_config"]["enabled"]:
            params = ModerationOutputParams(app_id=self.app_id, text=text)

            # 调用外部 API 进行审核
            result = self._get_config_by_requestor(APIBasedExtensionPoint.APP_MODERATION_OUTPUT, params.model_dump())
            return ModerationOutputsResult(**result)

        return ModerationOutputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def _get_config_by_requestor(self, extension_point: APIBasedExtensionPoint, params: dict) -> dict:
        # 发起对外部 API 的请求
        if self.config is None:
            raise ValueError("The config is not set.")
        extension = self._get_api_based_extension(self.tenant_id, self.config.get("api_based_extension_id", ""))
        if not extension:
            raise ValueError("API-based Extension not found. Please check it again.")
        requestor = APIBasedExtensionRequestor(extension.api_endpoint, decrypt_token(self.tenant_id, extension.api_key))

        result = requestor.request(extension_point, params)
        return result

    @staticmethod
    def _get_api_based_extension(tenant_id: str, api_based_extension_id: str) -> Optional[APIBasedExtension]:
        # 获取 API-based 扩展
        extension = (
            db.session.query(APIBasedExtension)
            .filter(APIBasedExtension.tenant_id == tenant_id, APIBasedExtension.id == api_based_extension_id)
            .first()
        )

        return extension

审核流程详解

输入审核流程

流程图
OpenAI审核
关键词审核
API审核
调用者发起输入审核请求
ModerationFactory 创建审核实例
审核类型判断
OpenAIModeration审核逻辑
KeywordsModeration审核逻辑
ApiModeration审核逻辑
返回审核结果
详细步骤
  1. 调用者通过 ModerationFactory 创建具体的审核实例,传入审核类型、应用 ID、租户 ID 和配置等参数。

  2. 根据审核类型,ModerationFactory 会创建对应的审核类实例(如 OpenAIModeration、KeywordsModeration 或 ApiModeration)。

  3. 调用审核实例的 moderation_for_inputs 方法,传入用户输入内容和查询字符串(可选)。

  4. 在审核类中,根据配置判断是否启用输入审核。如果启用,则执行相应的审核逻辑:

    • OpenAIModeration :将输入内容转换为文本,调用 OpenAI 审核模型进行检测,判断内容是否违规。
    • KeywordsModeration :将输入内容与预设的敏感关键词列表进行匹配,判断是否包含敏感关键词。
    • ApiModeration :将输入内容封装为请求参数,调用外部 API 进行审核,获取审核结果。
  5. 根据审核结果,返回 ModerationInputsResult 对象,包含是否违规、审核动作(如直接输出或覆盖)和预设响应等内容。

输出审核流程

流程图
OpenAI审核
关键词审核
API审核
调用者发起输出审核请求
ModerationFactory 创建审核实例
审核类型判断
OpenAIModeration审核逻辑
KeywordsModeration审核逻辑
ApiModeration审核逻辑
返回审核结果
详细步骤
  1. 调用者通过 ModerationFactory 创建具体的审核实例,传入审核类型、应用 ID、租户 ID 和配置等参数。

  2. 根据审核类型,ModerationFactory 会创建对应的审核类实例(如 OpenAIModeration、KeywordsModeration 或 ApiModeration)。

  3. 调用审核实例的 moderation_for_outputs 方法,传入模型输出的文本内容。

  4. 在审核类中,根据配置判断是否启用输出审核。如果启用,则执行相应的审核逻辑:

    • OpenAIModeration :将输出内容转换为文本,调用 OpenAI 审核模型进行检测,判断内容是否违规。
    • KeywordsModeration :将输出内容与预设的敏感关键词列表进行匹配,判断是否包含敏感关键词。
    • ApiModeration :将输出内容封装为请求参数,调用外部 API 进行审核,获取审核结果。
  5. 根据审核结果,返回 ModerationOutputsResult 对象,包含是否违规、审核动作(如直接输出或覆盖)和预设响应等内容。

审核结果处理

审核结果主要通过 ModerationInputsResult 和 ModerationOutputsResult 两个类来表示,它们包含了审核的关键信息,如下表所示:

属性名称含义类型
flagged是否标记为违规内容bool
action审核动作(DIRECT_OUTPUT 或 OVERRIDDEN)ModerationAction
preset_response预设响应内容str
inputs审核后的输入内容(仅 ModerationInputsResult 有)dict
query审核后的查询字符串(仅 ModerationInputsResult 有)str
text审核后的输出文本(仅 ModerationOutputsResult 有)str

根据审核结果,调用者可以采取相应的处理措施。例如,如果内容被标记为违规且审核动作为 DIRECT_OUTPUT,则可以直接使用预设响应内容进行回复;如果审核动作为 OVERRIDDEN,则可以使用审核后的内容进行后续处理。

场景应用示例

场景一:聊天机器人输入审核

在聊天机器人应用中,用户发送消息后,系统需要对输入内容进行审核,以防止恶意攻击或不当内容。

  1. 用户发送消息 “你好,我想了解一些关于股票投资的知识。”。
  2. 系统调用 ModerationFactory 创建关键词审核实例,传入应用 ID、租户 ID 和关键词配置(包含敏感词汇如 “赌博”“诈骗” 等)。
  3. 调用审核实例的 moderation_for_inputs 方法,传入用户输入内容。
  4. KeywordsModeration 审核类将用户输入内容与关键词列表进行匹配,发现不包含敏感词汇,返回 flagged = False 的审核结果。
  5. 系统根据审核结果,允许消息通过,继续后续的处理流程。

场景二:内容生成平台输出审核

内容生成平台在生成文本内容后,需要对输出内容进行审核,以确保内容的安全性和合规性。

  1. 模型生成了一段关于健康饮食的文章。
  2. 系统调用 ModerationFactory 创建 OpenAI 审核实例,传入应用 ID、租户 ID 和 OpenAI 审核配置。
  3. 调用审核实例的 moderation_for_outputs 方法,传入模型输出的文本内容。
  4. OpenAIModeration 审核类调用 OpenAI 审核模型对内容进行检测,判断是否包含敏感或不适宜的内容。
  5. 如果检测到违规内容,返回 flagged = True 及相应的预设响应内容;否则返回 flagged = False 的审核结果。
  6. 系统根据审核结果,对违规内容进行处理(如替换为预设响应),确保输出内容的安全性。

性能优化与注意事项

性能优化

  • 缓存机制 :对于频繁访问的审核配置和关键词列表,可以考虑引入缓存机制,减少数据库查询次数,提高系统性能。
  • 异步处理 :在审核流程中,对于一些耗时较长的操作(如调用外部 API),可以考虑采用异步处理方式,避免阻塞主线程,提高系统的响应速度。
  • 资源优化 :合理配置审核模型和 API 的资源使用,根据不同业务场景进行资源分配,确保系统的稳定运行。

注意事项

  • 配置管理 :审核配置的合理性和准确性至关重要,需要定期对配置进行检查和更新,以适应不断变化的业务需求和安全形势。
  • 容错处理 :在调用外部服务(如 OpenAI 审核模型或外部 API)时,需要做好容错处理,避免因服务不可用导致整个系统受影响。
  • 数据安全 :在处理用户输入和模型输出内容时,需要注意数据的安全性和隐私保护,避免敏感信息泄露。

总之,Dify 的 moderation 模块提供了一套灵活、高效的审核解决方案,通过合理配置和使用,可以有效保障内容的安全性和合规性,满足各种应用场景的需求。

### 关于 DIFY 项目中 `.env.example` 文件缺失的解决方案 如果遇到 `dify` 项目中缺少 `.env.example` 文件的情况,可以按照以下方法解决: #### 方法一:手动创建 `.env.example` 如果没有发现 `.env.example` 文件,则可能是因为该文件未被包含在版本控制系统中或者是下载过程中丢失。此时可以根据项目的文档说明来手动创建此文件。 - 进入到 `dify/docker` 目录下。 - 创建一个新的名为 `.env.example` 的文件,并向其中添加必要的环境变量配置项。这些配置项可以从其他类似的开源项目获取模板,也可以参照官方文档中的指导[^2]。 ```bash cd dify/docker touch .env.example nano .env.example ``` 编辑器打开后输入如下内容作为示例(具体参数需依据实际需求填写): ```plaintext # 启用自定义模型 CUSTOM_MODEL_ENABLED=true # 指定 Ollama 的 API 地址(根据部署环境调整 IP) OLLAMA_API_BASE_URL=http://192.168.1.10:11434 ``` 保存并关闭编辑器。 #### 方法二:克隆最新仓库代码重新获取 另一种可能是本地副本存在问题,建议尝试删除当前工作区并从远程仓库再次拉取最新的源码以确保获得完整的资源文件。 ```bash rm -rf dify/ git clone https://github.com/your-repo/dify.git cd dify/docker cp .env.example .env ``` 这一步骤能够保证`.env.example`文件的存在以及其内容是最新的状态[^3]。 #### 方法三:联系维护者或社区求助 当上述两种方式都无法解决问题时,可以通过访问该项目的 GitHub Issues 页面提交问题报告寻求帮助;或者加入开发者交流群组询问是否有其他人也遇到了相同情况及其处理办法。 ---
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

小爷毛毛(卓寿杰)

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值