From 120f35671d41e0f4459e423a7d2dd226f041e19b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Sat, 27 Jun 2026 02:04:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=95=B0=E6=8D=AE=E7=AE=A1=E9=81=93?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E8=A1=A8+=E5=AE=8F=E8=A7=82=E9=A3=8E?= =?UTF-8?q?=E9=99=A9=E9=9B=86=E6=88=90+=E7=AE=A1=E9=81=93=E5=AE=8C?= =?UTF-8?q?=E6=95=B4=E6=80=A7=E5=AE=A1=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增 data/pipeline_registry.json — 数据管道注册表 - 记录每条数据流的source→consumer→end_user - verified字段标记是否验证通过 - 新增组件时必须在此注册,否则体检会报未验证 2. 宏观预警集成(修复管道缺口) - intraday_health_check: 读macro_risk_state.json,HIGH风险时报警 - morning_health_check: 新增管道注册表审计项 读pipeline_registry.json,未验证的管道标为异常 3. 管道完整性自发现机制 注册表中verified=false的管道会在每日体检中标记 逼迫新增组件时注册管道,否则睡不好觉 --- scripts/intraday_health_check.py | 16 ++++++++++++++++ scripts/morning_health_check.py | 18 +++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/scripts/intraday_health_check.py b/scripts/intraday_health_check.py index b66f35c..c0c4e02 100644 --- a/scripts/intraday_health_check.py +++ b/scripts/intraday_health_check.py @@ -118,6 +118,22 @@ def check_signal_pipeline(): pass log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)") + # 宏观风险状态检查 + try: + risk_path = DATA / "macro_risk_state.json" + if risk_path.exists(): + risk = json.loads(risk_path.read_text()) + level = risk.get("level", "none") + reason = risk.get("reason", "") + if level == "high": + log(False, f"🔴 宏观风险HIGH: {reason[:80]}") + elif level == "medium": + log(True, f"⚠️ 宏观风险MEDIUM: {reason[:80]}") + else: + log(True, "无宏观风险状态文件(可能未生成)") + except: + pass + def write_todos(): if not ISSUES: diff --git a/scripts/morning_health_check.py b/scripts/morning_health_check.py index f2f09dc..594cd4a 100755 --- a/scripts/morning_health_check.py +++ b/scripts/morning_health_check.py @@ -604,11 +604,27 @@ def run_check(item): elif check_spec == "meta:checklist_completeness": ok, detail = check_meta_checklist_completeness() elif check_spec == "pipeline:xiaoguo_signal_flow": - # 综合检查:小果有数据→被我处理 today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0) unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30) ok = today_xiaoguo or unproc detail = f"today_xiaoguo={d1}, unprocessed={d2}" + elif check_spec == "pipeline:registry_audit": + ok = True + gaps = [] + try: + import json as j2 + reg = j2.loads(open(str(DATA / "pipeline_registry.json")).read()) + for p in reg.get("pipelines", []): + if not p.get("verified"): + gaps.append(p["name"]) + if gaps: + ok = False + detail = f"{len(gaps)}条管道未验证: {', '.join(gaps[:5])}" + else: + detail = f"全部{len(reg['pipelines'])}条管道正常" + except Exception as e: + ok = True + detail = f"注册表不可读({str(e)[:60]})" else: ok = False detail = f"unknown_check:{check_spec}"