一个面向生产运维场景的 Webhook 智能管家。基于 FastAPI 异步架构,具备 AI 根因分析、智能告警降噪、缓存/数据库去重、冷热数据归档以及可观测性能力。
| 能力 | 说明 |
|---|---|
| 异步 Webhook 接收 | Full 模式走 TaskIQ/Redis Stream;Lite 模式走 API 进程内后台任务,均立即返回 202 |
| AI 深度分析 | LLM 自动识别重要性,输出根因定位、影响评估与修复建议 |
| OpenClaw 深度分析 | 接入 OpenClaw 深度分析引擎,通过 TaskIQ 动态延迟任务拉取结果 |
| 智能降噪去重 | Adapter 范式化告警 identity,混合相似度/可选 embedding 识别衍生告警,缓存/数据库复用 + 24h 时间窗口(可配置) |
| 告警风暴背压 | Full 模式同一 alert_hash Redis 分布式 single-flight + 短窗口 Fail-Fast,防资源耗尽;Lite 模式使用进程内同类告警串行 |
| 转发规则引擎 | 多规则按优先级匹配,支持 Webhook / 飞书卡片 / OpenClaw 三种目标类型 |
| 事务性转发 Outbox | 处理结果与转发意图同事务落库,Worker 异步消费,避免 DB 状态与 HTTP 副作用脱节 |
| 转发失败重试 | Outbox 投递失败后指数退避重试;超过最大投递年龄会标记 expired,避免旧告警误发 |
| 冷热数据归档 | 每日凌晨自动按重要性分级归档(high 90d / medium 30d / low 7d) |
| 运行时策略热更新 | 配置写入 DB system_configs,Redis Pub/Sub 广播到所有进程 |
| 全方位可观测性 | Prometheus 原生指标 + OpenTelemetry 链路追踪(可选) |
┌─────────────┐
Webhook 来源 ───────▶│ FastAPI │──── 202 Accepted
(Prometheus, │ /webhook │
Grafana, etc.) └──────┬──────┘
│ TaskIQ (Redis MQ)
▼
┌─────────────────────────────────┐
│ Pipeline │
│ normalize → dedup → AI analyze │
│ → noise reduce → forward │
└────────────┬────────────────────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
PostgreSQL Redis 外部系统
(持久化) (队列/缓存) (飞书/Webhook/
OpenClaw)
进程模型:
migrate一次性任务:等待 PostgreSQL 就绪并执行alembic upgrade headapi进程:FastAPI HTTP 服务(Gunicorn 4 UvicornWorker)worker进程:TaskIQ Worker,消费异步业务任务和定时任务scheduler进程:TaskIQ Scheduler,只负责周期性投递任务,不执行业务逻辑RUN_MODE=all:通过supervisord在同一个容器内同时拉起api/worker/scheduler,适合单机小部署或演示;需要横向扩容 Worker 时仍推荐独立进程/独立容器。RUN_MODE=lite:单 API 进程内完成接收、分析、Outbox 投递和兜底扫描,不依赖 Redis/TaskIQ;适合小团队、演示和低 QPS 告警转发。
异步职责边界:
- TaskIQ:基于 Redis Stream 的异步任务投递与 Worker 消费
- Scheduler:周期性投递 recovery、metrics、数据维护等任务
- TaskIQ 动态调度:按事件投递 Webhook 处理重试、Forward Outbox 重试和 OpenClaw 结果拉取
- Forward Outbox:Webhook 处理事务内只写入待发送意图,由 Worker 执行真实 HTTP/OpenClaw 转发
- PostgreSQL:Webhook 事实存储、失败转发/死信/重试状态等可审计状态
- Redis:TaskIQ 队列、短窗口风暴计数、缓存、运行时配置广播
Lite 模式边界:
- 只启动 API + PostgreSQL,后台处理在 API 进程内执行。
- 不使用 Redis Stream、Redis 缓存、Redis Pub/Sub 或跨进程分布式锁。
- 适合单实例/小流量;需要横向扩容、强隔离 Worker、跨实例 single-flight 时使用默认 Full 模式。
| 组件 | 技术选型 |
|---|---|
| Web 框架 | Python 3.12 + FastAPI + uvloop |
| 任务队列 | TaskIQ + Redis Stream Broker |
| 数据库 | PostgreSQL 15+ (asyncpg + SQLAlchemy 2.0) |
| 队列/缓存 | Redis 7+ |
| AI 调用 | AsyncOpenAI + Instructor (结构化输出) |
| HTTP 客户端 | httpx 单例连接池 |
| 监控 | Prometheus + prometheus-fastapi-instrumentator |
| 链路追踪 | OpenTelemetry (可选,OTLP 导出) |
| 数据迁移 | Alembic |
| 容器化 | Docker + Docker Compose;Full 模式 6 服务,Lite 模式 3 服务(migrate/API/PostgreSQL) |
# 1. 复制并填写配置
cp .env.example .env
# 至少需要替换: API_KEY, ADMIN_WRITE_KEY, WEBHOOK_SECRET;需要 AI 分析时再填写 OPENAI_API_KEY
# 2. 一键启动(Migrate + API + Worker + Scheduler + Redis + PostgreSQL)
docker-compose up -d --build
# 3. 验证
curl http://localhost:8000/health数据库 Schema 迁移由 Compose 中的一次性 migrate 服务执行(alembic upgrade head)。API、Worker 和 Scheduler 只在迁移成功后启动,entrypoint.sh 只负责按 RUN_MODE 分发进程。
只想把 Prometheus/Grafana 告警处理后转发到飞书,可以先用 Lite 模式:
cp .env.example .env
# 至少替换 API_KEY、ADMIN_WRITE_KEY、WEBHOOK_SECRET;如果只是内网测试,可显式设置 ALLOW_UNAUTHENTICATED_WEBHOOK=true
docker compose -f docker-compose.lite.yml up -d --build
curl http://localhost:8000/readyLite 模式不启动 Redis、Worker、Scheduler。Webhook 入站、AI 分析、Outbox 投递和补偿扫描都在 API 进程内完成;/ready 只检查 PostgreSQL。生产高流量或多实例部署仍建议使用默认 Full 模式。
默认 Compose 是三容器进程模型。如果想用 supervisor 在一个应用容器里同时拉起 API、Worker、Scheduler:
docker compose -f docker-compose.yml -f docker-compose.supervisor.yml up -d --build
docker compose -f docker-compose.yml -f docker-compose.supervisor.yml exec webhook-service supervisorctl -c /app/supervisord.conf statusRUN_MODE=all 时容器 PID 1 是 supervisord,它会管理三个子进程:
api:Gunicorn + UvicornWorker,监听:8000worker:taskiq worker core.taskiq_broker:broker services.operations.tasksscheduler:taskiq scheduler core.taskiq_broker:scheduler
健康检查会同时校验 supervisor 中三个 program 均为 RUNNING,并探测 API /ready。
默认三容器模式下,API 指标从 webhook-service:8000/metrics 暴露;Worker 会在容器网络内额外 expose ${WORKER_METRICS_PORT:-9001},用于抓取 TaskIQ 后台任务、recovery、重试等指标。all-in-one 模式下这些进程共享 PROMETHEUS_MULTIPROC_DIR,统一从 API /metrics 聚合暴露。
pip install -r requirements.lock
# 启动 API
uvicorn main:app --reload --port 8000
# 启动 Worker(另一个终端)
taskiq worker core.taskiq_broker:broker services.operations.taskscurl -X POST http://localhost:8000/webhook \
-H "Content-Type: application/json" \
-d '{"alertname": "TestAlert", "severity": "critical", "host": "prod-01"}'项目目前分三层验证:
| 层级 | 命令 | 覆盖内容 |
|---|---|---|
| 静态检查 | ruff check . / mypy . |
代码风格、类型边界 |
| 单元 + 进程内集成 | pytest |
纯函数、核心服务、FastAPI 路由到 pipeline 的进程内链路 |
| Docker E2E | tests/e2e/run_webhook_to_feishu.sh |
真 PostgreSQL、真 Redis、API 容器、TaskIQ Worker、TaskIQ Scheduler、fake Feishu HTTP server |
常规本地/CI 快速验证:
ruff check .
mypy .
pytest核心链路 E2E 验证:
tests/e2e/run_webhook_to_feishu.sh这条 E2E 会自动:
- 用
tests/e2e/docker-compose.yml启动一次性环境; - 从干净 PostgreSQL 执行
alembic upgrade head; - 等待一次性
migrate任务完成后,启动 API、Redis、TaskIQ Worker、TaskIQ Scheduler 和 fake Feishu; - 向
/webhook/prometheus发送真实 HTTP 请求; - 等待 Worker 从 Redis 消费任务;
- 断言 fake Feishu 收到飞书
interactivecard。
脚本退出时会自动 docker compose down -v --remove-orphans 清理容器和数据卷。失败时会打印相关容器最近日志,优先看 webhook-service、worker 和 scheduler。
Docker E2E 比普通 pytest 慢,默认不放进快速 CI。发版前、改动迁移/队列/转发链路时应手动跑一遍。
/
├── api/ # API 路由 (webhook, admin, analysis, forwarding)
├── core/ # 基础设施 (config, auth, redis, logger, metrics, broker)
├── services/ # 业务逻辑,按能力分包
│ ├── webhooks/ # 接收、持久化、查询、主处理 Pipeline
│ ├── forwarding/ # 转发规则、外部投递、失败转发补偿
│ ├── analysis/ # AI 分析、降噪、OpenClaw 集成
│ ├── operations/ # TaskIQ 任务、调度入口、恢复/指标/维护任务
│ └── runtime_config/# 运行时配置热更新服务
├── adapters/ # 生态适配器 (多格式归一化)
│ └── plugins/ # 生态适配器插件 (feishu_card)
├── models/ # SQLAlchemy ORM 模型
├── schemas/ # Pydantic 请求/响应 Schema
├── db/ # 数据库连接池与 session 管理
├── alembic/ # Alembic 增量 Schema 迁移
├── prompts/ # AI 提示词模板
├── templates/ # 前端 Dashboard HTML + 静态文件
├── scripts/ # 运维工具脚本
├── tests/ # pytest 测试套件 + Docker E2E
├── docs/ # 功能文档
├── main.py # FastAPI 入口
├── worker.py # TaskIQ Worker 入口
├── Dockerfile # 多阶段构建 (jemalloc + 非 root)
├── docker-compose.yml # 6 服务编排(含 migrate job)
└── .env.example # 配置模板
所有
/api/*端点需要Authorization: Bearer <API_KEY>Header;会修改状态、触发 AI/OpenClaw 或发起外部转发的写接口需要Authorization: Bearer <ADMIN_WRITE_KEY>(未配置时回退到API_KEY)。/webhook端点默认无需鉴权(可通过REQUIRE_WEBHOOK_AUTH=true开启)。
| 方法 | 路径 | 说明 |
|---|---|---|
POST |
/webhook |
接收通用 Webhook(自动检测来源) |
POST |
/webhook/{source} |
接收指定来源的 Webhook |
GET |
/health |
健康检查 |
GET |
/ 或 /dashboard |
Web 管理界面 |
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/api/webhooks |
分页列举事件(支持 source/importance/status 过滤) |
GET |
/api/webhooks/{id} |
获取单条事件详情(含原始 payload) |
| 方法 | 路径 | 说明 |
|---|---|---|
POST |
/api/deep-analyze/{webhook_id} |
触发 OpenClaw 深度分析(写权限) |
GET |
/api/deep-analyses |
分页列举深度分析记录 |
GET |
/api/deep-analyses/{webhook_id} |
获取某事件的所有分析记录 |
POST |
/api/deep-analyses/{id}/retry |
手动重拉 OpenClaw 分析结果(写权限) |
POST |
/api/deep-analyses/{id}/forward |
手动转发分析结果到指定 URL(写权限) |
POST |
/api/reanalyze/{webhook_id} |
强制重新 AI 分析(写权限) |
POST |
/api/forward/{webhook_id} |
手动触发转发(写权限) |
GET |
/api/ai-usage |
AI 用量 & 成本统计 |
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/api/forward-rules |
列举所有转发规则 |
POST |
/api/forward-rules |
创建转发规则(写权限) |
PUT |
/api/forward-rules/{id} |
更新转发规则(写权限) |
DELETE |
/api/forward-rules/{id} |
删除转发规则(写权限) |
GET |
/api/failed-forwards |
失败转发审计记录 |
POST |
/api/failed-forwards/{id}/retry |
重置失败转发重试(写权限) |
DELETE |
/api/failed-forwards/{id} |
删除补偿记录(写权限) |
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/api/config |
查看当前有效配置 |
GET |
/api/config/sources |
查看每个配置 key 的来源(db/env/default)及更新时间 |
POST |
/api/config |
热更新运行时配置(写权限) |
GET |
/api/prompt |
查看当前 AI Prompt |
POST |
/api/prompt/reload |
热重载 Prompt 文件(写权限) |
GET |
/api/admin/dead-letters |
死信队列列表 |
POST |
/api/admin/dead-letters/{id}/replay |
重放单条死信事件(写权限) |
GET |
/api/admin/stuck-events |
列举僵尸事件 |
POST |
/api/admin/stuck-events/{id}/requeue |
重新入队单条僵尸事件(写权限) |
| 方法 | 路径 | 说明 |
|---|---|---|
GET |
/metrics |
Prometheus 格式指标 |
优先级(低 → 高):内置默认值 < .env / 环境变量 < system_configs 数据库表(仅启用运行时配置后参与热更新)
标记
[runtime]的业务策略项支持通过POST /api/config或 Web 界面在线修改,无需重启。连接串、模型基地址、Token/API Key 默认要求通过环境变量或 ConfigMap 修改并滚动重启,避免多进程配置短暂不一致。
| 变量 | 默认值 | 说明 |
|---|---|---|
API_KEY |
— | 管理 API 鉴权 Token(生产必须设置) |
ADMIN_WRITE_KEY |
— | 写操作单独 Key(为空则回退到 API_KEY) |
REQUIRE_WEBHOOK_AUTH |
true |
生产环境默认要求 Webhook 签名或 Token 鉴权 |
ALLOW_UNAUTHENTICATED_WEBHOOK |
false |
显式允许生产环境公开接收 Webhook(不推荐) |
WEBHOOK_RATE_LIMIT_PER_MINUTE |
600 |
按客户端 IP 限流;设为 0 表示关闭 |
DATABASE_URL |
postgresql://... |
PostgreSQL 连接串 |
REDIS_URL |
redis://localhost:6379/0 |
Redis 连接串 |
RUN_MODE |
api |
api / worker / scheduler / all / lite;lite 单 API 进程内处理,不依赖 Redis/TaskIQ |
ENABLE_RUNTIME_CONFIG |
false(生产) |
启用 DB/Redis 运行时业务策略配置 |
ALLOW_RUNTIME_CONNECTION_CONFIG |
false |
允许连接/密钥类配置热更新;生产不建议开启 |
API_WORKERS |
4 |
RUN_MODE=all 时 API Gunicorn worker 数 |
WORKER_METRICS_PORT |
9001 |
Compose 中独立 Worker 容器的后台任务指标端口 |
DB_POOL_SIZE |
5 |
单进程数据库连接池大小 |
DB_STATEMENT_TIMEOUT_MS |
30000 |
SQL 语句超时(毫秒) |
LOG_LEVEL |
INFO |
项目业务日志级别(webhook_service、config、db、models 等) |
THIRD_PARTY_LOG_LEVEL |
WARNING |
第三方/框架日志级别(TaskIQ、httpx、uvicorn、gunicorn 等) |
LOG_FILE |
— | 日志文件路径(为空则仅控制台输出) |
RECOVERY_SCAN_INTERVAL_SECONDS |
300 |
recovery-only DB 兜底扫描间隔;正常路径走 Redis/TaskIQ |
MAX_CONCURRENT_WEBHOOK_TASKS |
30 |
所有 Worker 全局 Webhook 处理并发上限(Redis 分布式令牌) |
WEBHOOK_TASK_SLOT_LEASE_SECONDS |
1800 |
全局并发令牌租约秒数,长任务会自动续期 |
RECOVERY_POLLER_STUCK_THRESHOLD_SECONDS |
300 |
僵尸事件判定阈值(秒) |
WEBHOOK_SECRET |
— | HMAC-SHA256 签名校验密钥 |
| 变量 | 默认值 | 说明 |
|---|---|---|
ENABLE_AI_ANALYSIS |
true |
[runtime] 开启 AI 分析 |
OPENAI_API_KEY |
— | LLM 提供商 API Key;默认需重启生效 |
OPENAI_API_URL |
OpenRouter | LLM API 基地址;默认需重启生效 |
OPENAI_MODEL |
anthropic/claude-sonnet-4 |
[runtime] 使用的模型 |
AI_SYSTEM_PROMPT |
内置 | [runtime] 系统级 Prompt |
AI_USER_PROMPT_FILE |
prompts/webhook_analysis_detailed.txt |
用户 Prompt 模板文件路径 |
CACHE_ENABLED |
true |
分析结果 Redis 缓存 |
ANALYSIS_CACHE_TTL |
21600 |
缓存有效期(秒,默认 6h) |
| 变量 | 默认值 | 说明 |
|---|---|---|
DUPLICATE_ALERT_TIME_WINDOW |
24 |
去重时间窗口(小时) |
REANALYZE_AFTER_TIME_WINDOW |
false |
超窗后是否重新 AI 分析 |
FORWARD_AFTER_TIME_WINDOW |
false |
超窗后是否重新转发 |
ENABLE_ALERT_NOISE_REDUCTION |
true |
开启混合相似度降噪 |
NOISE_REDUCTION_WINDOW_MINUTES |
5 |
相似度比对时间窗口(分钟) |
ROOT_CAUSE_MIN_CONFIDENCE |
0.65 |
根因判定置信度阈值 |
NOISE_RELATED_MIN_CONFIDENCE |
0.35 |
近邻告警纳入关联集合的最低置信度 |
NOISE_*_WEIGHT |
见 core/config.py |
source/resource/semantic/severity/time 评分权重 |
SUPPRESS_DERIVED_ALERT_FORWARD |
true |
抑制衍生告警的转发 |
| 变量 | 默认值 | 说明 |
|---|---|---|
ENABLE_FORWARD |
true |
开启自动转发 |
FORWARD_URL |
— | 默认转发目标 URL |
ENABLE_FORWARD_RETRY |
true |
失败转发自动重试 |
FORWARD_RETRY_MAX_RETRIES |
3 |
最大重试次数 |
FORWARD_RETRY_INITIAL_DELAY |
60 |
初始重试延迟(秒) |
FORWARD_MAX_DELIVERY_AGE_SECONDS |
1800 |
Outbox 最大投递年龄;超龄标记 expired 不再发送,0 表示关闭 |
WEBHOOK_RETRY_MAX_RETRIES |
5 |
Webhook 主处理最大重试次数 |
PROCESSING_LOCK_DISTRIBUTED_ENABLED |
true |
同一 alert_hash 跨 Worker 串行处理 |
PROCESSING_LOCK_TTL_SECONDS |
180 |
分布式处理锁 TTL,会自动续期 |
PROCESSING_LOCK_WAIT_TIMEOUT_SECONDS |
15 |
等待同类告警处理锁的最长秒数 |
PROCESSING_LOCK_FAILFAST_THRESHOLD |
20 |
短窗口内同类告警超过阈值后直接背压 |
| 变量 | 默认值 | 说明 |
|---|---|---|
OPENCLAW_ENABLED |
false |
[runtime] 启用 OpenClaw 引擎 |
OPENCLAW_GATEWAY_URL |
— | OpenClaw 网关地址;默认需重启生效 |
OPENCLAW_GATEWAY_TOKEN |
— | 认证 Token;默认需重启生效 |
OPENCLAW_HOOKS_TOKEN |
— | Hook 认证 Token;默认需重启生效 |
OPENCLAW_HTTP_API_URL |
http://127.0.0.1:8085 |
OpenClaw HTTP 查询地址;默认需重启生效 |
OPENCLAW_TIMEOUT_SECONDS |
900 |
[runtime] 分析超时(秒) |
OPENCLAW_POLL_INITIAL_DELAY_SECONDS |
10 |
[runtime] OpenClaw 首次结果拉取延迟 |
OPENCLAW_POLL_MAX_DELAY_SECONDS |
120 |
[runtime] OpenClaw 轮询指数退避最大延迟 |
OPENCLAW_POLL_BACKOFF_MULTIPLIER |
2.0 |
[runtime] OpenClaw 轮询退避倍率 |
OPENCLAW_POLL_TIMEOUT |
180 |
[runtime] 单次轮询请求超时 |
DEEP_ANALYSIS_FEISHU_WEBHOOK |
— | 深度分析完成后推送的飞书 Webhook URL |
服务在 /metrics 暴露以下核心指标:
| 指标名 | 类型 | 说明 |
|---|---|---|
webhook_received_total |
Counter | 接收 Webhook 总量(按 source/status) |
webhook_processing_status_total |
Counter | Pipeline 处理结果计数 |
webhook_processing_duration_seconds |
Histogram | Pipeline 处理耗时分布 |
webhook_noise_reduced_total |
Counter | 降噪告警数(按 source/relation_type/suppressed) |
webhook_storm_suppressed_total |
Counter | 告警风暴触发抑制/聚合次数 |
webhook_running_tasks |
Gauge | 当前活跃的 Webhook 后台处理任务数 |
webhook_recovery_polled_total |
Counter | Recovery 扫描处理的僵尸事件数 |
ai_tokens_total |
Counter | Token 消耗量(按 model/token_type) |
ai_cost_usd_total |
Counter | 累计 AI 成本(美元) |
ai_analysis_duration_seconds |
Histogram | AI 分析耗时(按 source/engine) |
db_queue_pending |
Gauge | legacy DB 事件路径待恢复记录数 |
forward_retry_pending |
Gauge | DB 中待重试转发审计记录数 |
- 所有管理 API 需
Authorization: Bearer <API_KEY>Header - 写操作(配置变更、重放事件)可配置独立
ADMIN_WRITE_KEY - 支持接收端 HMAC-SHA256 签名校验(
WEBHOOK_SECRET) - 内置熔断器(Circuit Breaker)防止下游故障雪崩
- Docker 容器以非 root 用户(appuser:1000)运行
详细功能文档见 docs/:
MIT License