StreamAlert项目中的Publishers机制详解
什么是Publishers
在StreamAlert项目中,Publishers是一个强大的警报处理框架,它允许用户在每条规则的基础上,在警报发送到输出目标之前对警报进行转换和定制。简单来说,Publishers就像是警报数据的"化妆师",能够在警报最终呈现给用户前,对其进行美化和加工。
Publishers的核心价值在于:
- 提供警报数据的灵活转换能力
- 支持不同输出目标的定制化展示
- 保持原始警报数据不变的同时生成更适合展示的格式
- 实现不同输出渠道的差异化展示需求
Publishers的工作原理
Publishers在警报处理流程中扮演着关键角色,它们的工作时机是在警报处理完成之后,但在警报被发送到输出目标之前。这种设计使得Publishers能够:
- 接收到完整的警报对象和可选的发布数据
- 对警报信息进行转换、过滤或增强
- 返回处理后的数据供后续流程使用
Publishers的执行是串行的,每个Publisher都会接收前一个Publisher处理后的结果,并在此基础上进行进一步处理。
如何实现新的Publisher
StreamAlert提供了两种实现Publisher的方式,开发者可以根据需求选择最适合的方式。
函数式实现
这是最简单的实现方式,只需要定义一个顶层函数并使用@Register装饰器进行注册:
from streamalert.shared.publisher import Register
@Register
def my_publisher(alert: Alert, publication: dict) -> dict:
# 处理逻辑
return {}
类式实现
对于更复杂的场景,可以通过继承AlertPublisher基类来实现:
from streamalert.shared.publisher import AlertPublisher, Register
@Register
class MyPublisherClass(AlertPublisher):
def publish(alert: Alert, publication: dict) -> dict:
# 处理逻辑
return {}
实现建议
无论采用哪种方式,都应遵循以下最佳实践:
- 返回值应只包含简单类型(字符串、数字、列表、字典)
- 避免直接修改输入参数,应采用复制后修改的方式
- 保持处理逻辑简洁高效
- 考虑异常处理和边界情况
@Register
def sample_publisher(alert, publication):
# 推荐做法:创建新字典而非修改原字典
new_publication = {
**publication,
'new_field': 'new_value'
}
new_publication.pop('old_field', None)
return new_publication
输出目标的适配
为了使Publishers能够充分发挥作用,输出目标的实现需要遵循特定规范。
使用compose_alert方法
输出目标应使用compose_alert()方法来获取经过所有Publishers处理后的数据:
from streamalert.alert_processor.helpers import compose_alert
def _dispatch(self, alert, descriptor):
publication = compose_alert(alert, self, descriptor)
# 使用publication中的数据调用API
默认实现
对于必须字段,输出目标应提供默认值:
def _dispatch(self, alert, descriptor):
default_title = f'Incident Title: #{alert.alert_id}'
default_html = f'<html><body>Rule: {alert.rule_description}</body></html>'
自定义字段
输出目标可以定义特定格式的自定义字段,供Publishers填充:
- 格式:
@{output_service}.{field_name} - 必须以@符号开头
- output_service应与输出目标的
cls.__service__值匹配
def _dispatch(self, alert, descriptor):
publication = compose_alert(alert, self, descriptor)
title = publication.get('@pagerduty.title', default_title)
body_html = publication.get('@pagerduty.body_html', default_html)
Publishers的注册与使用
Publishers需要在规则中通过@rule装饰器的publishers参数进行注册。
注册方式
- 单个Publisher(应用于所有输出):
@publishers=publisher_1
- Publisher列表(应用于所有输出):
@publishers=[publisher_1, publisher_2, publisher_3]
- 按输出目标映射Publisher:
@publishers={
'pagerduty:analyst': [publisher_1, publisher_2],
'pagerduty': [publisher_3, publisher_4],
'demisto': other_publisher,
}
默认Publisher
当规则中未指定publishers或配置错误时,系统会使用DefaultPublisher,它保持与旧版本alert.output_dict()的兼容性。
实际应用示例
假设我们需要处理SSH登录警报,但希望PagerDuty输出只显示关键信息:
@Register
def simplify_pagerduty_output(alert, publication):
return {
'@pagerduty.record': {
'source_ip': alert.record['source_ip'],
'time': alert.record['timestamp'],
'username': alert.record['user'],
},
'@pagerduty.summary': f'Machine SSH: {alert.record["user"]}',
}
然后在规则中针对特定输出应用此Publisher:
@rule(
logs=['ssh'],
outputs=['slack:engineering', 'pagerduty:engineering'],
publishers={
'pagerduty:engineering': simplify_pagerduty_output,
}
)
def machine_ssh_login(rec):
# 规则逻辑
这种实现方式既简化了PagerDuty的警报展示,又保持了Slack输出的完整性,充分体现了Publishers机制的灵活性和实用性。
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考



