Files
MoFin/scripts/macro_context_collector.py
T
知微 6a97d93018 现金更正 + 法拉电子清仓记录
截图确认:
- 可用资金 92,664.20(含天添利)
- 冻结 39,481.40
- 总现金 132,145.60
- 总资产 = 持仓市值1,107,670 + 现金132,145.60 = 1,239,815.60

法拉电子 189.20卖出100股已记录
2026-06-29 12:40:50 +08:00

234 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""
macro_context_collector.py — 宏观新闻采集器+实时红绿灯(no_agent)
采集来源:
1. akshare.stock_info_global_em() — 东方财富全球宏观新闻(200条实时)
核心功能:
A. 采集新闻→macro_raw_news(去重入库)
B. 实时红绿灯检测——用关键词规则快速识别HIGH/MEDIUM风险
- 不等LLM,采集完立刻判断
- 检测到风险→写入signal_news+macro_risk_state.json
- 盘中的监控cron(每15-25分)读到后立刻调整策略
红绿灯规则:
HIGH(单条就触发): 全球巨头+核心产业负面/美联储意外/指数暴跌/地缘冲突
MEDIUM(累计2条触发): 常规宏观事件/板块级/资金面
"""
import hashlib
import json
import os
import re
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
# All runtime files live in a "data" directory under the parent of this
# script's directory.
DATA_DIR = Path(__file__).parent.parent.joinpath("data")
DB_PATH = DATA_DIR.joinpath("mofin.db")
STATE_PATH = DATA_DIR.joinpath("macro_risk_state.json")
# ── Traffic-light keyword rules ──
# NOTE: quick_risk_check() lowercases the text before matching, so literal
# Latin letters in these patterns must be lowercase to be reachable.
# HIGH: any single match -> immediate HIGH alert
HIGH_PATTERNS = [
    # Global giants + core industries
    # Apple: exclude partnership/procurement news
    # (e.g. '苹果牵手长鑫存储,MLCC涨价' is not a risk)
    r"苹果(?!.*?(?:牵手|合作|联合|携手|助力|入驻|投资|设立|引入)).*?(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)",
    r"openai.*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)",
    r"(?:英伟达|nvidia).*(?:跌|调查|制裁|推迟|禁令)",
    r"台积电.*(?:跌|推迟|取消|地震|火灾|禁)",
    r"特斯拉.*(?:暴跌|召回|调查|破产|禁)",
    # Fed / central-bank surprises
    r"美联储.*(?:意外|紧急|缩表|风暴|警告|超预期|加息\s*50|降息\s*50|紧急\s*(?:会议|声明))",
    r"美联储.*(?:利率|决议).*(?:超预期|意外|紧急)",
    r"fed.*(?:emergency|unexpected|surprise|hike|cut)",
    # Major-index crashes (restricted to main indices so that e.g.
    # '板块指数跌幅居前' does not match). "kospi" is lowercase because the
    # text is lowercased before matching.
    r"(?:上证|深证|创业板|科创|恒生指数|恒指|日经|kospi|道指|纳指|标普500|沪深300).*(?:暴跌|重挫|熔断|闪崩|跳水|跌幅(?!.*?居前))",
    r"指数.*(?:暴跌|熔断|闪崩)",
    r"熔断|闪崩",
    # Geopolitics + trade
    r"关税.*(?:升级|新|报复|制裁)",
    r"制裁.*(?:新|升级|全面)",
    # War / geopolitics: the over-broad standalone '核' / '战争' matches were
    # removed to avoid false hits on '核聚变', '硬核科技', '核心', '贸易战'.
    r"(?:地缘|边境|朝鲜|伊朗).*(?:冲突|风险|升级|紧张|军事行动|交火)|开战|开火|空袭|轰炸|入侵|击落|军事(?:冲突|行动|升级|打击|对抗|演习|部署)|核(?:威胁|武器|弹头|试验|攻击|冲突|导弹|战争|潜艇|问题|危机|设施)|导弹.*(?:发射|试射|打击)",
    # Systemic energy
    r"原油.*(?:跌破|暴跌|崩盘|断供)",
    r"石油.*(?:禁运|制裁|断供)",
    r"能源危机|粮食危机",
    # Systemic finance
    r"银行.*(?:倒闭|挤兑|破产|接管|危机)",
    r"金融危机|债务危机|违约潮|系统性(?:风险|危机|金融|下跌|崩塌)",
    # AI / tech sector slump
    r"半导体.*(?:暴跌|熔断|崩盘|跌幅)",
    r"科技股.*(?:暴跌|熔断|崩盘|重挫)",
    # Both names need the fall condition; without the group, any mention of
    # the Philadelphia semiconductor index (even a rally) fired HIGH.
    r"(?:费城半导体|sox).*(?:跌|崩)",
]
# MEDIUM: 2 or more matching articles in total -> MEDIUM alert
# NOTE: matching runs on lowercased text, so Latin literals are lowercase.
MEDIUM_PATTERNS = [
    r"加息|降息",
    # cpi/ppi must not be embedded in an English word (e.g. "shipping").
    r"通胀|(?<![a-z])(?:cpi|ppi)(?![a-z])",
    r"汇率.*(?:大幅|波动|贬值|升值)",
    r"外资.*(?:流出|撤离|减持)",
    r"北向.*(?:流出|净卖出|大幅)",
    r"季末|年末|半年末|资金回笼|流动性紧张",
    r"解禁.*(?:大额|巨量|千亿)",
    r"大跌|重挫|杀跌|恐慌",
    r"评级.*(?:下调|负面|降级)",
    r"预警|风险提示|谨慎",
    r"期货.*(?:暴跌|跌停|熔断)",
    r"黑天鹅|灰犀牛",
]
def ensure_tables(conn):
    """Create the ``macro_raw_news`` table on *conn* if absent, then commit.

    Rows default to ``risk_level='unassessed'`` and ``fetched_at`` to the
    local time at insert.
    """
    ddl = """
CREATE TABLE IF NOT EXISTS macro_raw_news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
summary TEXT DEFAULT '',
url TEXT DEFAULT '',
source_ts TEXT DEFAULT '',
fetched_at TEXT DEFAULT (datetime('now','localtime')),
risk_level TEXT DEFAULT 'unassessed',
risk_reason TEXT DEFAULT ''
)
"""
    conn.execute(ddl)
    conn.commit()
def clean_cell(value):
    """Convert one DataFrame cell to ``str``, mapping missing cells to ``''``.

    Plain ``str()`` turns a missing cell into the literal text "nan", "NaT"
    or "None". That text would then be stored and fed to the keyword checks.
    """
    if value is None:
        return ""
    try:
        if value != value:  # NaN / NaT compare unequal to themselves
            return ""
    except (TypeError, ValueError):  # e.g. pandas.NA cannot be truth-tested
        return ""
    return str(value)


def row_to_news_item(row):
    """Map one news row (any mapping-like object with ``.get``) to an item dict.

    Returns a dict with string values under "title", "summary", "url" and
    "source_ts". Missing columns or missing cells become ``''``.
    """
    return {
        "title": clean_cell(row.get("标题", "")),
        "summary": clean_cell(row.get("摘要", "")),
        "url": clean_cell(row.get("链接", "")),
        "source_ts": clean_cell(row.get("发布时间", "")),
    }


def fetch_news():
    """Fetch global macro news via ``akshare.stock_info_global_em()``.

    Returns:
        A list of item dicts (see ``row_to_news_item``). Returns an empty list
        when akshare is not installed or the fetch fails; the reason is
        printed to stderr.
    """
    try:
        import akshare as ak
        df = ak.stock_info_global_em()
        return [row_to_news_item(row) for _, row in df.iterrows()]
    except ImportError:
        print("[MACRO] akshare 未安装", file=sys.stderr)
        return []
    except Exception as e:
        # Top-level boundary for a best-effort collector: report, don't crash.
        print(f"[MACRO] 采集失败: {e}", file=sys.stderr)
        return []
def title_hash(title):
    """Return the dedupe key for *title*: the first 16 hex chars of its UTF-8 MD5."""
    digest = hashlib.md5(title.encode("utf-8")).hexdigest()
    return digest[:16]
def quick_risk_check(items, high_patterns=None, medium_patterns=None, medium_threshold=2):
    """Traffic light: fast keyword screening of news items.

    Each item is tested against the HIGH patterns first. Only items with no
    HIGH hit are tested against the MEDIUM patterns. One HIGH hit yields
    level "high". Level "medium" needs at least ``medium_threshold`` MEDIUM
    hits.

    Args:
        items: iterable of dicts with string "title" and "summary" values.
        high_patterns: regex strings for HIGH risk; defaults to HIGH_PATTERNS.
        medium_patterns: regex strings for MEDIUM risk; defaults to
            MEDIUM_PATTERNS.
        medium_threshold: minimum number of MEDIUM hits for "medium" (default 2).

    Returns:
        ``(level, matched_articles, summary)``. *level* is "high", "medium"
        or "none". *matched_articles* holds at most 5 items. *summary* is
        ``""`` when the level is "none".
    """
    if high_patterns is None:
        high_patterns = HIGH_PATTERNS
    if medium_patterns is None:
        medium_patterns = MEDIUM_PATTERNS
    # The text is lowercased below, yet patterns may contain uppercase literals
    # (e.g. "KOSPI", "CPI"). IGNORECASE keeps those reachable. Compile once
    # per call instead of once per item.
    high_res = [re.compile(p, re.IGNORECASE) for p in high_patterns]
    medium_res = [re.compile(p, re.IGNORECASE) for p in medium_patterns]

    hits_high = []
    hits_medium = []
    for item in items:
        text = (item["title"] + " " + item["summary"]).lower()
        if any(rx.search(text) for rx in high_res):
            hits_high.append(item)
        elif any(rx.search(text) for rx in medium_res):
            # Only items without a HIGH hit count toward MEDIUM.
            hits_medium.append(item)

    level = "none"
    summary = ""
    matched = []
    if hits_high:
        level = "high"
        matched = hits_high[:5]
        titles = [f"· {h['title'][:50]}" for h in matched]
        summary = f"【高风险】{len(hits_high)}条紧急信号:\n" + "\n".join(titles)
    elif hits_medium and len(hits_medium) >= medium_threshold:
        level = "medium"
        matched = hits_medium[:5]
        titles = [f"· {h['title'][:50]}" for h in matched]
        summary = f"【中风险】{len(hits_medium)}条预警信号:\n" + "\n".join(titles)
    return level, matched, summary
def write_risk_signal(conn, level, matched, summary):
    """Record a risk signal in ``signal_news`` and in ``macro_risk_state.json``.

    The JSON state file is the channel the intraday monitor cron reads. It is
    therefore written even if the database insert fails; the DB error is
    reported on stderr instead of aborting the alert.

    Args:
        conn: open sqlite3 connection to the MoFin database.
        level: "high" or "medium" (any other value maps to an empty sentiment).
        matched: article dicts with "title" and "summary" string values.
        summary: human-readable alert summary.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sentiment_map = {"high": "宏观-WATCH_HIGH", "medium": "宏观-WATCH_MEDIUM"}
    sentiment = sentiment_map.get(level, "")
    articles_json = json.dumps(
        [{"title": a["title"][:80], "summary": a["summary"][:120]} for a in matched],
        ensure_ascii=False,
    )
    # signal_news is not created by this script's ensure_tables(). A missing
    # table or a locked DB must not suppress the real-time state file below.
    try:
        conn.execute(
            "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (0, ?, ?, ?, ?, '', ?)",
            ("宏观", sentiment, summary[:500], articles_json, "macro_watch"),
        )
        conn.commit()
    except sqlite3.Error as e:
        print(f"[MACRO] signal_news 写入失败: {e}", file=sys.stderr)
    # State file for the monitor cron.
    state = {
        "level": level,
        "signals": [{"sentiment": sentiment, "summary": summary[:300], "key_articles": articles_json, "created_at": now}],
        "signal_count": len(matched),
        "created_at": now,
        "expired": False,
        "source": "collector_realtime",
    }
    # Write a sibling temp file and swap it into place, so a concurrent reader
    # never sees a half-written JSON document. The encoding is explicit
    # because the content contains non-ASCII text.
    tmp_path = STATE_PATH.with_name(STATE_PATH.name + ".tmp")
    tmp_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
    tmp_path.replace(STATE_PATH)
def main():
    """Collect macro news, store unseen headlines and raise real-time risk signals.

    Steps:
    1. Ensure the table exists.
    2. Load hashes of the 200 most recently stored titles for de-duplication.
    3. Fetch news and insert unseen, non-empty titles.
    4. Run the keyword traffic light on the newly stored items only.
    5. Print a short report. The printed output is presumably what the
       no_agent runner forwards (the original comment notes the HIGH alert
       is pushed to XMPP).
    """
    # sqlite3 cannot create a missing parent directory for the DB file.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        ensure_tables(conn)
        # Dedupe base: hashes of the most recently stored titles.
        existing = {
            title_hash(row[0])
            for row in conn.execute("SELECT title FROM macro_raw_news ORDER BY id DESC LIMIT 200")
        }
        items = fetch_news()
        if not items:
            return
        # Insert only titles not seen yet.
        new_items = []
        now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for item in items:
            if not item["title"].strip():
                continue
            # Hash exactly what is stored (titles are truncated to 300 chars).
            # Otherwise over-long titles never match and are re-inserted every run.
            title = item["title"][:300]
            h = title_hash(title)
            if h in existing:
                continue
            existing.add(h)
            try:
                conn.execute(
                    "INSERT INTO macro_raw_news (title, summary, url, source_ts, fetched_at) VALUES (?, ?, ?, ?, ?)",
                    (title, item["summary"][:500], item["url"][:500], item["source_ts"][:20], now_str),
                )
            except sqlite3.Error as e:
                # Best effort: skip this row, but do not hide the failure.
                print(f"[MACRO] 入库失败: {e}", file=sys.stderr)
                continue
            new_items.append(item)
        conn.commit()
        # Traffic-light check, only on the newly collected items.
        level, matched, summary = "none", [], ""
        if new_items:
            level, matched, summary = quick_risk_check(new_items)
            if level in ("high", "medium"):
                write_risk_signal(conn, level, matched, summary)
    finally:
        conn.close()
    # no_agent output
    if new_items:
        print(f"[MACRO] {len(new_items)}条新宏观新闻")
        # HIGH risk: print the alert (pushed to XMPP per the original comment).
        if level == "high":
            print(f"⚠️ HIGH风险预警 ({len(matched)}条): {summary[:200]}")
        elif level == "medium":
            print(f" MEDIUM风险: {len(matched)}条匹配")
# Script entry point. sqlite3 is imported at module level, so main() also
# works when this module is imported rather than run directly.
if __name__ == "__main__":
    main()