From 54915e9b7ebc7d87ed48e409bb2a63915441c863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Sat, 27 Jun 2026 01:14:00 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8F=E8=A7=82=E6=8C=87=E6=95=B0?= =?UTF-8?q?=E9=87=87=E9=9B=86=E7=8B=AC=E7=AB=8B(macro=5Findex=5Fcollector)?= =?UTF-8?q?=20+=20=E8=8E=AB=E8=8D=B7=E5=AE=8F=E8=A7=82=E9=A2=84=E8=AD=A6?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=85=A5=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 重建 macro_index_collector.py: 原macro_context_collector(index采集)独立 避免与莫荷的 macro_context_collector(宏观新闻采集)冲突 - 纳入莫荷的宏观预警系统: divergence_detector.py — 跨市场背离监测 macro_context_collector.py — 宏观新闻采集+红绿灯(莫荷版) macro_signal_consumer.py — 宏观风险信号消费 - cron修正: 宏观采集-早/午间 → macro_index_collector.py --- scripts/divergence_detector.py | 333 +++++++++++++++++++++++++++++ scripts/macro_context_collector.py | 231 ++++++++++++++++++++ scripts/macro_index_collector.py | 148 +++++++++++++ scripts/macro_signal_consumer.py | 92 ++++++++ 4 files changed, 804 insertions(+) create mode 100644 scripts/divergence_detector.py create mode 100644 scripts/macro_context_collector.py create mode 100644 scripts/macro_index_collector.py create mode 100644 scripts/macro_signal_consumer.py diff --git a/scripts/divergence_detector.py b/scripts/divergence_detector.py new file mode 100644 index 0000000..fef8d3c --- /dev/null +++ b/scripts/divergence_detector.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 +""" +divergence_detector.py — 跨市场背离监测器(no_agent) + +每30分钟检测: + 1. 科创50 vs 恒指 → 科技股超买/超卖信号 + 2. A/H 隐含溢价 → 内外资分歧度 + 3. 上证50 vs 创业板 → 风格轮动信号 + 4. 恒指 vs 国企指数 → 离岸市场情绪 + 5. 指数连涨/连跌天数 → 趋势延续/衰竭 + +输出: + - HIGH/medium divergence → 写入 signal_news (source=divergence_watch) + - 状态文件 macro_divergence_state.json + - no_agent: 有信号才出声 +""" +import sys, json, re, datetime, os, urllib.request + +from pathlib import Path + +BASE = Path("/home/hmo/MoFin") +DATA = BASE / "data" +DB_PATH = DATA / "mofin.db" +STATE_PATH = DATA / "macro_divergence_state.json" + +# ── 监测的指数 ── +INDEX_CODES = { + "上证指数": "sh000001", + "深证成指": "sz399001", + "创业板指": "sz399006", + "科创50": "sh000688", + "上证50": "sh000016", + "沪深300": "sh000300", + "恒生指数": "hkHSI", + "国企指数": "hkHSCEI", +} + +# ── 背离阈值 ── +DIVERGENCE_STRONG = 5.0 # >5% → strong信号 +DIVERGENCE_MODERATE = 3.0 # >3% → moderate信号 +STREAK_DAYS = 3 # 连涨/连跌3天 → 信号 + +def fetch_indices(): + """获取所有指数实时数据""" + symbols = list(INDEX_CODES.values()) + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + try: + r = urllib.request.urlopen(url, timeout=10) + text = r.read().decode("gbk") + except Exception as e: + print(f"[DIVERGE] 采集失败: {e}", file=sys.stderr) + return {} + + indices = {} + for line in text.strip().split("\n"): + line = line.strip() + if not line or "=" not in line: + continue + try: + sym = line.split("=", 1)[0].strip().lstrip("v_") + raw = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw.split("~") + if len(fields) < 35: + continue + name = fields[1] + price = float(fields[3]) if fields[3].strip() else 0 + close = float(fields[4]) if fields[4].strip() else 0 + change_pct = ((price - close) / close * 100) if close else 0 + high = float(fields[33]) if fields[33].strip() else 0 + low = float(fields[34]) if fields[34].strip() else 0 + timestamp = fields[30] if len(fields) > 30 else "" + indices[sym] = { + "name": name, + "price": price, + "close": close, + "change_pct": round(change_pct, 2), + "high": high, + "low": low, + "timestamp": timestamp, + } + except Exception: + continue + return indices + +def load_history(): + """从MACRO_CONTEXT_LOG加载前几天的指数数据""" + try: + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + rows = conn.execute( + "SELECT indices, created_at FROM macro_context_log " + "WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 5" + ).fetchall() + conn.close() + history = [] + for row in rows: + idx_data = json.loads(row[0]) if row[0] else {} + history.append({ + "indices": idx_data, + "timestamp": row[1], + }) + return history + except Exception: + return [] + +def detect_divergences(indices, history): + """ + 检测跨市场背离信号 + 返回: list of signal dicts {type, level, desc, pairs} + """ + signals = [] + + def get(name): + """按中文名找指数""" + sym = INDEX_CODES.get(name) + if sym and sym in indices: + return indices[sym] + # 模糊匹配 + for s, idx in indices.items(): + if name in idx.get("name", ""): + return idx + return None + + sh_comp = get("上证指数") + sz_comp = get("深证成指") + cyb = get("创业板指") + kc = get("科创50") + sz50 = get("上证50") + hs300 = get("沪深300") + hsi = get("恒生指数") + hscei = get("国企指数") + + # ── 信号1: 科创50 vs 恒指(科技股vs国际资本)── + if kc and hsi: + kc_chg = kc["change_pct"] + hsi_chg = hsi["change_pct"] + divergence = abs(kc_chg - hsi_chg) + if divergence > DIVERGENCE_STRONG: + signals.append({ + "type": "a_h_tech_divergence", + "level": "high", + "desc": f"科创50({kc_chg:+.1f}%) vs 恒指({hsi_chg:+.1f}%) 背离{divergence:.1f}个百分点", + "pairs": [kc, hsi], + "direction": "risk" if kc_chg > hsi_chg else "opportunity", + }) + elif divergence > DIVERGENCE_MODERATE: + signals.append({ + "type": "a_h_tech_divergence", + "level": "medium", + "desc": f"科创50({kc_chg:+.1f}%) vs 恒指({hsi_chg:+.1f}%) 背离{divergence:.1f}个百分点", + "pairs": [kc, hsi], + "direction": "risk" if kc_chg > hsi_chg else "opportunity", + }) + + # ── 信号2: 上证50 vs 创业板(价值vs成长)── + if sz50 and cyb: + sz50_chg = sz50["change_pct"] + cyb_chg = cyb["change_pct"] + divergence = abs(sz50_chg - cyb_chg) + if divergence > DIVERGENCE_STRONG: + direction = "opportunity" if sz50_chg > cyb_chg else "risk" + signals.append({ + "type": "value_growth_divergence", + "level": "high", + "desc": f"上证50({sz50_chg:+.1f}%) vs 创业板({cyb_chg:+.1f}%) 背离{divergence:.1f}个百分点", + "pairs": [sz50, cyb], + "direction": direction, + }) + elif divergence > DIVERGENCE_MODERATE: + direction = "opportunity" if sz50_chg > cyb_chg else "risk" + signals.append({ + "type": "value_growth_divergence", + "level": "medium", + "desc": f"上证50({sz50_chg:+.1f}%) vs 创业板({cyb_chg:+.1f}%) 背离{divergence:.1f}个百分点", + "pairs": [sz50, cyb], + "direction": direction, + }) + + # ── 信号3: 恒指 vs 国企指数(国际资本流向)── + if hsi and hscei: + hsi_chg = hsi["change_pct"] + hscei_chg = hscei["change_pct"] + if hsi_chg < 0 and hscei_chg < hsi_chg: + # 国企跌得比恒指多 → 外资恐慌性卖出H股 + signals.append({ + "type": "hk_panic_selling", + "level": "high", + "desc": f"国企指数({hscei_chg:+.1f}%)跌幅大于恒指({hsi_chg:+.1f}%)→外资恐慌抛售H股", + "pairs": [hscei, hsi], + "direction": "risk", + }) + + # ── 信号4: A/H 价格背离(用历史数据检测趋势延续)── + if history and len(history) >= 2: + latest = history[0]["indices"] + prev = history[1]["indices"] + + # 科创50连涨检测 + if kc: + kc_now = kc["change_pct"] + kc_prev = prev.get("科创50", {}).get("change_pct", 0) if "科创50" in prev else 0 + kc_yest = latest.get("科创50", {}).get("change_pct", 0) + + # 检测连涨 + if isinstance(kc_yest, (int, float)) and isinstance(kc_prev, (int, float)): + streak = 0 + if kc_yest > 0: streak += 1 + if kc_prev > 0: streak += 1 + if kc_now > 0: streak += 1 + if streak >= STREAK_DAYS and kc_now > 0: + signals.append({ + "type": "tech_streak", + "level": "medium", + "desc": f"科创50连涨{streak}日({kc_prev:+.1f}%→{kc_yest:+.1f}%→{kc_now:+.1f}%)→超买风险", + "pairs": [kc], + "direction": "risk", + }) + + # ── 信号5: 大盘宽度 + 季节效应 ── + now = datetime.datetime.now() + is_month_end = now.day >= 25 # 月末最后一周 + is_friday = now.weekday() == 4 + + # 总体判断 + risk_count = sum(1 for s in signals if s["direction"] == "risk") + opp_count = sum(1 for s in signals if s["direction"] == "opportunity") + + # 月末+周五叠加 → 脆弱性增强 + if is_month_end and is_friday and risk_count >= 2: + signals.append({ + "type": "time_window_risk", + "level": "high", + "desc": f"月末({now.day}日)+周五效应+{risk_count}个风险信号叠加→市场脆弱性高", + "pairs": [], + "direction": "risk", + }) + elif is_month_end and risk_count >= 1: + signals.append({ + "type": "time_window_risk", + "level": "medium", + "desc": f"月末窗口({now.day}日)+{risk_count}个风险信号→注意控制仓位", + "pairs": [], + "direction": "risk", + }) + + return signals + +def write_state(signals, indices): + """写入状态文件,供监控 cron 消费""" + levels = [s["level"] for s in signals] + highest = "high" if "high" in levels else ("medium" if "medium" in levels else "none") + directions = [s["direction"] for s in signals] + bias = "risk" if directions.count("risk") > directions.count("opportunity") else "opportunity" + + state = { + "level": highest, + "bias": bias, + "signal_count": len(signals), + "signals": signals, + "indices": {k: v["change_pct"] for k, v in indices.items()}, + "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + +def write_to_signal_news(signals): + """HIGH signal → signal_news""" + if not signals: + return + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + + high_signals = [s for s in signals if s["level"] == "high"] + if high_signals: + for s in high_signals: + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (?, ?, ?, ?, ?, ?, ?)", + (0, "跨市场", f"背离-{s['direction'].upper()}", s["desc"], json.dumps(s, ensure_ascii=False), "", "divergence_watch") + ) + conn.commit() + + med_signals = [s for s in signals if s["level"] == "medium"] + if med_signals: + summary = "\n".join([s["desc"] for s in med_signals]) + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (?, ?, ?, ?, ?, ?, ?)", + (0, "跨市场", "背离-MEDIUM", summary, json.dumps(med_signals, ensure_ascii=False), "", "divergence_watch") + ) + conn.commit() + + conn.close() + +def print_report(signals, indices): + """no_agent 输出""" + if not signals: + return # SILENT + + # 有信号就输出 + high = [s for s in signals if s["level"] == "high"] + med = [s for s in signals if s["level"] == "medium"] + + lines = [] + if high: + for s in high: + icon = "\u26a0\ufe0f" if s["direction"] == "risk" else "\u2b06\ufe0f" + lines.append(f"[DIVERGE] {icon} {s['level'].upper()} {s['type']}: {s['desc']}") + if med: + for s in med[:3]: # 最多3条 + icon = "\u26a0\ufe0f" if s["direction"] == "risk" else "\u2b06\ufe0f" + lines.append(f"[DIVERGE] {icon} {s['level'].upper()} {s['type']}: {s['desc']}") + + # 输出指数全景 + idx_line = " | ".join([f"{n}: {indices.get(s, {}).get('change_pct', 0):+.1f}%" for n, s in INDEX_CODES.items() if s in indices]) + lines.append(f"[DIVERGE] 指数全景: {idx_line}") + + print("\n".join(lines)) + +def main(): + indices = fetch_indices() + if not indices: + return + + history = load_history() + signals = detect_divergences(indices, history) + + # 写入 state + signal_news + write_state(signals, indices) + write_to_signal_news(signals) + + # no_agent 输出 + print_report(signals, indices) + +if __name__ == "__main__": + main() diff --git a/scripts/macro_context_collector.py b/scripts/macro_context_collector.py new file mode 100644 index 0000000..4887f76 --- /dev/null +++ b/scripts/macro_context_collector.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +""" +macro_context_collector.py — 宏观新闻采集器+实时红绿灯(no_agent) + +采集来源: + 1. akshare.stock_info_global_em() — 东方财富全球宏观新闻(200条实时) + +核心功能: + A. 采集新闻→macro_raw_news(去重入库) + B. 实时红绿灯检测——用关键词规则快速识别HIGH/MEDIUM风险 + - 不等LLM,采集完立刻判断 + - 检测到风险→写入signal_news+macro_risk_state.json + - 盘中的监控cron(每15-25分)读到后立刻调整策略 + +红绿灯规则: + HIGH(单条就触发): 全球巨头+核心产业负面/美联储意外/指数暴跌/地缘冲突 + MEDIUM(累计2条触发): 常规宏观事件/板块级/资金面 +""" + +import sys, json, hashlib, os, re +from datetime import datetime +from pathlib import Path + +DATA_DIR = Path(__file__).parent.parent / "data" +DB_PATH = DATA_DIR / "mofin.db" +STATE_PATH = DATA_DIR / "macro_risk_state.json" + +# ── 红绿灯 关键词规则 ── +# HIGH: 任何一条匹配 → 立即 HIGH 预警 +HIGH_PATTERNS = [ + # 全球巨头+核心产业 + r"苹果.*(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)", + r"openai.*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)", + r"英伟达|nvidia.*(?:跌|调查|制裁|推迟|禁令)", + r"台积电.*(?:跌|推迟|取消|地震|火灾|禁)", + r"特斯拉.*(?:暴跌|召回|调查|破产|禁)", + # 美联储/央行意外 + r"美联储.*(?:意外|紧急|缩表|风暴|警告|超预期|加息\s*50|降息\s*50|紧急\s*(?:会议|声明))", + r"美联储.*(?:利率|决议).*(?:超预期|意外|紧急)", + r"fed.*(?:emergency|unexpected|surprise|hike|cut)", + # 指数暴跌 + r"指数.*(?:跌幅|暴跌|熔断|闪崩|重挫)", + r"(?:暴跌|重挫|熔断).*[5-9]%", + r"熔断|闪崩", + # 地缘+贸易 + r"关税.*(?:升级|新|报复|制裁)", + r"制裁.*(?:新|升级|全面)", + r"战争|开战|入侵|核|导弹.*发射", + # 系统性能源 + r"原油.*(?:跌破|暴跌|崩盘|断供)", + r"石油.*(?:禁运|制裁|断供)", + r"能源危机|粮食危机", + # 系统金融 + r"银行.*(?:倒闭|挤兑|破产|接管|危机)", + r"金融危机|债务危机|违约潮|系统性", + # AI/科技板块重挫 + r"半导体.*(?:暴跌|熔断|崩盘|跌幅)", + r"科技股.*(?:暴跌|熔断|崩盘|重挫)", + r"费城半导体|sox.*(?:跌|崩)", +] + +# MEDIUM: 累计匹配2条以上 → MEDIUM 预警 +MEDIUM_PATTERNS = [ + r"加息|降息", + r"通胀|CPI|PPI", + r"汇率.*(?:大幅|波动|贬值|升值)", + r"外资.*(?:流出|撤离|减持)", + r"北向.*(?:流出|净卖出|大幅)", + r"季末|年末|半年末|资金回笼|流动性紧张", + r"解禁.*(?:大额|巨量|千亿)", + r"大跌|重挫|杀跌|恐慌", + r"评级.*(?:下调|负面|降级)", + r"预警|风险提示|谨慎", + r"期货.*(?:暴跌|跌停|熔断)", + r"黑天鹅|灰犀牛", +] + +def ensure_tables(conn): + conn.execute(""" + CREATE TABLE IF NOT EXISTS macro_raw_news ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + summary TEXT DEFAULT '', + url TEXT DEFAULT '', + source_ts TEXT DEFAULT '', + fetched_at TEXT DEFAULT (datetime('now','localtime')), + risk_level TEXT DEFAULT 'unassessed', + risk_reason TEXT DEFAULT '' + ) + """) + conn.commit() + +def fetch_news(): + try: + import akshare as ak + df = ak.stock_info_global_em() + items = [] + for _, row in df.iterrows(): + items.append({ + "title": str(row.get("标题", "")), + "summary": str(row.get("摘要", "")), + "url": str(row.get("链接", "")), + "source_ts": str(row.get("发布时间", "")), + }) + return items + except ImportError: + print("[MACRO] akshare 未安装", file=sys.stderr) + return [] + except Exception as e: + print(f"[MACRO] 采集失败: {e}", file=sys.stderr) + return [] + +def title_hash(title): + return hashlib.md5(title.encode()).hexdigest()[:16] + +def quick_risk_check(items): + """红绿灯:快速关键词检测,返回 (level, matched_articles, summary)""" + hits_high = [] + hits_medium = [] + + for item in items: + text = (item["title"] + " " + item["summary"]).lower() + for pattern in HIGH_PATTERNS: + if re.search(pattern, text): + hits_high.append(item) + break + else: + # 没进 HIGH 才检查 MEDIUM + for pattern in MEDIUM_PATTERNS: + if re.search(pattern, text): + hits_medium.append(item) + break + + level = "none" + summary = "" + matched = [] + + if hits_high: + level = "high" + matched = hits_high[:5] + titles = [f"· {h['title'][:50]}" for h in hits_high[:5]] + summary = f"【高风险】{len(hits_high)}条紧急信号:\n" + "\n".join(titles) + elif len(hits_medium) >= 2: + level = "medium" + matched = hits_medium[:5] + titles = [f"· {h['title'][:50]}" for h in hits_medium[:5]] + summary = f"【中风险】{len(hits_medium)}条预警信号:\n" + "\n".join(titles) + + return level, matched, summary + +def write_risk_signal(conn, level, matched, summary): + """写入 signal_news + macro_risk_state.json""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + sentiment_map = {"high": "宏观-WATCH_HIGH", "medium": "宏观-WATCH_MEDIUM"} + sentiment = sentiment_map.get(level, "") + + # 写入 signal_news + articles_json = json.dumps([{"title": a["title"][:80], "summary": a["summary"][:120]} for a in matched], ensure_ascii=False) + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) VALUES (0, ?, ?, ?, ?, '', ?)", + ("宏观", sentiment, summary[:500], articles_json, "macro_watch") + ) + conn.commit() + + # 写入状态文件(供监控cron实时读取) + state = { + "level": level, + "signals": [{"sentiment": sentiment, "summary": summary[:300], "key_articles": articles_json, "created_at": now}], + "signal_count": len(matched), + "created_at": now, + "expired": False, + "source": "collector_realtime", + } + STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + +def main(): + conn = sqlite3.connect(str(DB_PATH)) + ensure_tables(conn) + + # 去重基础 + existing = set() + for row in conn.execute("SELECT title FROM macro_raw_news ORDER BY id DESC LIMIT 200"): + existing.add(title_hash(row[0])) + + items = fetch_news() + if not items: + conn.close() + return + + # 去重写入 + new_items = [] + now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + for item in items: + if not item["title"].strip(): + continue + h = title_hash(item["title"]) + if h in existing: + continue + existing.add(h) + try: + conn.execute( + "INSERT INTO macro_raw_news (title, summary, url, source_ts, fetched_at) VALUES (?, ?, ?, ?, ?)", + (item["title"][:300], item["summary"][:500], item["url"][:500], item["source_ts"][:20], now_str) + ) + new_items.append(item) + except Exception: + pass + + conn.commit() + + # 红绿灯检测(只针对新采集的) + level, matched, summary = "none", [], "" + if new_items: + level, matched, summary = quick_risk_check(new_items) + if level in ("high", "medium"): + write_risk_signal(conn, level, matched, summary) + + conn.close() + + # no_agent 输出 + if new_items: + print(f"[MACRO] {len(new_items)}条新宏观新闻") + # HIGH风险:输出预警(会推送到XMPP) + if level == "high": + print(f"⚠️ HIGH风险预警 ({len(matched)}条): {summary[:200]}") + elif level == "medium": + print(f" MEDIUM风险: {len(matched)}条匹配") + +if __name__ == "__main__": + import sqlite3 + main() diff --git a/scripts/macro_index_collector.py b/scripts/macro_index_collector.py new file mode 100644 index 0000000..07752da --- /dev/null +++ b/scripts/macro_index_collector.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +"""macro_index_collector.py — 指数宏观数据采集 (no_agent) + +采集指数行情,写入 macro_context_log 表(积累每日快照)。 +与莫荷的 macro_context_collector.py(宏观新闻采集)分开,互不冲突。 + +在 9:35 和 12:00 由 cron 触发。 +""" + +import json, sqlite3, subprocess +from datetime import datetime +from pathlib import Path + +DATA_DIR = Path("/home/hmo/web-dashboard/data") +DB_PATH = Path("/home/hmo/MoFin/data/mofin.db") + +INDICES = [ + ("sh000001", "上证指数"), ("sz399001", "深证成指"), + ("sz399006", "创业板指"), ("sh000688", "科创50"), + ("hkHSI", "恒生指数"), ("hkHSCEI", "国企指数"), +] + +SECTOR_INDICES = [ + ("sz980017", "国证芯片"), ("sh000039", "上证信息"), + ("sz980022", "机器人产业"), ("sz980032", "新能电池"), + ("sz980076", "通用航空"), ("sh000063", "上证周期"), + ("sh000068", "上证资源"), ("sh000019", "治理指数"), +] + + +def fetch(symbol): + try: + r = subprocess.run(["curl", "-s", f"https://qt.gtimg.cn/q={symbol}"], + capture_output=True, timeout=8) + return r.stdout.decode("gbk", errors="replace") + except: + return None + + +def parse(text, name): + if not text or "pv_none_match" in text: + return None + try: + f = text.split("~") + if len(f) < 35: + return None + price = float(f[3]) if f[3] else 0 + prev = float(f[4]) if f[4] else 0 + high = float(f[33]) if len(f) > 33 and f[33] else 0 + low = float(f[34]) if len(f) > 34 and f[34] else 0 + chg = round((price - prev) / prev * 100, 2) if prev else 0 + return {"name": name, "price": price, "change_pct": chg, + "high": high, "low": low} + except: + return None + + +def assess(indices_data): + if not indices_data: + return "unknown", "unknown" + sh = next((i for i in indices_data if i and i["name"] == "上证指数"), None) + sz = next((i for i in indices_data if i and i["name"] == "深证成指"), None) + cyb = next((i for i in indices_data if i and i["name"] == "创业板指"), None) + avg = ((sh or {}).get("change_pct", 0) + (sz or {}).get("change_pct", 0) + + (cyb or {}).get("change_pct", 0)) / 3 + if avg > 1.5: + return "strong_bullish", "整体强势" + elif avg > 0.5: + return "bullish", "偏强" + elif avg > -0.5: + return "neutral", "震荡" + elif avg > -1.5: + return "bearish", "偏弱" + return "strong_bearish", "整体弱势" + + +def main(): + now = datetime.now() + indices_data = [] + for sym, name in INDICES: + raw = fetch(sym) + p = parse(raw, name) + if p: + indices_data.append(p) + + has_data = any(i for i in indices_data if i and i.get("price", 0) > 0) + overall, desc = assess(indices_data) + + # 读莫荷的 macro_risk_state.json 补充风险状态 + risk_state = {} + try: + risk_path = DATA_DIR / "macro_risk_state.json" + if risk_path.exists(): + risk_state = json.loads(risk_path.read_text()) + except: + pass + + context = { + "updated_at": now.strftime("%Y-%m-%d %H:%M:%S"), + "session": "morning" if now.hour < 12 else "midday", + "has_valid_data": has_data, + "indices": {i["name"]: {"price": i["price"], "change_pct": i["change_pct"], + "high": i.get("high", 0), "low": i.get("low", 0)} + for i in indices_data if i}, + "structure": {"overall": overall, "description": desc}, + "risk_level": risk_state.get("level", "none"), + "risk_reason": risk_state.get("reason", ""), + } + + # 写入DB + try: + conn = sqlite3.connect(str(DB_PATH)) + conn.execute('''INSERT INTO macro_context_log + (data_timestamp, session, has_valid_data, indices, structure, + key_sectors, top_gainers, top_losers, sector_up_ratio, sector_mood) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', + (context["updated_at"], context["session"], + 1 if has_data else 0, + json.dumps(context["indices"], ensure_ascii=False), + json.dumps(context["structure"], ensure_ascii=False), + "[]", "[]", "[]", 0, overall)) + conn.commit() + conn.close() + except: + pass + + # 写入 macro_context.json(兼容旧读取方) + try: + DATA_DIR.mkdir(parents=True, exist_ok=True) + (DATA_DIR / "macro_context.json").write_text( + json.dumps(context, ensure_ascii=False, indent=2)) + except: + pass + + if has_data: + idx_parts = [f"{i['name']}{'▲' if i['change_pct']>0 else '▼'}{i['change_pct']:+.2f}%" + for i in indices_data[:4] if i] + print(f"【宏观指数】{now.strftime('%H:%M')} | {desc}") + print(" | ".join(idx_parts)) + print(f"评估: {'整体强势' if overall=='strong_bullish' else '偏强' if overall=='bullish' else '震荡' if overall=='neutral' else '偏弱' if overall=='bearish' else '整体弱势'}") + if risk_state.get("level") == "high": + print(f"🔴 宏观风险: {risk_state.get('reason','')}") + else: + print("【宏观指数】数据不可用(非交易时段或行情未更新)") + + +if __name__ == "__main__": + main() diff --git a/scripts/macro_signal_consumer.py b/scripts/macro_signal_consumer.py new file mode 100644 index 0000000..719e6ea --- /dev/null +++ b/scripts/macro_signal_consumer.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +macro_signal_consumer.py — 消费宏观风险信号,写入风险状态 + +no_agent 模式:有HIGH风险→输出风险摘要 | 无→静默 + +管道位置: + macro_risk_scanner (8:30/11:30) → signal_news(source=macro_watch) → 本脚本 + ↓ + macro_risk_state.json — 供所有监控 cron 读取 + ↓ + 如果 HIGH → 推送到 Dad +""" +import sqlite3, json, os +from pathlib import Path +from datetime import datetime + +BASE = Path("/home/hmo/MoFin") +DATA = BASE / "data" +DB_PATH = DATA / "mofin.db" +STATE_PATH = DATA / "macro_risk_state.json" + +def main(): + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + + # 读取未处理的 macro_watch 信号 + rows = conn.execute( + "SELECT * FROM signal_news WHERE source='macro_watch' AND (processed=0 OR processed IS NULL)" + ).fetchall() + + if not rows: + # 无新信号时状态文件维持 15 分钟过期 + try: + state = json.loads(STATE_PATH.read_text()) + created = datetime.strptime(state.get("created_at", "2000-01-01"), "%Y-%m-%d %H:%M:%S") + if (datetime.now() - created).total_seconds() > 900: # 15 min + state["level"] = "none" + state["expired"] = True + STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + except: + pass + conn.close() + return # SILENT + + # 聚合风险等级 + levels = {"宏观-WATCH_HIGH": "high", "宏观-WATCH_MEDIUM": "medium", "宏观-WATCH_INFO": "info"} + highest = "info" + all_summaries = [] + + for r in rows: + sentiment = r["overall_sentiment"] + lv = levels.get(sentiment, "info") + if lv == "high": + highest = "high" + elif lv == "medium" and highest != "high": + highest = "medium" + all_summaries.append({ + "sentiment": sentiment, + "summary": r["summary"][:300], + "key_articles": r["key_articles"], + "created_at": r["created_at"], + }) + + # 写入风险状态文件 + state = { + "level": highest, + "signals": all_summaries, + "signal_count": len(rows), + "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "expired": False, + } + STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + + # 标记为已处理 + for r in rows: + conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (r["id"],)) + conn.commit() + conn.close() + + # no_agent 输出(有 HIGH 才主动出声) + if highest == "high": + print(f"[MACRO-RISK] ⚠️ HIGH: {len(rows)}条高风险信号") + for s in all_summaries: + print(f" {s['summary'][:100]}") + elif highest == "medium": + if len(os.environ.get("MACRO_VERBOSE", "")) > 0: + print(f"[MACRO-RISK] MEDIUM: {len(rows)}条中风险信号") + # HIGH 信号会通过 no_agent 推送到 XMPP + +if __name__ == "__main__": + main()