revert: 去掉我误建的macro_index_collector(莫荷的macro_context_collector已覆盖)

爹在XMPP里和我(知微)商量的宏观预警系统就是这3个文件:
  macro_context_collector.py — 宏观新闻采集+指数采集+红绿灯
  divergence_detector.py — 跨市场背离
  macro_signal_consumer.py — 风险信号消费
我不该另建macro_index_collector,现已删除,cron已恢复指向原文件
This commit is contained in:
知微
2026-06-27 01:16:22 +08:00
parent 54915e9b7e
commit cb779b692c
-148
View File
@@ -1,148 +0,0 @@
#!/usr/bin/env python3
"""macro_index_collector.py — 指数宏观数据采集 (no_agent)
采集指数行情,写入 macro_context_log 表(积累每日快照)。
与莫荷的 macro_context_collector.py(宏观新闻采集)分开,互不冲突。
在 9:35 和 12:00 由 cron 触发。
"""
import json, sqlite3, subprocess
from datetime import datetime
from pathlib import Path
DATA_DIR = Path("/home/hmo/web-dashboard/data")
DB_PATH = Path("/home/hmo/MoFin/data/mofin.db")
INDICES = [
("sh000001", "上证指数"), ("sz399001", "深证成指"),
("sz399006", "创业板指"), ("sh000688", "科创50"),
("hkHSI", "恒生指数"), ("hkHSCEI", "国企指数"),
]
SECTOR_INDICES = [
("sz980017", "国证芯片"), ("sh000039", "上证信息"),
("sz980022", "机器人产业"), ("sz980032", "新能电池"),
("sz980076", "通用航空"), ("sh000063", "上证周期"),
("sh000068", "上证资源"), ("sh000019", "治理指数"),
]
def fetch(symbol):
try:
r = subprocess.run(["curl", "-s", f"https://qt.gtimg.cn/q={symbol}"],
capture_output=True, timeout=8)
return r.stdout.decode("gbk", errors="replace")
except:
return None
def parse(text, name):
if not text or "pv_none_match" in text:
return None
try:
f = text.split("~")
if len(f) < 35:
return None
price = float(f[3]) if f[3] else 0
prev = float(f[4]) if f[4] else 0
high = float(f[33]) if len(f) > 33 and f[33] else 0
low = float(f[34]) if len(f) > 34 and f[34] else 0
chg = round((price - prev) / prev * 100, 2) if prev else 0
return {"name": name, "price": price, "change_pct": chg,
"high": high, "low": low}
except:
return None
def assess(indices_data):
if not indices_data:
return "unknown", "unknown"
sh = next((i for i in indices_data if i and i["name"] == "上证指数"), None)
sz = next((i for i in indices_data if i and i["name"] == "深证成指"), None)
cyb = next((i for i in indices_data if i and i["name"] == "创业板指"), None)
avg = ((sh or {}).get("change_pct", 0) + (sz or {}).get("change_pct", 0) +
(cyb or {}).get("change_pct", 0)) / 3
if avg > 1.5:
return "strong_bullish", "整体强势"
elif avg > 0.5:
return "bullish", "偏强"
elif avg > -0.5:
return "neutral", "震荡"
elif avg > -1.5:
return "bearish", "偏弱"
return "strong_bearish", "整体弱势"
def main():
now = datetime.now()
indices_data = []
for sym, name in INDICES:
raw = fetch(sym)
p = parse(raw, name)
if p:
indices_data.append(p)
has_data = any(i for i in indices_data if i and i.get("price", 0) > 0)
overall, desc = assess(indices_data)
# 读莫荷的 macro_risk_state.json 补充风险状态
risk_state = {}
try:
risk_path = DATA_DIR / "macro_risk_state.json"
if risk_path.exists():
risk_state = json.loads(risk_path.read_text())
except:
pass
context = {
"updated_at": now.strftime("%Y-%m-%d %H:%M:%S"),
"session": "morning" if now.hour < 12 else "midday",
"has_valid_data": has_data,
"indices": {i["name"]: {"price": i["price"], "change_pct": i["change_pct"],
"high": i.get("high", 0), "low": i.get("low", 0)}
for i in indices_data if i},
"structure": {"overall": overall, "description": desc},
"risk_level": risk_state.get("level", "none"),
"risk_reason": risk_state.get("reason", ""),
}
# 写入DB
try:
conn = sqlite3.connect(str(DB_PATH))
conn.execute('''INSERT INTO macro_context_log
(data_timestamp, session, has_valid_data, indices, structure,
key_sectors, top_gainers, top_losers, sector_up_ratio, sector_mood)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(context["updated_at"], context["session"],
1 if has_data else 0,
json.dumps(context["indices"], ensure_ascii=False),
json.dumps(context["structure"], ensure_ascii=False),
"[]", "[]", "[]", 0, overall))
conn.commit()
conn.close()
except:
pass
# 写入 macro_context.json(兼容旧读取方)
try:
DATA_DIR.mkdir(parents=True, exist_ok=True)
(DATA_DIR / "macro_context.json").write_text(
json.dumps(context, ensure_ascii=False, indent=2))
except:
pass
if has_data:
idx_parts = [f"{i['name']}{'' if i['change_pct']>0 else ''}{i['change_pct']:+.2f}%"
for i in indices_data[:4] if i]
print(f"【宏观指数】{now.strftime('%H:%M')} | {desc}")
print(" | ".join(idx_parts))
print(f"评估: {'整体强势' if overall=='strong_bullish' else '偏强' if overall=='bullish' else '震荡' if overall=='neutral' else '偏弱' if overall=='bearish' else '整体弱势'}")
if risk_state.get("level") == "high":
print(f"🔴 宏观风险: {risk_state.get('reason','')}")
else:
print("【宏观指数】数据不可用(非交易时段或行情未更新)")
if __name__ == "__main__":
main()