Files
MoFin/scripts/xiaoguo_signal_consumer.py
T
知微 39ff4d95f7 feat: macro_context/market数据全部DB优先,JSON回退
- 建 macro_context_log 表,macro_context_collector.py 双写
- strategy_lifecycle.py load_macro_context() 优先DB
- strategy_tree.py detect_scenario() 优先DB
- stale_push_wlin.py load_macro_line() 优先DB
- xiaoguo_signal_consumer.py 大盘判断优先DB
- stock_profile.py load_macro() 优先DB
- system_audit.py 管道审计改查DB market_snapshots
- JSON保留作fallback,确保过渡期不中断
2026-06-24 22:34:08 +08:00

275 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""xiaoguo_signal_consumer.py — Zhiwei consumes the Xiaoguo scanner's signals.

Runs every 30 minutes during trading hours: reads unprocessed xiaoguo signals
from the signal_news table, runs a quick five-dimension assessment, and then
decides: add to watchlist / monitor / skip.

Pipeline position:
    xiaoguo_scanner (every 5 min) → signal_news → this script → Zhiwei analysis report

no_agent mode: print output when something is found, stay silent otherwise.
"""
import json, os, sqlite3, sys, time, urllib.request
from pathlib import Path
from datetime import datetime
# Project root; the SQLite database and the JSON state files live under data/.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
# Watchlist and decisions files
WATCHLIST_PATH = DATA / "watchlist.json"
DECISIONS_PATH = DATA / "decisions.json"
# NOTE(review): nothing in the visible code references this constant; the
# signal query in main() filters by today's date instead — confirm intent.
SIGNAL_MAX_AGE_HOURS = 4 # only process signals created within the last 4 hours
def clean_proxy():
    """Drop the HTTP(S) proxy variables from this process's environment.

    Both the lower-case and upper-case spellings are removed; variables that
    are not set are ignored.
    """
    proxy_vars = ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY')
    for name in proxy_vars:
        if name in os.environ:
            del os.environ[name]
def fetch_quote(code):
    """Fetch a quote for *code* from the Tencent quote endpoint.

    The exchange prefix is chosen from the leading digits of the code:
    ``sh`` for 60/68/51/56/50, ``sz`` for 00/30/15 and ``hk`` for anything
    else.  The response is decoded as GBK and split on ``~``.

    Args:
        code: Stock code as a string of digits.

    Returns:
        On success, a dict with the keys ``name``, ``code``, ``price``,
        ``change_pct``, ``pe`` and ``turnover``.  A numeric field that is
        missing or blank in the response is reported as ``0``; a missing
        name is reported as ``""``.
        On failure, ``{"code": code, "error": <message>}`` where the message
        is truncated to 60 characters.  Exceptions are reported this way
        rather than propagated.
    """
    try:
        if code.startswith(('60', '68', '51', '56', '50')):
            prefix = "sh"
        elif code.startswith(('00', '30', '15')):
            prefix = "sz"
        else:
            prefix = "hk"
        url = f"http://qt.gtimg.cn/q={prefix}{code}"
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # The context manager closes the HTTP response (and its socket) even
        # when reading or decoding fails.
        with urllib.request.urlopen(req, timeout=5) as resp:
            raw = resp.read().decode('gbk')
        # The payload has the form  name="f0~f1~f2~...";  keep the quoted part.
        fld = raw.split('=', 1)[1].strip().strip('"').strip(';').split('~')

        def num(idx):
            # Missing or blank fields count as 0 for every numeric column;
            # non-numeric text still raises and surfaces as an error dict.
            return float(fld[idx]) if len(fld) > idx and fld[idx] else 0

        return {
            "name": fld[1] if len(fld) > 1 else "",
            "code": code,
            "price": num(3),
            "change_pct": num(32),
            "pe": num(39),
            "turnover": num(38),
        }
    except Exception as e:
        return {"code": code, "error": str(e)[:60]}
def is_in_portfolio(conn, code):
    """Report whether *code* is already held or already on the watchlist.

    Three places are checked in order: active rows of the ``holdings``
    table, the ``watchlist_stocks`` table, and the ``stocks`` list in
    watchlist.json.

    Args:
        conn: Open SQLite connection to the MoFin database.
        code: Stock code string.

    Returns:
        ``"holdings"``, ``"watchlist"``, or ``None`` when the code is not
        found (or when *code* is empty).
    """
    if not code:
        # Without this guard an empty code would "match" any watchlist.json
        # entry whose own code is blank or missing.
        return None
    code_stripped = code.lstrip("0")
    # NOTE(review): holdings is queried with leading zeros removed, which
    # presumably mirrors how that table stores codes — confirm.
    cur = conn.execute(
        "SELECT COUNT(*) FROM holdings WHERE code=? AND is_active=1",
        (code_stripped,),
    )
    if cur.fetchone()[0] > 0:
        return "holdings"
    cur = conn.execute("SELECT COUNT(*) FROM watchlist_stocks WHERE code=?", (code,))
    if cur.fetchone()[0] > 0:
        return "watchlist"
    # Also check watchlist.json; an unreadable or malformed file simply
    # means there is nothing more to match.
    try:
        wl = json.loads(WATCHLIST_PATH.read_text())
    except (OSError, ValueError):
        return None
    stocks = wl.get("stocks", []) if isinstance(wl, dict) else []
    if not isinstance(stocks, list):
        return None
    for s in stocks:
        if not isinstance(s, dict):
            continue
        # Normalise to str so one odd entry (e.g. a numeric code) cannot
        # abort the scan of the remaining entries.
        entry_code = str(s.get("code") or "")
        if entry_code == code or entry_code.lstrip("0") == code_stripped:
            return "watchlist"
    return None
def _shanghai_change_from_db():
    """Return the Shanghai index change (%) from the newest valid macro_context_log row.

    Raises on any problem (query failure, no row, malformed JSON, ...) so the
    caller can fall back to another source.  Returns 0.0 when the row has no
    index whose name contains "上证".
    """
    conn = sqlite3.connect(str(DB_PATH))
    try:
        row = conn.execute(
            "SELECT indices FROM macro_context_log WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1"
        ).fetchone()
    finally:
        # Release the connection even when the query itself fails.
        conn.close()
    if not (row and row[0]):
        raise ValueError("macro_context_log has no valid row")
    indices = json.loads(row[0])
    for name, info in indices.items():
        if "上证" in name:
            return float(info.get("change_pct", 0))
    return 0.0


def _shanghai_change_from_json():
    """Return the Shanghai index change (%) from macro_context.json; raises on failure."""
    mc = json.loads((DATA / "macro_context.json").read_text())
    return float(mc.get("shanghai", {}).get("change_pct", 0))


def quick_assess(quote):
    """Score a quote with a quick automatic assessment.

    Four factors contribute to the score:

    * market: Shanghai index change, read from the database with
      macro_context.json as fallback (+1 above 0.5 %, -1 below -0.5 %);
      skipped entirely when neither source is usable;
    * price action: ``change_pct`` (+1 above 3, -1 below -3, else +0.5);
    * valuation: ``pe`` (+1 if 5 < pe < 40, -0.5 if pe <= 0 or pe > 100);
    * volume: ``turnover`` (+0.5 above 5, -0.3 below 0.5).

    Args:
        quote: Mapping that may contain ``change_pct``, ``pe`` and
            ``turnover``; missing keys count as 0.

    Returns:
        ``(score, reasons)`` — the numeric score and the list of
        human-readable reason strings, in evaluation order.
    """
    score = 0
    reasons = []

    # Market environment: DB first, JSON as fallback.  This part is
    # best-effort by design, so any failure just moves on to the next source.
    sh = None
    for loader in (_shanghai_change_from_db, _shanghai_change_from_json):
        try:
            sh = loader()
            break
        except Exception:
            continue
    if sh is not None:
        if sh > 0.5:
            score += 1
            reasons.append(f"大盘+{sh:.1f}%偏强")
        elif sh < -0.5:
            score -= 1
            reasons.append(f"大盘{sh:.1f}%偏弱")

    # Price action: daily change in percent.
    chg = quote.get("change_pct", 0)
    if chg > 3:
        score += 1
        reasons.append(f"涨幅+{chg:.1f}%偏强")
    elif chg < -3:
        score -= 1
        reasons.append(f"跌幅{chg:.1f}%偏弱")
    else:
        score += 0.5
        reasons.append(f"走势平稳{chg:+.1f}%")

    # Fundamentals: PE.  Values in (0, 5] and [40, 100] are neutral.
    pe = quote.get("pe", 0)
    if 5 < pe < 40:
        score += 1
        reasons.append(f"PE={pe:.0f}合理")
    elif pe <= 0:
        score -= 0.5
        reasons.append("PE为负")
    elif pe > 100:
        score -= 0.5
        reasons.append(f"PE={pe:.0f}偏高")

    # Volume: turnover rate.
    turn = quote.get("turnover", 0)
    if turn > 5:
        score += 0.5
        reasons.append(f"换手{turn:.1f}%活跃")
    elif turn < 0.5:
        score -= 0.3
        reasons.append(f"换手{turn:.1f}%偏低")

    return score, reasons
def evaluate_and_act(signal, quote):
    """Assess a signal and decide what to do with its stock.

    Stocks that are already held or already on the watchlist are skipped
    without scoring.  Otherwise the quote is scored with quick_assess():
    a score of 1.5 or more means "watchlist", 0 or more means "monitor",
    anything lower means "skip".

    Args:
        signal: Mapping describing the signal row.
        quote: Quote dict as returned by fetch_quote().

    Returns:
        ``(summary, action)`` where *action* is ``"watchlist"``,
        ``"monitor"``, ``"skip"``, or ``None`` when the stock is already
        tracked.
    """
    # The signal rows selected in main() carry no "code" column, so fall back
    # to the code of the fetched quote; otherwise the portfolio check would
    # always run against an empty code and never match.
    code = signal.get("code") or quote.get("code", "")
    conn = get_conn()
    try:
        status_in = is_in_portfolio(conn, code)
    finally:
        # get_conn() opens a fresh connection on every call; release it.
        conn.close()
    if status_in:
        return f"已在{status_in}中,跳过", None
    score, reasons = quick_assess(quote)
    if score >= 1.5:
        action = "watchlist"
        summary = f"加自选: {' | '.join(reasons)}"
    elif score >= 0:
        action = "monitor"
        summary = f"关注: {' | '.join(reasons)}"
    else:
        action = "skip"
        summary = f"跳过(评分{score:.1f}): {' | '.join(reasons)}"
    return summary, action
def get_conn():
    """Open a new connection to the MoFin SQLite database.

    Rows come back as ``sqlite3.Row`` objects, so columns can be read by name.
    The caller owns the connection and is responsible for closing it.
    """
    from sqlite3 import Row, connect
    db = connect(str(DB_PATH))
    db.row_factory = Row
    return db
def mark_processed(conn, signal_id):
    """Flag the signal_news row with id *signal_id* as processed and commit."""
    statement = "UPDATE signal_news SET processed=1 WHERE id=?"
    params = (signal_id,)
    conn.execute(statement, params)
    conn.commit()
def _extract_stock_code(row):
    """Return the first 6-digit code found in the signal summary, or None.

    The summary is only searched when ``searched_stocks`` decodes to a
    non-empty value.
    NOTE(review): that gating is inherited from the original loop — confirm
    whether signals with an empty searched_stocks should also be scanned.
    """
    import re
    try:
        searched = json.loads(row["searched_stocks"]) if row["searched_stocks"] else []
    except (TypeError, ValueError):
        searched = []
    if not searched:
        return None
    codes = re.findall(r'\d{6}', row["summary"] or "")
    return codes[0] if codes else None


def _add_to_watchlist(code, quote, fallback_name, added_at):
    """Append *code* to watchlist.json unless an entry with that code exists.

    Best-effort: a missing file is left alone, and any other failure is
    reported on stderr instead of being raised.
    """
    try:
        wl = json.loads(WATCHLIST_PATH.read_text())
        wl.setdefault("stocks", [])
        if any(s.get("code") == code for s in wl["stocks"]):
            return
        wl["stocks"].append({
            "code": code,
            "name": quote.get("name", fallback_name),
            "price": quote.get("price", 0),
            "status": "watching",
            "source": "xiaoguo_scanner",
            "added_at": added_at,
        })
        WATCHLIST_PATH.write_text(json.dumps(wl, ensure_ascii=False, indent=2))
    except FileNotFoundError:
        # No watchlist.json to update; the file is not created here.
        return
    except (OSError, ValueError, AttributeError, TypeError) as exc:
        print(f"[WARN] watchlist.json update failed for {code}: {exc}", file=sys.stderr)


def main():
    """Consume today's unprocessed xiaoguo signals and print the outcome.

    For each signal (at most 20, newest first) a stock code is extracted from
    the summary, a quote is fetched and evaluate_and_act() decides between
    watchlist / monitor / skip.  Signals without a usable code are marked
    processed immediately.  Signals whose quote could not be fetched are left
    unprocessed so a later run can retry them.  Prints a report when there
    are results, otherwise a ``[SILENT]`` line.
    """
    clean_proxy()
    start = time.time()
    today = datetime.now().strftime("%Y-%m-%d")
    results = []
    conn = get_conn()
    try:
        # Unprocessed xiaoguo signals created today.
        rows = conn.execute(
            "SELECT id, sector, overall_sentiment, summary, key_articles, searched_stocks, source "
            "FROM signal_news "
            "WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL) "
            "AND date(created_at) = ? "
            "ORDER BY created_at DESC LIMIT 20",
            (today,)
        ).fetchall()
        if not rows:
            print("[SILENT] 今日无未处理小果信号")
            return
        for r in rows:
            # The sector column is used as the display name of the stock.
            sector_name = r["sector"] or ""
            code = _extract_stock_code(r)
            if code is None:
                # Nothing to evaluate; consume the signal so it is not re-read.
                mark_processed(conn, r["id"])
                continue
            quote = fetch_quote(code)
            if "error" in quote:
                # Scoring a failed fetch would rate the stock on all-zero
                # placeholder values; keep the signal for the next run.
                print(f"[WARN] quote fetch failed for {code}: {quote['error']}", file=sys.stderr)
                continue
            summary, action = evaluate_and_act(dict(r), quote)
            if action == "watchlist":
                results.append(f"{sector_name}({code}): {summary}")
                _add_to_watchlist(code, quote, sector_name, today)
            elif action == "monitor":
                results.append(f"🔄 {sector_name}({code}): {summary}")
            else:
                results.append(f"⏭️ {sector_name}({code}): {summary}")
            mark_processed(conn, r["id"])
    finally:
        # Close the connection on every exit path, including exceptions.
        conn.close()
    elapsed = time.time() - start
    if results:
        print(f"小果信号消费 | {today} | {len(results)}条处理 ({elapsed:.0f}s)")
        for line in results:
            print(f" {line}")
    else:
        print("[SILENT] 小果信号消费结束")
# Run the consumer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()