279 lines
8.7 KiB
Python
279 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
||
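"""xiaoguo_signal_consumer.py — Zhiwei consumes Xiaoguo scan signals.

Runs every 30 minutes during the trading session. Reads unprocessed xiaoguo
signals from the signal_news table, runs a quick five-dimension assessment and
decides: add to watchlist / monitor / skip.

Pipeline position:
    xiaoguo_scanner (every 5 min) → signal_news → this script → Zhiwei analysis report

no_agent mode: print output when there are findings, otherwise stay silent.
"""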
|
||
|
||
import contextlib
import json
import os
import re
import sqlite3
import sys
import time
import urllib.request
from datetime import datetime
from pathlib import Path
|
||
|
||
# Project root and its data directory.
BASE = Path("/home/hmo/MoFin")
DATA = BASE.joinpath("data")
DB_PATH = DATA.joinpath("mofin.db")

# Watchlist pool and decision files.
WATCHLIST_PATH = DATA.joinpath("watchlist.json")
DECISIONS_PATH = DATA.joinpath("decisions.json")

# Only signals produced within the last 4 hours are meant to be processed.
SIGNAL_MAX_AGE_HOURS = 4
|
||
|
||
|
||
def clean_proxy():
    """Remove the HTTP(S) proxy variables from this process's environment.

    Both the lower-case and upper-case spellings are dropped; variables that
    are not set are ignored.
    """
    proxy_vars = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")
    for name in proxy_vars:
        if name in os.environ:
            del os.environ[name]
|
||
|
||
|
||
def fetch_quote(code):
    """Fetch a quote for ``code``: local DB first, Tencent quote endpoint as fallback.

    Args:
        code: Stock code string, e.g. ``"600000"``.

    Returns:
        dict: On a DB hit, ``{"name": "", "code", "price", "change_pct"}``
        (no ``pe`` / ``turnover`` keys). On a Tencent hit, ``name``, ``code``,
        ``price``, ``change_pct``, ``pe`` and ``turnover``. If the Tencent
        request or parsing fails, ``{"code": code, "error": <message, at most
        60 characters>}``. Never returns ``None``.
    """
    # DB first. The helper module is imported lazily so a missing or broken
    # mofin_db only disables this path instead of the whole script.
    try:
        from mofin_db import get_price_from_db

        price, change = get_price_from_db(code)
    except Exception:
        # Best effort: any DB-side problem simply falls through to Tencent.
        price, change = None, None
    if price:
        return {"name": "", "code": code, "price": price, "change_pct": change or 0}

    # Fallback: Tencent. Reached both when the DB lookup failed and when it
    # returned no usable price.
    try:
        if code.startswith(("60", "68", "51", "56", "50")):
            prefix = "sh"
        elif code.startswith(("00", "30", "15")):
            prefix = "sz"
        else:
            prefix = "hk"
        url = f"http://qt.gtimg.cn/q={prefix}{code}"
        request = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # The context manager closes the HTTP response even if read/decode fails.
        with urllib.request.urlopen(request, timeout=5) as response:
            text = response.read().decode('gbk')
        # Payload is the part after '=', split into '~'-separated fields.
        fields = text.split('=')[1].strip().strip('"').strip(';').split('~')
        count = len(fields)
        return {
            "name": fields[1] if count > 1 else "",
            "code": code,
            "price": float(fields[3]) if count > 3 else 0,
            "change_pct": float(fields[32]) if count > 32 else 0,
            "pe": float(fields[39]) if count > 39 and fields[39] else 0,
            "turnover": float(fields[38]) if count > 38 and fields[38] else 0,
        }
    except Exception as exc:
        # Network, decoding and parsing failures are reported to the caller
        # in the result instead of being raised.
        return {"code": code, "error": str(exc)[:60]}
|
||
|
||
|
||
def is_in_portfolio(conn, code):
    """Report whether ``code`` is already held or already on a watchlist.

    Checks, in order: active rows in the ``holdings`` table, rows in the
    ``watchlist_stocks`` table, then the ``"stocks"`` entries of watchlist.json.
    A code matches either in full or with its leading zeros stripped, so both
    storage spellings are recognised in every source.

    Args:
        conn: Open sqlite3 connection to the project database.
        code: Stock code string.

    Returns:
        ``"holdings"``, ``"watchlist"``, or ``None`` when the code is found
        nowhere (or is empty).
    """
    if not code:
        # An empty code can only produce spurious matches.
        return None
    code = str(code)
    stripped = code.lstrip("0")
    spellings = (code, stripped)

    held = conn.execute(
        "SELECT COUNT(*) FROM holdings WHERE code IN (?, ?) AND is_active=1",
        spellings,
    ).fetchone()[0]
    if held > 0:
        return "holdings"

    watched = conn.execute(
        "SELECT COUNT(*) FROM watchlist_stocks WHERE code IN (?, ?)",
        spellings,
    ).fetchone()[0]
    if watched > 0:
        return "watchlist"

    # Also check watchlist.json. A missing or malformed file means "not listed".
    try:
        watchlist = json.loads(WATCHLIST_PATH.read_text())
    except (OSError, ValueError):
        return None
    stocks = watchlist.get("stocks", []) if isinstance(watchlist, dict) else []
    if not isinstance(stocks, list):
        return None
    for entry in stocks:
        if not isinstance(entry, dict):
            continue
        # Codes may have been stored as numbers; compare as strings.
        entry_code = str(entry.get("code") or "")
        if entry_code and (entry_code == code or entry_code.lstrip("0") == stripped):
            return "watchlist"
    return None
|
||
|
||
|
||
def _read_market_change():
    """Return the Shanghai index change percent, or None if no source is usable.

    Reads the newest valid ``macro_context_log`` row from the DB first and
    falls back to ``macro_context.json`` in the data directory.
    """
    def _require_number(value):
        # Non-numeric values count as a failed read so the next source is tried.
        if not isinstance(value, (int, float)):
            raise TypeError(f"change_pct is not numeric: {value!r}")
        return value

    try:
        # closing() guarantees the connection is released even if the query fails.
        with contextlib.closing(sqlite3.connect(str(DB_PATH))) as conn:
            row = conn.execute(
                "SELECT indices FROM macro_context_log WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1"
            ).fetchone()
        if not (row and row[0]):
            raise LookupError("no valid macro_context_log row")
        indices = json.loads(row[0])
        change = 0
        for key, value in indices.items():
            if "上证" in key:
                change = value.get("change_pct", 0)
                break
        return _require_number(change)
    except Exception:
        # Best effort: any DB problem falls back to the JSON snapshot below.
        pass

    try:
        snapshot = json.loads((DATA / "macro_context.json").read_text())
        return _require_number(snapshot.get("shanghai", {}).get("change_pct", 0))
    except Exception:
        # No market data at all: the market dimension is simply not scored.
        return None


def quick_assess(quote):
    """Score a quote with the quick automatic assessment.

    Four dimensions are scored here: market environment, price change,
    PE and turnover. Missing or ``None`` numeric fields are treated as 0.

    NOTE(review): a missing PE therefore scores as "PE为负" (-0.5) and a
    missing turnover as low turnover (-0.3), exactly as before; quotes served
    from the DB carry neither field — confirm this penalty is intended.

    Args:
        quote: Quote dict as returned by ``fetch_quote`` (``None`` is tolerated).

    Returns:
        tuple: ``(score, reasons)`` where ``score`` is a number and ``reasons``
        is a list of human-readable strings, one per scored dimension.
    """
    quote = quote or {}
    score = 0
    reasons = []

    # Market environment: DB first, JSON fallback.
    market = _read_market_change()
    if market is not None:
        if market > 0.5:
            score += 1
            reasons.append(f"大盘+{market:.1f}%偏强")
        elif market < -0.5:
            score -= 1
            reasons.append(f"大盘{market:.1f}%偏弱")

    # Technicals: price change percent.
    chg = quote.get("change_pct") or 0
    if chg > 3:
        score += 1
        reasons.append(f"涨幅+{chg:.1f}%偏强")
    elif chg < -3:
        score -= 1
        reasons.append(f"跌幅{chg:.1f}%偏弱")
    else:
        score += 0.5
        reasons.append(f"走势平稳{chg:+.1f}%")

    # Fundamentals: PE.
    pe = quote.get("pe") or 0
    if 5 < pe < 40:
        score += 1
        reasons.append(f"PE={pe:.0f}合理")
    elif pe <= 0:
        score -= 0.5
        reasons.append("PE为负")
    elif pe > 100:
        score -= 0.5
        reasons.append(f"PE={pe:.0f}偏高")

    # Volume: turnover rate.
    turn = quote.get("turnover") or 0
    if turn > 5:
        score += 0.5
        reasons.append(f"换手{turn:.1f}%活跃")
    elif turn < 0.5:
        score -= 0.3
        reasons.append(f"换手{turn:.1f}%偏低")

    return score, reasons
|
||
|
||
|
||
def evaluate_and_act(signal, quote):
    """Assess one signal and decide what to do with it.

    The stock code is taken from ``signal["code"]`` when present, otherwise
    from ``quote["code"]``. Signal rows read from signal_news carry no code
    column, so the quote's code is what normally identifies the stock.

    Args:
        signal: Signal row as a dict.
        quote: Quote dict from ``fetch_quote``; ``None`` or a dict with an
            ``"error"`` key is treated as "no usable quote".

    Returns:
        tuple: ``(summary, action)``. ``action`` is ``"watchlist"`` for a
        score >= 1.5, ``"monitor"`` for a score >= 0, ``"skip"`` otherwise or
        when no usable quote exists, and ``None`` when the stock is already
        held or watched.
    """
    quote = quote or {}
    code = signal.get("code") or quote.get("code") or ""

    # The connection is opened only for this lookup, so release it right away.
    conn = get_conn()
    try:
        status_in = is_in_portfolio(conn, code)
    finally:
        conn.close()
    if status_in:
        return f"已在{status_in}中,跳过", None

    # Without real quote data every dimension would be scored on zeros.
    if not quote or quote.get("error"):
        detail = quote.get("error") or "no quote"
        return f"跳过(行情获取失败): {detail}", "skip"

    score, reasons = quick_assess(quote)
    joined = ' | '.join(reasons)

    if score >= 1.5:
        return f"加自选: {joined}", "watchlist"
    if score >= 0:
        return f"关注: {joined}", "monitor"
    return f"跳过(评分{score:.1f}): {joined}", "skip"
|
||
|
||
|
||
def get_conn():
    """Open the project database and return a connection yielding ``sqlite3.Row`` rows."""
    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    return db
|
||
|
||
|
||
def mark_processed(conn, signal_id):
    """Set ``processed=1`` on one signal_news row and commit immediately."""
    params = (signal_id,)
    conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", params)
    conn.commit()
|
||
|
||
|
||
# Exactly six consecutive digits that are not part of a longer digit run,
# so dates or amounts such as "20240102" do not yield a bogus code.
_STOCK_CODE_RE = re.compile(r"(?<!\d)\d{6}(?!\d)")


def _first_stock_code(row):
    """Return the first 6-digit stock code found in the row's summary, or None."""
    try:
        searched = json.loads(row["searched_stocks"]) if row["searched_stocks"] else []
    except (TypeError, ValueError):
        searched = []
    # searched_stocks holds stock names, not codes; the code itself is looked
    # up in the summary text.
    # NOTE(review): as before, rows with an empty searched_stocks are not
    # searched at all — confirm that gate is intended.
    if not searched:
        return None
    found = _STOCK_CODE_RE.search(row["summary"] or "")
    return found.group(0) if found else None


def _add_to_watchlist(code, quote, fallback_name, added_at):
    """Append ``code`` to watchlist.json unless it is already listed.

    A missing file is treated as an empty watchlist. Returns True when the
    entry is present afterwards, False (with a warning on stderr) on failure.
    """
    try:
        try:
            watchlist = json.loads(WATCHLIST_PATH.read_text())
        except FileNotFoundError:
            watchlist = {}
        if not isinstance(watchlist, dict):
            raise ValueError("watchlist.json root is not an object")
        stocks = watchlist.setdefault("stocks", [])
        if any(entry.get("code") == code for entry in stocks):
            return True
        stocks.append({
            "code": code,
            # DB-served quotes carry an empty name; fall back to the signal's sector.
            "name": quote.get("name") or fallback_name,
            "price": quote.get("price", 0),
            "status": "watching",
            "source": "xiaoguo_scanner",
            "added_at": added_at,
        })
        WATCHLIST_PATH.write_text(json.dumps(watchlist, ensure_ascii=False, indent=2))
        return True
    except (OSError, ValueError, TypeError, AttributeError) as exc:
        # Best effort: report on stderr so stdout keeps only the findings.
        print(f"[WARN] watchlist.json update failed for {code}: {exc}", file=sys.stderr)
        return False


def main():
    """Consume today's unprocessed xiaoguo signals and print the decisions.

    For each of up to 20 rows: extract a stock code from the summary, fetch a
    quote, evaluate it, add the stock to watchlist.json when the decision is
    "watchlist", and mark the row as processed. Rows without a code are marked
    processed and skipped. Prints a ``[SILENT]`` line when there is nothing to
    report.

    NOTE(review): SIGNAL_MAX_AGE_HOURS is not applied here; rows are filtered
    by today's date only.
    """
    clean_proxy()
    start = time.time()
    today = datetime.now().strftime("%Y-%m-%d")

    results = []
    conn = get_conn()
    try:
        # Unprocessed xiaoguo signals created today.
        rows = conn.execute(
            "SELECT id, sector, overall_sentiment, summary, key_articles, searched_stocks, source "
            "FROM signal_news "
            "WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL) "
            "AND date(created_at) = ? "
            "ORDER BY created_at DESC LIMIT 20",
            (today,)
        ).fetchall()

        if not rows:
            print("[SILENT] 今日无未处理小果信号")
            return

        for row in rows:
            # The sector field carries the stock name.
            sector_name = row["sector"] or ""

            code = _first_stock_code(row)
            if code is None:
                # No code to act on: consume the signal and move on.
                mark_processed(conn, row["id"])
                continue

            quote = fetch_quote(code) or {"code": code, "error": "no quote"}

            # The row has no code column; hand the extracted code over explicitly.
            signal = dict(row)
            signal["code"] = code
            summary, action = evaluate_and_act(signal, quote)

            if action == "watchlist":
                results.append(f"✅ {sector_name}({code}): {summary}")
                _add_to_watchlist(code, quote, sector_name, today)
            elif action == "monitor":
                results.append(f"🔄 {sector_name}({code}): {summary}")
            else:
                results.append(f"⏭️ {sector_name}({code}): {summary}")

            mark_processed(conn, row["id"])
    finally:
        # Always release the connection, including on unexpected errors.
        conn.close()

    elapsed = time.time() - start
    if results:
        print(f"小果信号消费 | {today} | {len(results)}条处理 ({elapsed:.0f}s)")
        for line in results:
            print(f"  {line}")
    else:
        print("[SILENT] 小果信号消费结束")


if __name__ == "__main__":
    main()
|