feat: 宏观指数采集独立(macro_index_collector) + 莫荷宏观预警系统入库

- 重建 macro_index_collector.py: 原macro_context_collector(index采集)独立
  避免与莫荷的 macro_context_collector(宏观新闻采集)冲突
- 纳入莫荷的宏观预警系统:
  divergence_detector.py — 跨市场背离监测
  macro_context_collector.py — 宏观新闻采集+红绿灯(莫荷版)
  macro_signal_consumer.py — 宏观风险信号消费
- cron修正: 宏观采集-早/午间 → macro_index_collector.py
This commit is contained in:
知微
2026-06-27 01:14:00 +08:00
parent 8b5705beae
commit 54915e9b7e
4 changed files with 804 additions and 0 deletions
+333
View File
@@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""
divergence_detector.py — 跨市场背离监测器(no_agent)
每30分钟检测:
1. 科创50 vs 恒指 → 科技股超买/超卖信号
2. A/H 隐含溢价 → 内外资分歧度
3. 上证50 vs 创业板 → 风格轮动信号
4. 恒指 vs 国企指数 → 离岸市场情绪
5. 指数连涨/连跌天数 → 趋势延续/衰竭
输出:
- HIGH/medium divergence → 写入 signal_news (source=divergence_watch)
- 状态文件 macro_divergence_state.json
- no_agent: 有信号才出声
"""
import sys, json, re, datetime, os, urllib.request
from pathlib import Path
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
STATE_PATH = DATA / "macro_divergence_state.json"
# ── 监测的指数 ──
INDEX_CODES = {
"上证指数": "sh000001",
"深证成指": "sz399001",
"创业板指": "sz399006",
"科创50": "sh000688",
"上证50": "sh000016",
"沪深300": "sh000300",
"恒生指数": "hkHSI",
"国企指数": "hkHSCEI",
}
# ── 背离阈值 ──
DIVERGENCE_STRONG = 5.0 # >5% → strong信号
DIVERGENCE_MODERATE = 3.0 # >3% → moderate信号
STREAK_DAYS = 3 # 连涨/连跌3天 → 信号
def fetch_indices():
"""获取所有指数实时数据"""
symbols = list(INDEX_CODES.values())
url = f"http://qt.gtimg.cn/q={','.join(symbols)}"
try:
r = urllib.request.urlopen(url, timeout=10)
text = r.read().decode("gbk")
except Exception as e:
print(f"[DIVERGE] 采集失败: {e}", file=sys.stderr)
return {}
indices = {}
for line in text.strip().split("\n"):
line = line.strip()
if not line or "=" not in line:
continue
try:
sym = line.split("=", 1)[0].strip().lstrip("v_")
raw = line.split("=", 1)[1].strip().strip('"').strip(";")
fields = raw.split("~")
if len(fields) < 35:
continue
name = fields[1]
price = float(fields[3]) if fields[3].strip() else 0
close = float(fields[4]) if fields[4].strip() else 0
change_pct = ((price - close) / close * 100) if close else 0
high = float(fields[33]) if fields[33].strip() else 0
low = float(fields[34]) if fields[34].strip() else 0
timestamp = fields[30] if len(fields) > 30 else ""
indices[sym] = {
"name": name,
"price": price,
"close": close,
"change_pct": round(change_pct, 2),
"high": high,
"low": low,
"timestamp": timestamp,
}
except Exception:
continue
return indices
def load_history():
"""从MACRO_CONTEXT_LOG加载前几天的指数数据"""
try:
import sqlite3
conn = sqlite3.connect(str(DB_PATH))
rows = conn.execute(
"SELECT indices, created_at FROM macro_context_log "
"WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 5"
).fetchall()
conn.close()
history = []
for row in rows:
idx_data = json.loads(row[0]) if row[0] else {}
history.append({
"indices": idx_data,
"timestamp": row[1],
})
return history
except Exception:
return []
def detect_divergences(indices, history):
"""
检测跨市场背离信号
返回: list of signal dicts {type, level, desc, pairs}
"""
signals = []
def get(name):
"""按中文名找指数"""
sym = INDEX_CODES.get(name)
if sym and sym in indices:
return indices[sym]
# 模糊匹配
for s, idx in indices.items():
if name in idx.get("name", ""):
return idx
return None
sh_comp = get("上证指数")
sz_comp = get("深证成指")
cyb = get("创业板指")
kc = get("科创50")
sz50 = get("上证50")
hs300 = get("沪深300")
hsi = get("恒生指数")
hscei = get("国企指数")
# ── 信号1: 科创50 vs 恒指(科技股vs国际资本)──
if kc and hsi:
kc_chg = kc["change_pct"]
hsi_chg = hsi["change_pct"]
divergence = abs(kc_chg - hsi_chg)
if divergence > DIVERGENCE_STRONG:
signals.append({
"type": "a_h_tech_divergence",
"level": "high",
"desc": f"科创50({kc_chg:+.1f}%) vs 恒指({hsi_chg:+.1f}%) 背离{divergence:.1f}个百分点",
"pairs": [kc, hsi],
"direction": "risk" if kc_chg > hsi_chg else "opportunity",
})
elif divergence > DIVERGENCE_MODERATE:
signals.append({
"type": "a_h_tech_divergence",
"level": "medium",
"desc": f"科创50({kc_chg:+.1f}%) vs 恒指({hsi_chg:+.1f}%) 背离{divergence:.1f}个百分点",
"pairs": [kc, hsi],
"direction": "risk" if kc_chg > hsi_chg else "opportunity",
})
# ── 信号2: 上证50 vs 创业板(价值vs成长)──
if sz50 and cyb:
sz50_chg = sz50["change_pct"]
cyb_chg = cyb["change_pct"]
divergence = abs(sz50_chg - cyb_chg)
if divergence > DIVERGENCE_STRONG:
direction = "opportunity" if sz50_chg > cyb_chg else "risk"
signals.append({
"type": "value_growth_divergence",
"level": "high",
"desc": f"上证50({sz50_chg:+.1f}%) vs 创业板({cyb_chg:+.1f}%) 背离{divergence:.1f}个百分点",
"pairs": [sz50, cyb],
"direction": direction,
})
elif divergence > DIVERGENCE_MODERATE:
direction = "opportunity" if sz50_chg > cyb_chg else "risk"
signals.append({
"type": "value_growth_divergence",
"level": "medium",
"desc": f"上证50({sz50_chg:+.1f}%) vs 创业板({cyb_chg:+.1f}%) 背离{divergence:.1f}个百分点",
"pairs": [sz50, cyb],
"direction": direction,
})
# ── 信号3: 恒指 vs 国企指数(国际资本流向)──
if hsi and hscei:
hsi_chg = hsi["change_pct"]
hscei_chg = hscei["change_pct"]
if hsi_chg < 0 and hscei_chg < hsi_chg:
# 国企跌得比恒指多 → 外资恐慌性卖出H股
signals.append({
"type": "hk_panic_selling",
"level": "high",
"desc": f"国企指数({hscei_chg:+.1f}%)跌幅大于恒指({hsi_chg:+.1f}%)→外资恐慌抛售H股",
"pairs": [hscei, hsi],
"direction": "risk",
})
# ── 信号4: A/H 价格背离(用历史数据检测趋势延续)──
if history and len(history) >= 2:
latest = history[0]["indices"]
prev = history[1]["indices"]
# 科创50连涨检测
if kc:
kc_now = kc["change_pct"]
kc_prev = prev.get("科创50", {}).get("change_pct", 0) if "科创50" in prev else 0
kc_yest = latest.get("科创50", {}).get("change_pct", 0)
# 检测连涨
if isinstance(kc_yest, (int, float)) and isinstance(kc_prev, (int, float)):
streak = 0
if kc_yest > 0: streak += 1
if kc_prev > 0: streak += 1
if kc_now > 0: streak += 1
if streak >= STREAK_DAYS and kc_now > 0:
signals.append({
"type": "tech_streak",
"level": "medium",
"desc": f"科创50连涨{streak}日({kc_prev:+.1f}%→{kc_yest:+.1f}%→{kc_now:+.1f}%)→超买风险",
"pairs": [kc],
"direction": "risk",
})
# ── 信号5: 大盘宽度 + 季节效应 ──
now = datetime.datetime.now()
is_month_end = now.day >= 25 # 月末最后一周
is_friday = now.weekday() == 4
# 总体判断
risk_count = sum(1 for s in signals if s["direction"] == "risk")
opp_count = sum(1 for s in signals if s["direction"] == "opportunity")
# 月末+周五叠加 → 脆弱性增强
if is_month_end and is_friday and risk_count >= 2:
signals.append({
"type": "time_window_risk",
"level": "high",
"desc": f"月末({now.day}日)+周五效应+{risk_count}个风险信号叠加→市场脆弱性高",
"pairs": [],
"direction": "risk",
})
elif is_month_end and risk_count >= 1:
signals.append({
"type": "time_window_risk",
"level": "medium",
"desc": f"月末窗口({now.day}日)+{risk_count}个风险信号→注意控制仓位",
"pairs": [],
"direction": "risk",
})
return signals
def write_state(signals, indices):
"""写入状态文件,供监控 cron 消费"""
levels = [s["level"] for s in signals]
highest = "high" if "high" in levels else ("medium" if "medium" in levels else "none")
directions = [s["direction"] for s in signals]
bias = "risk" if directions.count("risk") > directions.count("opportunity") else "opportunity"
state = {
"level": highest,
"bias": bias,
"signal_count": len(signals),
"signals": signals,
"indices": {k: v["change_pct"] for k, v in indices.items()},
"created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
def write_to_signal_news(signals):
"""HIGH signal → signal_news"""
if not signals:
return
import sqlite3
conn = sqlite3.connect(str(DB_PATH))
high_signals = [s for s in signals if s["level"] == "high"]
if high_signals:
for s in high_signals:
conn.execute(
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (?, ?, ?, ?, ?, ?, ?)",
(0, "跨市场", f"背离-{s['direction'].upper()}", s["desc"], json.dumps(s, ensure_ascii=False), "", "divergence_watch")
)
conn.commit()
med_signals = [s for s in signals if s["level"] == "medium"]
if med_signals:
summary = "\n".join([s["desc"] for s in med_signals])
conn.execute(
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (?, ?, ?, ?, ?, ?, ?)",
(0, "跨市场", "背离-MEDIUM", summary, json.dumps(med_signals, ensure_ascii=False), "", "divergence_watch")
)
conn.commit()
conn.close()
def print_report(signals, indices):
"""no_agent 输出"""
if not signals:
return # SILENT
# 有信号就输出
high = [s for s in signals if s["level"] == "high"]
med = [s for s in signals if s["level"] == "medium"]
lines = []
if high:
for s in high:
icon = "\u26a0\ufe0f" if s["direction"] == "risk" else "\u2b06\ufe0f"
lines.append(f"[DIVERGE] {icon} {s['level'].upper()} {s['type']}: {s['desc']}")
if med:
for s in med[:3]: # 最多3条
icon = "\u26a0\ufe0f" if s["direction"] == "risk" else "\u2b06\ufe0f"
lines.append(f"[DIVERGE] {icon} {s['level'].upper()} {s['type']}: {s['desc']}")
# 输出指数全景
idx_line = " | ".join([f"{n}: {indices.get(s, {}).get('change_pct', 0):+.1f}%" for n, s in INDEX_CODES.items() if s in indices])
lines.append(f"[DIVERGE] 指数全景: {idx_line}")
print("\n".join(lines))
def main():
indices = fetch_indices()
if not indices:
return
history = load_history()
signals = detect_divergences(indices, history)
# 写入 state + signal_news
write_state(signals, indices)
write_to_signal_news(signals)
# no_agent 输出
print_report(signals, indices)
if __name__ == "__main__":
main()
+231
View File
@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
macro_context_collector.py — 宏观新闻采集器+实时红绿灯(no_agent)
采集来源:
1. akshare.stock_info_global_em() — 东方财富全球宏观新闻(200条实时)
核心功能:
A. 采集新闻→macro_raw_news(去重入库)
B. 实时红绿灯检测——用关键词规则快速识别HIGH/MEDIUM风险
- 不等LLM,采集完立刻判断
- 检测到风险→写入signal_news+macro_risk_state.json
- 盘中的监控cron(每15-25分)读到后立刻调整策略
红绿灯规则:
HIGH(单条就触发): 全球巨头+核心产业负面/美联储意外/指数暴跌/地缘冲突
MEDIUM(累计2条触发): 常规宏观事件/板块级/资金面
"""
import sys, json, hashlib, os, re
from datetime import datetime
from pathlib import Path
DATA_DIR = Path(__file__).parent.parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
STATE_PATH = DATA_DIR / "macro_risk_state.json"
# ── 红绿灯 关键词规则 ──
# HIGH: 任何一条匹配 → 立即 HIGH 预警
HIGH_PATTERNS = [
# 全球巨头+核心产业
r"苹果.*(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)",
r"openai.*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)",
r"英伟达|nvidia.*(?:跌|调查|制裁|推迟|禁令)",
r"台积电.*(?:跌|推迟|取消|地震|火灾|禁)",
r"特斯拉.*(?:暴跌|召回|调查|破产|禁)",
# 美联储/央行意外
r"美联储.*(?:意外|紧急|缩表|风暴|警告|超预期|加息\s*50|降息\s*50|紧急\s*(?:会议|声明))",
r"美联储.*(?:利率|决议).*(?:超预期|意外|紧急)",
r"fed.*(?:emergency|unexpected|surprise|hike|cut)",
# 指数暴跌
r"指数.*(?:跌幅|暴跌|熔断|闪崩|重挫)",
r"(?:暴跌|重挫|熔断).*[5-9]%",
r"熔断|闪崩",
# 地缘+贸易
r"关税.*(?:升级|新|报复|制裁)",
r"制裁.*(?:新|升级|全面)",
r"战争|开战|入侵|核|导弹.*发射",
# 系统性能源
r"原油.*(?:跌破|暴跌|崩盘|断供)",
r"石油.*(?:禁运|制裁|断供)",
r"能源危机|粮食危机",
# 系统金融
r"银行.*(?:倒闭|挤兑|破产|接管|危机)",
r"金融危机|债务危机|违约潮|系统性",
# AI/科技板块重挫
r"半导体.*(?:暴跌|熔断|崩盘|跌幅)",
r"科技股.*(?:暴跌|熔断|崩盘|重挫)",
r"费城半导体|sox.*(?:跌|崩)",
]
# MEDIUM: 累计匹配2条以上 → MEDIUM 预警
MEDIUM_PATTERNS = [
r"加息|降息",
r"通胀|CPI|PPI",
r"汇率.*(?:大幅|波动|贬值|升值)",
r"外资.*(?:流出|撤离|减持)",
r"北向.*(?:流出|净卖出|大幅)",
r"季末|年末|半年末|资金回笼|流动性紧张",
r"解禁.*(?:大额|巨量|千亿)",
r"大跌|重挫|杀跌|恐慌",
r"评级.*(?:下调|负面|降级)",
r"预警|风险提示|谨慎",
r"期货.*(?:暴跌|跌停|熔断)",
r"黑天鹅|灰犀牛",
]
def ensure_tables(conn):
conn.execute("""
CREATE TABLE IF NOT EXISTS macro_raw_news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
summary TEXT DEFAULT '',
url TEXT DEFAULT '',
source_ts TEXT DEFAULT '',
fetched_at TEXT DEFAULT (datetime('now','localtime')),
risk_level TEXT DEFAULT 'unassessed',
risk_reason TEXT DEFAULT ''
)
""")
conn.commit()
def fetch_news():
try:
import akshare as ak
df = ak.stock_info_global_em()
items = []
for _, row in df.iterrows():
items.append({
"title": str(row.get("标题", "")),
"summary": str(row.get("摘要", "")),
"url": str(row.get("链接", "")),
"source_ts": str(row.get("发布时间", "")),
})
return items
except ImportError:
print("[MACRO] akshare 未安装", file=sys.stderr)
return []
except Exception as e:
print(f"[MACRO] 采集失败: {e}", file=sys.stderr)
return []
def title_hash(title):
return hashlib.md5(title.encode()).hexdigest()[:16]
def quick_risk_check(items):
"""红绿灯:快速关键词检测,返回 (level, matched_articles, summary)"""
hits_high = []
hits_medium = []
for item in items:
text = (item["title"] + " " + item["summary"]).lower()
for pattern in HIGH_PATTERNS:
if re.search(pattern, text):
hits_high.append(item)
break
else:
# 没进 HIGH 才检查 MEDIUM
for pattern in MEDIUM_PATTERNS:
if re.search(pattern, text):
hits_medium.append(item)
break
level = "none"
summary = ""
matched = []
if hits_high:
level = "high"
matched = hits_high[:5]
titles = [f"· {h['title'][:50]}" for h in hits_high[:5]]
summary = f"【高风险】{len(hits_high)}条紧急信号:\n" + "\n".join(titles)
elif len(hits_medium) >= 2:
level = "medium"
matched = hits_medium[:5]
titles = [f"· {h['title'][:50]}" for h in hits_medium[:5]]
summary = f"【中风险】{len(hits_medium)}条预警信号:\n" + "\n".join(titles)
return level, matched, summary
def write_risk_signal(conn, level, matched, summary):
"""写入 signal_news + macro_risk_state.json"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sentiment_map = {"high": "宏观-WATCH_HIGH", "medium": "宏观-WATCH_MEDIUM"}
sentiment = sentiment_map.get(level, "")
# 写入 signal_news
articles_json = json.dumps([{"title": a["title"][:80], "summary": a["summary"][:120]} for a in matched], ensure_ascii=False)
conn.execute(
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (0, ?, ?, ?, ?, '', ?)",
("宏观", sentiment, summary[:500], articles_json, "macro_watch")
)
conn.commit()
# 写入状态文件(供监控cron实时读取)
state = {
"level": level,
"signals": [{"sentiment": sentiment, "summary": summary[:300], "key_articles": articles_json, "created_at": now}],
"signal_count": len(matched),
"created_at": now,
"expired": False,
"source": "collector_realtime",
}
STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
def main():
conn = sqlite3.connect(str(DB_PATH))
ensure_tables(conn)
# 去重基础
existing = set()
for row in conn.execute("SELECT title FROM macro_raw_news ORDER BY id DESC LIMIT 200"):
existing.add(title_hash(row[0]))
items = fetch_news()
if not items:
conn.close()
return
# 去重写入
new_items = []
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for item in items:
if not item["title"].strip():
continue
h = title_hash(item["title"])
if h in existing:
continue
existing.add(h)
try:
conn.execute(
"INSERT INTO macro_raw_news (title, summary, url, source_ts, fetched_at) VALUES (?, ?, ?, ?, ?)",
(item["title"][:300], item["summary"][:500], item["url"][:500], item["source_ts"][:20], now_str)
)
new_items.append(item)
except Exception:
pass
conn.commit()
# 红绿灯检测(只针对新采集的)
level, matched, summary = "none", [], ""
if new_items:
level, matched, summary = quick_risk_check(new_items)
if level in ("high", "medium"):
write_risk_signal(conn, level, matched, summary)
conn.close()
# no_agent 输出
if new_items:
print(f"[MACRO] {len(new_items)}条新宏观新闻")
# HIGH风险:输出预警(会推送到XMPP)
if level == "high":
print(f"⚠️ HIGH风险预警 ({len(matched)}条): {summary[:200]}")
elif level == "medium":
print(f" MEDIUM风险: {len(matched)}条匹配")
if __name__ == "__main__":
import sqlite3
main()
+148
View File
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""macro_index_collector.py — 指数宏观数据采集 (no_agent)
采集指数行情,写入 macro_context_log 表(积累每日快照)。
与莫荷的 macro_context_collector.py(宏观新闻采集)分开,互不冲突。
在 9:35 和 12:00 由 cron 触发。
"""
import json, sqlite3, subprocess
from datetime import datetime
from pathlib import Path
DATA_DIR = Path("/home/hmo/web-dashboard/data")
DB_PATH = Path("/home/hmo/MoFin/data/mofin.db")
INDICES = [
("sh000001", "上证指数"), ("sz399001", "深证成指"),
("sz399006", "创业板指"), ("sh000688", "科创50"),
("hkHSI", "恒生指数"), ("hkHSCEI", "国企指数"),
]
SECTOR_INDICES = [
("sz980017", "国证芯片"), ("sh000039", "上证信息"),
("sz980022", "机器人产业"), ("sz980032", "新能电池"),
("sz980076", "通用航空"), ("sh000063", "上证周期"),
("sh000068", "上证资源"), ("sh000019", "治理指数"),
]
def fetch(symbol):
try:
r = subprocess.run(["curl", "-s", f"https://qt.gtimg.cn/q={symbol}"],
capture_output=True, timeout=8)
return r.stdout.decode("gbk", errors="replace")
except:
return None
def parse(text, name):
if not text or "pv_none_match" in text:
return None
try:
f = text.split("~")
if len(f) < 35:
return None
price = float(f[3]) if f[3] else 0
prev = float(f[4]) if f[4] else 0
high = float(f[33]) if len(f) > 33 and f[33] else 0
low = float(f[34]) if len(f) > 34 and f[34] else 0
chg = round((price - prev) / prev * 100, 2) if prev else 0
return {"name": name, "price": price, "change_pct": chg,
"high": high, "low": low}
except:
return None
def assess(indices_data):
if not indices_data:
return "unknown", "unknown"
sh = next((i for i in indices_data if i and i["name"] == "上证指数"), None)
sz = next((i for i in indices_data if i and i["name"] == "深证成指"), None)
cyb = next((i for i in indices_data if i and i["name"] == "创业板指"), None)
avg = ((sh or {}).get("change_pct", 0) + (sz or {}).get("change_pct", 0) +
(cyb or {}).get("change_pct", 0)) / 3
if avg > 1.5:
return "strong_bullish", "整体强势"
elif avg > 0.5:
return "bullish", "偏强"
elif avg > -0.5:
return "neutral", "震荡"
elif avg > -1.5:
return "bearish", "偏弱"
return "strong_bearish", "整体弱势"
def main():
now = datetime.now()
indices_data = []
for sym, name in INDICES:
raw = fetch(sym)
p = parse(raw, name)
if p:
indices_data.append(p)
has_data = any(i for i in indices_data if i and i.get("price", 0) > 0)
overall, desc = assess(indices_data)
# 读莫荷的 macro_risk_state.json 补充风险状态
risk_state = {}
try:
risk_path = DATA_DIR / "macro_risk_state.json"
if risk_path.exists():
risk_state = json.loads(risk_path.read_text())
except:
pass
context = {
"updated_at": now.strftime("%Y-%m-%d %H:%M:%S"),
"session": "morning" if now.hour < 12 else "midday",
"has_valid_data": has_data,
"indices": {i["name"]: {"price": i["price"], "change_pct": i["change_pct"],
"high": i.get("high", 0), "low": i.get("low", 0)}
for i in indices_data if i},
"structure": {"overall": overall, "description": desc},
"risk_level": risk_state.get("level", "none"),
"risk_reason": risk_state.get("reason", ""),
}
# 写入DB
try:
conn = sqlite3.connect(str(DB_PATH))
conn.execute('''INSERT INTO macro_context_log
(data_timestamp, session, has_valid_data, indices, structure,
key_sectors, top_gainers, top_losers, sector_up_ratio, sector_mood)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(context["updated_at"], context["session"],
1 if has_data else 0,
json.dumps(context["indices"], ensure_ascii=False),
json.dumps(context["structure"], ensure_ascii=False),
"[]", "[]", "[]", 0, overall))
conn.commit()
conn.close()
except:
pass
# 写入 macro_context.json(兼容旧读取方)
try:
DATA_DIR.mkdir(parents=True, exist_ok=True)
(DATA_DIR / "macro_context.json").write_text(
json.dumps(context, ensure_ascii=False, indent=2))
except:
pass
if has_data:
idx_parts = [f"{i['name']}{'' if i['change_pct']>0 else ''}{i['change_pct']:+.2f}%"
for i in indices_data[:4] if i]
print(f"【宏观指数】{now.strftime('%H:%M')} | {desc}")
print(" | ".join(idx_parts))
print(f"评估: {'整体强势' if overall=='strong_bullish' else '偏强' if overall=='bullish' else '震荡' if overall=='neutral' else '偏弱' if overall=='bearish' else '整体弱势'}")
if risk_state.get("level") == "high":
print(f"🔴 宏观风险: {risk_state.get('reason','')}")
else:
print("【宏观指数】数据不可用(非交易时段或行情未更新)")
if __name__ == "__main__":
main()
+92
View File
@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
macro_signal_consumer.py — 消费宏观风险信号,写入风险状态
no_agent 模式:有HIGH风险→输出风险摘要 | 无→静默
管道位置:
macro_risk_scanner (8:30/11:30) → signal_news(source=macro_watch) → 本脚本
macro_risk_state.json — 供所有监控 cron 读取
如果 HIGH → 推送到 Dad
"""
import sqlite3, json, os
from pathlib import Path
from datetime import datetime
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
STATE_PATH = DATA / "macro_risk_state.json"
def main():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
# 读取未处理的 macro_watch 信号
rows = conn.execute(
"SELECT * FROM signal_news WHERE source='macro_watch' AND (processed=0 OR processed IS NULL)"
).fetchall()
if not rows:
# 无新信号时状态文件维持 15 分钟过期
try:
state = json.loads(STATE_PATH.read_text())
created = datetime.strptime(state.get("created_at", "2000-01-01"), "%Y-%m-%d %H:%M:%S")
if (datetime.now() - created).total_seconds() > 900: # 15 min
state["level"] = "none"
state["expired"] = True
STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
except:
pass
conn.close()
return # SILENT
# 聚合风险等级
levels = {"宏观-WATCH_HIGH": "high", "宏观-WATCH_MEDIUM": "medium", "宏观-WATCH_INFO": "info"}
highest = "info"
all_summaries = []
for r in rows:
sentiment = r["overall_sentiment"]
lv = levels.get(sentiment, "info")
if lv == "high":
highest = "high"
elif lv == "medium" and highest != "high":
highest = "medium"
all_summaries.append({
"sentiment": sentiment,
"summary": r["summary"][:300],
"key_articles": r["key_articles"],
"created_at": r["created_at"],
})
# 写入风险状态文件
state = {
"level": highest,
"signals": all_summaries,
"signal_count": len(rows),
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"expired": False,
}
STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
# 标记为已处理
for r in rows:
conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (r["id"],))
conn.commit()
conn.close()
# no_agent 输出(有 HIGH 才主动出声)
if highest == "high":
print(f"[MACRO-RISK] ⚠️ HIGH: {len(rows)}条高风险信号")
for s in all_summaries:
print(f" {s['summary'][:100]}")
elif highest == "medium":
if len(os.environ.get("MACRO_VERBOSE", "")) > 0:
print(f"[MACRO-RISK] MEDIUM: {len(rows)}条中风险信号")
# HIGH 信号会通过 no_agent 推送到 XMPP
if __name__ == "__main__":
main()