StreamAlert项目中的Publishers机制详解

StreamAlert项目中的Publishers机制详解

【免费下载链接】streamalert StreamAlert is a serverless, realtime data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using datasources and alerting logic you define. 【免费下载链接】streamalert 项目地址: https://gitcode.com/gh_mirrors/st/streamalert

什么是Publishers

在StreamAlert项目中,Publishers是一个强大的警报处理框架,它允许用户在每条规则的基础上,在警报发送到输出目标之前对警报进行转换和定制。简单来说,Publishers就像是警报数据的"化妆师",能够在警报最终呈现给用户前,对其进行美化和加工。

Publishers的核心价值在于:

  • 提供警报数据的灵活转换能力
  • 支持不同输出目标的定制化展示
  • 保持原始警报数据不变的同时生成更适合展示的格式
  • 实现不同输出渠道的差异化展示需求

Publishers的工作原理

Publishers在警报处理流程中扮演着关键角色,它们的工作时机是在警报处理完成之后,但在警报被发送到输出目标之前。这种设计使得Publishers能够:

  1. 接收到完整的警报对象和可选的发布数据
  2. 对警报信息进行转换、过滤或增强
  3. 返回处理后的数据供后续流程使用

Publishers的执行是串行的,每个Publisher都会接收前一个Publisher处理后的结果,并在此基础上进行进一步处理。

如何实现新的Publisher

StreamAlert提供了两种实现Publisher的方式,开发者可以根据需求选择最适合的方式。

函数式实现

这是最简单的实现方式,只需要定义一个顶层函数并使用@Register装饰器进行注册:

from streamalert.shared.publisher import Register

@Register
def my_publisher(alert: Alert, publication: dict) -> dict:
    # 处理逻辑
    return {}

类式实现

对于更复杂的场景,可以通过继承AlertPublisher基类来实现:

from streamalert.shared.publisher import AlertPublisher, Register

@Register
class MyPublisherClass(AlertPublisher):
    def publish(alert: Alert, publication: dict) -> dict:
        # 处理逻辑
        return {}

实现建议

无论采用哪种方式,都应遵循以下最佳实践:

  1. 返回值应只包含简单类型(字符串、数字、列表、字典)
  2. 避免直接修改输入参数,应采用复制后修改的方式
  3. 保持处理逻辑简洁高效
  4. 考虑异常处理和边界情况
@Register
def sample_publisher(alert, publication):
    # 推荐做法:创建新字典而非修改原字典
    new_publication = {
        **publication,
        'new_field': 'new_value'
    }
    new_publication.pop('old_field', None)
    return new_publication

输出目标的适配

为了使Publishers能够充分发挥作用,输出目标的实现需要遵循特定规范。

使用compose_alert方法

输出目标应使用compose_alert()方法来获取经过所有Publishers处理后的数据:

from streamalert.alert_processor.helpers import compose_alert

def _dispatch(self, alert, descriptor):
    publication = compose_alert(alert, self, descriptor)
    # 使用publication中的数据调用API

默认实现

对于必须字段,输出目标应提供默认值:

def _dispatch(self, alert, descriptor):
    default_title = f'Incident Title: #{alert.alert_id}'
    default_html = f'<html><body>Rule: {alert.rule_description}</body></html>'

自定义字段

输出目标可以定义特定格式的自定义字段,供Publishers填充:

  • 格式:@{output_service}.{field_name}
  • 必须以@符号开头
  • output_service应与输出目标的cls.__service__值匹配
def _dispatch(self, alert, descriptor):
    publication = compose_alert(alert, self, descriptor)
    title = publication.get('@pagerduty.title', default_title)
    body_html = publication.get('@pagerduty.body_html', default_html)

Publishers的注册与使用

Publishers需要在规则中通过@rule装饰器的publishers参数进行注册。

注册方式

  1. 单个Publisher(应用于所有输出):
@publishers=publisher_1
  1. Publisher列表(应用于所有输出):
@publishers=[publisher_1, publisher_2, publisher_3]
  1. 按输出目标映射Publisher:
@publishers={
    'pagerduty:analyst': [publisher_1, publisher_2],
    'pagerduty': [publisher_3, publisher_4],
    'demisto': other_publisher,
}

默认Publisher

当规则中未指定publishers或配置错误时,系统会使用DefaultPublisher,它保持与旧版本alert.output_dict()的兼容性。

实际应用示例

假设我们需要处理SSH登录警报,但希望PagerDuty输出只显示关键信息:

@Register
def simplify_pagerduty_output(alert, publication):
    return {
        '@pagerduty.record': {
            'source_ip': alert.record['source_ip'],
            'time': alert.record['timestamp'],
            'username': alert.record['user'],
        },
        '@pagerduty.summary': f'Machine SSH: {alert.record["user"]}',
    }

然后在规则中针对特定输出应用此Publisher:

@rule(
    logs=['ssh'],
    outputs=['slack:engineering', 'pagerduty:engineering'],
    publishers={
        'pagerduty:engineering': simplify_pagerduty_output,
    }
)
def machine_ssh_login(rec):
    # 规则逻辑

这种实现方式既简化了PagerDuty的警报展示,又保持了Slack输出的完整性,充分体现了Publishers机制的灵活性和实用性。

【免费下载链接】streamalert StreamAlert is a serverless, realtime data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using datasources and alerting logic you define. 【免费下载链接】streamalert 项目地址: https://gitcode.com/gh_mirrors/st/streamalert

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值