Files
MoFin/scripts/macro_context_collector.py
知微 7c0e85af28 硬性策略质量门禁 validate_strategy()
新增 STRATEGY_QUALITY_GATES 检查清单(9条红线):
CRITICAL: 止损/止盈存在+>0, 买入区下沿<上沿
HIGH: 止损≤买入区, 买入推荐含RR≥1.5, 港股标currency=HKD
MEDIUM: signal短词, tech_snapshot含技术位

enforce_strategy_quality() 插在写入链的两处:
1. reassess_with_context() return前 → 单只重评必过
2. regenerate_all() for d in decisions: 写DB前 → 批量重评必过

不过的:status=review_needed, signal降级→信号不充分
不会写进DB/JSON,除非修复了CRITICAL问题
2026-07-02 13:46:53 +08:00

267 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
macro_context_collector.py — 宏观新闻采集器+实时红绿灯(no_agent)
采集来源:
1. akshare.stock_info_global_em() — 东方财富全球宏观新闻(200条实时)
核心功能:
A. 采集新闻→macro_raw_news(去重入库)
B. 实时红绿灯检测——用关键词规则快速识别HIGH/MEDIUM风险
- 不等LLM,采集完立刻判断
- 检测到风险→写入signal_news+macro_risk_state.json
- 盘中的监控cron(每15-25分)读到后立刻调整策略
红绿灯规则:
HIGH(单条就触发): 全球巨头+核心产业负面/美联储意外/指数暴跌/地缘冲突
MEDIUM(累计2条触发): 常规宏观事件/板块级/资金面
"""
import sys, json, hashlib, os, re
from datetime import datetime
from pathlib import Path
# Filesystem locations used by this script.
# DB_PATH is the SQLite database opened by main(); STATE_PATH is the JSON
# state file written by write_risk_signal().
DATA_DIR = Path("/home/hmo/MoFin/data")
DB_PATH = DATA_DIR.joinpath("mofin.db")
STATE_PATH = DATA_DIR.joinpath("macro_risk_state.json")
# ── Traffic-light keyword rules ──
# HIGH: a single article matching any one of these patterns raises a HIGH alert.
#
# Notes for maintainers:
#   * quick_risk_check() lowercases the article text before matching, which is
#     why the Latin keywords below (openai, nvidia, fed, sox, ipo) are lowercase.
#   * _PATTERN_CHECKS further down addresses entries of this list by numeric
#     index, so inserting, removing or reordering patterns shifts those indices.
#   * "[^。]*" keeps a match inside one sentence (it cannot cross a "。"),
#     whereas ".*" can span the rest of the text up to a line break.
HIGH_PATTERNS = [
    # Global mega-caps + core industries
    r"苹果[^。]*(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)",
    r"openai[^。]*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)",
    r"(?:英伟达|nvidia)[^。]*(?:跌|调查|制裁|推迟|禁令)",
    r"台积电[^。]*(?:跌|推迟|取消|地震|火灾|禁)",
    r"特斯拉.*(?:暴跌|召回|调查|破产|禁)",
    # Federal Reserve / central-bank surprises
    r"美联储.*(?:意外|紧急|缩表|风暴|警告|超预期|加息\s*50|降息\s*50|紧急\s*(?:会议|声明))",
    r"美联储.*(?:利率|决议).*(?:超预期|意外|紧急)",
    # NOTE(review): plain substring semantics - "fed" also matches inside longer
    # words, and any later "hike"/"cut" on the same line completes the match.
    r"fed.*(?:emergency|unexpected|surprise|hike|cut)",
    # Index crash (requires a drop of >= 2% or one of the stronger crash words)
    r"指数[^。]*?(?:暴跌|熔断|闪崩|重挫)",
    r"指数[^。]*?(?:跌幅[^。]{0,20}(?:扩大至|达|至|超|为|逾)[^。]*?(?<![0-9.])(?:[2-9]|[1-9][0-9])(?:\.\d+)?%|下跌(?!.*?涨)[^。]*?(?<![0-9.])(?:[2-9]|[1-9][0-9])(?:\.\d+)?%)",
    # Crash word followed by a percentage of 5% or more
    r"(?:暴跌|重挫|熔断).*?(?<![0-9.])(?:[5-9]|[1-9][0-9])(?:\.\d+)?%",
    r"熔断|闪崩",
    # Geopolitics + trade
    r"关税.*(?:升级|新|报复|制裁)",
    r"制裁.*(?:新|升级|全面)",
    r"战争|开战|入侵|核(?:威胁|武器|弹头|试验|攻击|冲突|导弹|战争|潜艇|问题|危机|设施)|导弹[^。]*(?:发射|袭击|攻击)",
    # Systemic energy events
    r"原油.*(?:跌破|暴跌|崩盘|断供)",
    r"石油.*(?:禁运|制裁|断供)",
    r"能源危机|粮食危机",
    # Systemic financial events
    r"银行.*(?:倒闭|挤兑|破产|接管|危机)",
    r"金融危机(?:风险|爆发|蔓延|冲击|预警|警示|逼近|担忧|席卷|升级|恐慌|进入|出现|形成|即将|来袭|警报|当前|新一轮|全面|全球性)|债务危机|违约潮|系统性(?:风险|危机)",
    # AI / tech sector sell-offs
    r"半导体.*(?:暴跌|熔断|崩盘|跌幅)",
    r"科技股.*(?:暴跌|熔断|崩盘|重挫)",
    r"(?:费城半导体|sox)[^。]*?(?:跌|崩)",
]
# MEDIUM: a MEDIUM alert is raised once at least two articles each match one
# of these patterns (and no article matched a HIGH pattern).
MEDIUM_PATTERNS = [
    r"加息|降息",
    # quick_risk_check() lowercases the text before matching, so a literal
    # uppercase "CPI|PPI" could never match there. The scoped (?i:...) group
    # makes these two acronyms match in either case without affecting the
    # rest of the pattern.
    r"通胀|(?i:cpi|ppi)",
    r"汇率.*(?:大幅|波动|贬值|升值)",
    r"外资.*(?:流出|撤离|减持)",
    r"北向.*(?:流出|净卖出|大幅)",
    r"季末|年末|半年末|资金回笼|流动性紧张",
    r"解禁.*(?:大额|巨量|千亿)",
    r"大跌|重挫|杀跌|恐慌",
    r"评级.*(?:下调|负面|降级)",
    r"预警|风险提示|谨慎",
    r"期货.*(?:暴跌|跌停|熔断)",
    r"黑天鹅|灰犀牛",
]
# ── Pattern integrity check ──
# Per the original author's note, this is meant to detect a stale .pyc cache
# or a rollback to an older revision of HIGH_PATTERNS. It works purely on the
# pattern source text using plain substring tests (no regex is evaluated):
#   * _PATTERN_CHECKS maps a HIGH_PATTERNS index to a list of fragments, at
#     least one of which must occur in the pattern at that index.
#   * _KNOWN_BAD_SIGS maps fragments of older pattern revisions to the reason
#     they were replaced; none of them should occur in any current pattern.
# A failed check only prints a warning; it never aborts the import.
_PATTERN_CHECKS = {
    8: ["暴跌|熔断|闪崩|重挫"],  # index pattern must use strong crash words, not "跌幅"
    9: ["(?<![0-9.])", "(?:扩大至|达|至|超|为|逾)"],  # negative lookbehind for decimals + measurement words for 跌幅
    14: ["核(?:威胁", "核威胁|武器|弹头"],  # 核 must be scoped to compounds, never standalone
    18: ["倒闭|挤兑|破产"],  # bank pattern must carry the crisis keywords
    19: ["金融危机(?:风险", "危机|债务危机"],  # 金融危机 must be scoped, never standalone
}
_KNOWN_BAD_SIGS = {
    # Fragments of earlier pattern revisions; finding one means the wrong
    # version of HIGH_PATTERNS is loaded.
    # NOTE(review): these are literal substrings, so the "(?!\")" part of the
    # second key is compared verbatim rather than as a lookahead - confirm
    # that this is the intended signature.
    "指数.*跌幅": "旧版用 .* 跨句匹配且无 ≥2% 阈值",
    "[2-9]%(?!\")": "旧版 pattern 9 无上下文限制和阈值修复(新版用 (?:[2-9]|[1-9][0-9])(?:\\.\\d+)?% 替代 [2-9]%",
    "|核|": "旧版有独立单字核",
    "英伟达|nvidia.*跌": "旧版 alternation 分组错误",
    "导弹.*发射": "旧版只匹配发射不匹配袭击",
    "|金融危机|": "旧版 standalone 金融危机匹配历史参照",
}
for idx, required_list in _PATTERN_CHECKS.items():
    # A list shorter than the checked index is itself a mismatch; treat the
    # missing pattern as empty text instead of letting an IndexError crash
    # the import of a check that is only supposed to warn.
    _current = HIGH_PATTERNS[idx] if idx < len(HIGH_PATTERNS) else ""
    if not any(req in _current for req in required_list):
        print(f"[MACRO-安全] ⚠️ HIGH_PATTERNS[{idx}] 签名不匹配!")
        print(f"[MACRO-安全]   当前: {_current[:100]}")
        print(f"[MACRO-安全]   预期应包含: {required_list[0]}")
        print("[MACRO-安全]   可能原因: .pyc 缓存过期 / 回退到旧版本")
# Extra scan: look for leftovers of known older pattern revisions anywhere in
# the current pattern list.
_all_patterns_text = "\n".join(HIGH_PATTERNS)
for bad_sig, reason in _KNOWN_BAD_SIGS.items():
    if bad_sig in _all_patterns_text:
        print(f"[MACRO-安全] ⚠️ 检测到旧版模式签名 '{bad_sig}' ({reason})")
        print("[MACRO-安全]   .pyc 缓存可能未刷新,当前 HIGH_PATTERNS 可能仍为旧版本")
def ensure_tables(conn):
    """Create the ``macro_raw_news`` table if it is missing, then commit.

    Args:
        conn: An open database connection exposing ``execute`` and ``commit``
            (main() passes a ``sqlite3`` connection).
    """
    create_raw_news_sql = """
        CREATE TABLE IF NOT EXISTS macro_raw_news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            summary TEXT DEFAULT '',
            url TEXT DEFAULT '',
            source_ts TEXT DEFAULT '',
            fetched_at TEXT DEFAULT (datetime('now','localtime')),
            risk_level TEXT DEFAULT 'unassessed',
            risk_reason TEXT DEFAULT ''
        )
    """
    conn.execute(create_raw_news_sql)
    conn.commit()
def fetch_news():
    """Fetch the global macro news feed through akshare.

    Returns:
        A list of dicts with the keys ``title``, ``summary``, ``url`` and
        ``source_ts``, each holding ``str()`` of the feed columns 标题, 摘要,
        链接 and 发布时间 respectively (a missing column yields ``""``).
        Returns ``[]`` when akshare is not installed or when fetching or
        converting fails; the reason is printed to stderr.
    """
    # (feed column, output key) pairs, in the order the output dict is built.
    column_to_key = (
        ("标题", "title"),
        ("摘要", "summary"),
        ("链接", "url"),
        ("发布时间", "source_ts"),
    )
    try:
        # Imported lazily so a missing akshare degrades to an empty result.
        import akshare as ak

        frame = ak.stock_info_global_em()
        return [
            {key: str(record.get(column, "")) for column, key in column_to_key}
            for _, record in frame.iterrows()
        ]
    except ImportError:
        print("[MACRO] akshare 未安装", file=sys.stderr)
        return []
    except Exception as exc:
        print(f"[MACRO] 采集失败: {exc}", file=sys.stderr)
        return []
def title_hash(title):
    """Return the first 16 hex characters of the MD5 digest of *title*.

    The title is encoded with ``str.encode()``'s default encoding (UTF-8).
    main() uses the result as a de-duplication key for news titles.
    """
    encoded = title.encode()
    full_digest = hashlib.md5(encoded).hexdigest()
    return full_digest[:16]
def quick_risk_check(items):
    """Classify a batch of news items with the keyword traffic-light rules.

    Each item's title and summary are joined with a space and lowercased. An
    item counts as a HIGH hit if any HIGH_PATTERNS entry matches; only items
    without a HIGH hit are tested against MEDIUM_PATTERNS.

    Args:
        items: Iterable of dicts with string values under ``"title"`` and
            ``"summary"``.

    Returns:
        A ``(level, matched, summary)`` tuple:
          * ``level`` is ``"high"`` if at least one item is a HIGH hit,
            ``"medium"`` if there is no HIGH hit and at least two MEDIUM
            hits, otherwise ``"none"``.
          * ``matched`` holds at most the first five hits of that level.
          * ``summary`` is a header line with the total hit count followed by
            one bullet per matched item (title cut to 50 characters).
        With level ``"none"``, ``matched`` is ``[]`` and ``summary`` is ``""``.
    """
    urgent = []
    cautionary = []
    for article in items:
        haystack = " ".join((article["title"], article["summary"])).lower()
        if any(re.search(rule, haystack) for rule in HIGH_PATTERNS):
            urgent.append(article)
        elif any(re.search(rule, haystack) for rule in MEDIUM_PATTERNS):
            # MEDIUM rules are consulted only when no HIGH rule matched.
            cautionary.append(article)

    if urgent:
        level = "high"
        pool = urgent
        header = f"【高风险】{len(urgent)}条紧急信号:\n"
    elif len(cautionary) >= 2:
        level = "medium"
        pool = cautionary
        header = f"【中风险】{len(cautionary)}条预警信号:\n"
    else:
        return "none", [], ""

    top_hits = pool[:5]
    bullets = "\n".join(f"· {hit['title'][:50]}" for hit in top_hits)
    return level, top_hits, header + bullets
def write_risk_signal(conn, level, matched, summary):
    """Persist a triggered risk alert to ``signal_news`` and to the state file.

    Args:
        conn: Open database connection exposing ``execute`` and ``commit``.
            Assumes a ``signal_news`` table already exists - it is not created
            anywhere in this part of the file (TODO confirm).
        level: Risk level; ``"high"`` and ``"medium"`` map to a sentiment tag,
            any other value maps to an empty tag.
        matched: List of article dicts with ``"title"`` and ``"summary"``
            strings; titles are cut to 80 and summaries to 120 characters.
        summary: Human-readable alert text; cut to 500 characters for the
            database row and to 300 characters for the state file.

    Side effects:
        Inserts and commits one ``signal_news`` row, then overwrites
        STATE_PATH with a JSON document describing the alert.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sentiment_map = {"high": "宏观-WATCH_HIGH", "medium": "宏观-WATCH_MEDIUM"}
    sentiment = sentiment_map.get(level, "")

    # Database row. key_articles is stored as a JSON-encoded string.
    articles_json = json.dumps(
        [{"title": a["title"][:80], "summary": a["summary"][:120]} for a in matched],
        ensure_ascii=False,
    )
    conn.execute(
        "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (0, ?, ?, ?, ?, '', ?)",
        ("宏观", sentiment, summary[:500], articles_json, "macro_watch"),
    )
    conn.commit()

    # State file (per the original comment, read by the monitoring cron job).
    # Note that "key_articles" embeds the already-serialised JSON string.
    state = {
        "level": level,
        "signals": [
            {
                "sentiment": sentiment,
                "summary": summary[:300],
                "key_articles": articles_json,
                "created_at": now,
            }
        ],
        "signal_count": len(matched),
        "created_at": now,
        "expired": False,
        "source": "collector_realtime",
    }
    # ensure_ascii=False leaves Chinese text unescaped, so the file must be
    # written as UTF-8 explicitly rather than with the locale default, which
    # can be a non-UTF-8 codec depending on the environment the job runs in.
    STATE_PATH.write_text(
        json.dumps(state, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
def main():
    """Collect macro news, store unseen items and raise a risk alert if needed.

    Steps:
      1. Open the SQLite database at DB_PATH and make sure ``macro_raw_news``
         exists.
      2. Build a de-duplication set from the hashes of the 200 most recently
         inserted titles.
      3. Fetch the feed; skip blank titles and titles already seen.
      4. Insert the remaining items and commit.
      5. Run quick_risk_check() on the newly inserted items only and, for a
         ``"high"`` or ``"medium"`` result, persist it via write_risk_signal().
      6. Print a short report to stdout (per the original comment, HIGH
         alerts printed here are forwarded to XMPP).

    Returns:
        None. Nothing is printed when the feed is empty or has no new items.
    """
    # Imported locally: at module level sqlite3 is only imported under the
    # __main__ guard, so calling main() from an importing module would
    # otherwise fail with NameError.
    import sqlite3

    level, matched, summary = "none", [], ""
    new_items = []
    conn = sqlite3.connect(str(DB_PATH))
    try:
        ensure_tables(conn)

        # De-duplication baseline: hashes of the most recent stored titles.
        existing = {
            title_hash(row[0])
            for row in conn.execute(
                "SELECT title FROM macro_raw_news ORDER BY id DESC LIMIT 200"
            )
        }

        items = fetch_news()
        if not items:
            return

        # Insert unseen items.
        now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for item in items:
            if not item["title"].strip():
                continue
            # Hash the title exactly as it is stored (cut to 300 characters);
            # the baseline above is built from stored titles, so hashing the
            # untruncated title would never match an over-long title again.
            stored_title = item["title"][:300]
            h = title_hash(stored_title)
            if h in existing:
                continue
            existing.add(h)
            try:
                conn.execute(
                    "INSERT INTO macro_raw_news (title, summary, url, source_ts, fetched_at) VALUES (?, ?, ?, ?, ?)",
                    (stored_title, item["summary"][:500], item["url"][:500], item["source_ts"][:20], now_str),
                )
            except sqlite3.Error as exc:
                # Best effort: one failed row must not abort the batch, but
                # the failure is reported instead of being dropped silently.
                print(f"[MACRO] 入库失败: {exc}", file=sys.stderr)
                continue
            new_items.append(item)
        conn.commit()

        # Traffic-light check, restricted to the newly collected items.
        # NOTE(review): the items are committed before this check runs, so if
        # write_risk_signal() raises they are treated as already seen on the
        # next run and are not re-evaluated.
        if new_items:
            level, matched, summary = quick_risk_check(new_items)
            if level in ("high", "medium"):
                write_risk_signal(conn, level, matched, summary)
    finally:
        # Release the connection on every path, including errors.
        conn.close()

    # no_agent output
    if new_items:
        print(f"[MACRO] {len(new_items)}条新宏观新闻")
        if level == "high":
            print(f"⚠️ HIGH风险预警 ({len(matched)}条): {summary[:200]}")
        elif level == "medium":
            print(f" MEDIUM风险: {len(matched)}条匹配")
# Script entry point: run one collection cycle when executed directly.
if __name__ == "__main__":
    # sqlite3 is imported here rather than with the other imports at the top
    # of the file, so this statement binds it as a module global only when
    # the file is executed as a script.
    import sqlite3
    main()