feat: 数据管道注册表+宏观风险集成+管道完整性审计
1. 新增 data/pipeline_registry.json — 数据管道注册表
- 记录每条数据流的source→consumer→end_user
- verified字段标记是否验证通过
- 新增组件时必须在此注册,否则体检会报未验证
2. 宏观预警集成(修复管道缺口)
- intraday_health_check: 读macro_risk_state.json,HIGH风险时报警
- morning_health_check: 新增管道注册表审计项
读pipeline_registry.json,未验证的管道标为异常
3. 管道完整性自发现机制
注册表中verified=false的管道会在每日体检中标记
逼迫新增组件时注册管道,否则睡不好觉
This commit is contained in:
@@ -118,6 +118,22 @@ def check_signal_pipeline():
|
|||||||
pass
|
pass
|
||||||
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
|
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
|
||||||
|
|
||||||
|
# 宏观风险状态检查
|
||||||
|
try:
|
||||||
|
risk_path = DATA / "macro_risk_state.json"
|
||||||
|
if risk_path.exists():
|
||||||
|
risk = json.loads(risk_path.read_text())
|
||||||
|
level = risk.get("level", "none")
|
||||||
|
reason = risk.get("reason", "")
|
||||||
|
if level == "high":
|
||||||
|
log(False, f"🔴 宏观风险HIGH: {reason[:80]}")
|
||||||
|
elif level == "medium":
|
||||||
|
log(True, f"⚠️ 宏观风险MEDIUM: {reason[:80]}")
|
||||||
|
else:
|
||||||
|
log(True, "无宏观风险状态文件(可能未生成)")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def write_todos():
|
def write_todos():
|
||||||
if not ISSUES:
|
if not ISSUES:
|
||||||
|
|||||||
@@ -604,11 +604,27 @@ def run_check(item):
|
|||||||
elif check_spec == "meta:checklist_completeness":
|
elif check_spec == "meta:checklist_completeness":
|
||||||
ok, detail = check_meta_checklist_completeness()
|
ok, detail = check_meta_checklist_completeness()
|
||||||
elif check_spec == "pipeline:xiaoguo_signal_flow":
|
elif check_spec == "pipeline:xiaoguo_signal_flow":
|
||||||
# 综合检查:小果有数据→被我处理
|
|
||||||
today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
|
today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
|
||||||
unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
|
unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
|
||||||
ok = today_xiaoguo or unproc
|
ok = today_xiaoguo or unproc
|
||||||
detail = f"today_xiaoguo={d1}, unprocessed={d2}"
|
detail = f"today_xiaoguo={d1}, unprocessed={d2}"
|
||||||
|
elif check_spec == "pipeline:registry_audit":
|
||||||
|
ok = True
|
||||||
|
gaps = []
|
||||||
|
try:
|
||||||
|
import json as j2
|
||||||
|
reg = j2.loads(open(str(DATA / "pipeline_registry.json")).read())
|
||||||
|
for p in reg.get("pipelines", []):
|
||||||
|
if not p.get("verified"):
|
||||||
|
gaps.append(p["name"])
|
||||||
|
if gaps:
|
||||||
|
ok = False
|
||||||
|
detail = f"{len(gaps)}条管道未验证: {', '.join(gaps[:5])}"
|
||||||
|
else:
|
||||||
|
detail = f"全部{len(reg['pipelines'])}条管道正常"
|
||||||
|
except Exception as e:
|
||||||
|
ok = True
|
||||||
|
detail = f"注册表不可读({str(e)[:60]})"
|
||||||
else:
|
else:
|
||||||
ok = False
|
ok = False
|
||||||
detail = f"unknown_check:{check_spec}"
|
detail = f"unknown_check:{check_spec}"
|
||||||
|
|||||||
Reference in New Issue
Block a user