fix: market_screener market.json→DB, remove all JSON refs

This commit is contained in:
知微
2026-07-03 23:48:29 +08:00
parent b6959f73bc
commit a05b82cd6b
+283 -254
View File
@@ -1,254 +1,283 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""market_screener.py — 小果本地LLM全市场筛选(分步少吃多餐版)""" """market_screener.py — 小果本地LLM全市场筛选(分步少吃多餐版)"""
import json, os, re, time, urllib.request, urllib.error import json, os, re, time, urllib.request, urllib.error
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
WEB_DASHBOARD_DIR = Path(__file__).resolve().parent.parent.parent / "web-dashboard" if "hermes" in str(Path(__file__).resolve()) else Path(__file__).parent WEB_DASHBOARD_DIR = Path(__file__).resolve().parent.parent.parent / "web-dashboard" if "hermes" in str(Path(__file__).resolve()) else Path(__file__).parent
DATA_DIR = WEB_DASHBOARD_DIR / "data" DATA_DIR = WEB_DASHBOARD_DIR / "data"
MARKET_JSON = DATA_DIR / "market.json" POOL_JSON = DATA_DIR / "candidate_pool.json" # 筛选缓存(临时)
POOL_JSON = DATA_DIR / "candidate_pool.json" XIAOGUO_MODEL = "Qwen3.6-27B-OptiQ-4bit"
XIAOGUO_MODEL = "Qwen3.6-27B-OptiQ-4bit" API_TIMEOUT = 60
API_TIMEOUT = 60 MAX_SECTORS = 5
MAX_SECTORS = 5 MAX_CANDIDATES_POOL = 60
MAX_CANDIDATES_POOL = 60 TENCENT_URL = "http://qt.gtimg.cn/q="
TENCENT_URL = "http://qt.gtimg.cn/q="
def _get_xiaoguo_url():
def _get_xiaoguo_url(): try:
try: from mo_config import get_config
from mo_config import get_config return get_config().xiaoguo_api_url
return get_config().xiaoguo_api_url except Exception:
except Exception: return "http://node122:18003/v1/chat/completions" # legacy fallback
return "http://node122:18003/v1/chat/completions" # legacy fallback
def load_json(path):
def load_json(path): try:
try: with open(path, "r", encoding="utf-8") as f:
with open(path, "r", encoding="utf-8") as f: return json.load(f)
return json.load(f) except (FileNotFoundError, json.JSONDecodeError):
except (FileNotFoundError, json.JSONDecodeError): return None
return None
def save_json(path, data):
def save_json(path, data): path.parent.mkdir(parents=True, exist_ok=True)
path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2)
json.dump(data, f, ensure_ascii=False, indent=2)
def call_xiaoguo(messages, timeout=API_TIMEOUT):
def call_xiaoguo(messages, timeout=API_TIMEOUT): """调小果本地LLM(通过node122直连,/etc/hosts自动走LAN或EasyTier"""
"""调小果本地LLM(通过node122直连,/etc/hosts自动走LAN或EasyTier""" payload = json.dumps({"model": XIAOGUO_MODEL, "messages": messages,
payload = json.dumps({"model": XIAOGUO_MODEL, "messages": messages, "temperature": 0.1, "max_tokens": 2048}).encode()
"temperature": 0.1, "max_tokens": 2048}).encode() req = urllib.request.Request(_get_xiaoguo_url(), data=payload,
req = urllib.request.Request(_get_xiaoguo_url(), data=payload, headers={"Content-Type": "application/json",
headers={"Content-Type": "application/json", "Authorization": "Bearer hermes123"}, method="POST")
"Authorization": "Bearer hermes123"}, method="POST") try:
try: resp = urllib.request.build_opener(urllib.request.ProxyHandler({})).open(req, timeout=timeout)
resp = urllib.request.build_opener(urllib.request.ProxyHandler({})).open(req, timeout=timeout) return json.loads(resp.read())["choices"][0]["message"]["content"]
return json.loads(resp.read())["choices"][0]["message"]["content"] except Exception as e:
except Exception as e: print(f"API失败: {e}", flush=True)
print(f"API失败: {e}", flush=True) return None
return None
def extract_json(text):
def extract_json(text): """从回复中提取第一个完整JSON,跳过思考过程"""
"""从回复中提取第一个完整JSON,跳过思考过程""" # 优先找代码块
# 优先找代码块 m = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text)
m = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text) if m:
if m: try:
try: return json.loads(m.group(1))
return json.loads(m.group(1)) except json.JSONDecodeError:
except json.JSONDecodeError: pass
pass # 跳过思考过程行(Here's a thinking process / 数字列表项)
# 跳过思考过程行(Here's a thinking process / 数字列表项) clean_lines = []
clean_lines = [] for line in text.split("\n"):
for line in text.split("\n"): s = line.strip()
s = line.strip() if re.match(r"^\d+\.\s+\*\*", s) or "Here's a thinking" in s or "**Analyze" in s:
if re.match(r"^\d+\.\s+\*\*", s) or "Here's a thinking" in s or "**Analyze" in s: continue
continue if s.startswith("```"):
if s.startswith("```"): clean_lines.append(s)
clean_lines.append(s) continue
continue clean_lines.append(line)
clean_lines.append(line) clean = "\n".join(clean_lines)
clean = "\n".join(clean_lines) # 找第一个完整JSON
# 找第一个完整JSON start = clean.find("{")
start = clean.find("{") if start < 0:
if start < 0: return None
return None for mode in ["直接", "跳过头部"]:
for mode in ["直接", "跳过头部"]: for pos in [start, clean.find("\n{", start), clean.find("{", start + 1)]:
for pos in [start, clean.find("\n{", start), clean.find("{", start + 1)]: if pos < 0:
if pos < 0: continue
continue depth = 0
depth = 0 for i in range(pos, len(clean)):
for i in range(pos, len(clean)): if clean[i] == "{": depth += 1
if clean[i] == "{": depth += 1 elif clean[i] == "}":
elif clean[i] == "}": depth -= 1
depth -= 1 if depth == 0:
if depth == 0: try:
try: return json.loads(clean[pos:i+1])
return json.loads(clean[pos:i+1]) except json.JSONDecodeError:
except json.JSONDecodeError: break
break return None
return None
def tencent_quote(code):
def tencent_quote(code): try:
try: opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) tc = f"sh{code}" if code.startswith("6") or code.startswith("5") else f"sz{code}"
tc = f"sh{code}" if code.startswith("6") or code.startswith("5") else f"sz{code}" resp = opener.open(TENCENT_URL + tc, timeout=5)
resp = opener.open(TENCENT_URL + tc, timeout=5) d = resp.read().decode("gbk").split('="')[1].split('"')[0].split("~")
d = resp.read().decode("gbk").split('="')[1].split('"')[0].split("~") if len(d) > 32:
if len(d) > 32: return {"name": d[1], "price": float(d[3]) if d[3] else 0,
return {"name": d[1], "price": float(d[3]) if d[3] else 0, "change_pct": float(d[32]) if d[32] else 0}
"change_pct": float(d[32]) if d[32] else 0} except:
except: pass
pass return None
return None
def load_pool():
def load_pool(): pool = load_json(POOL_JSON)
pool = load_json(POOL_JSON) return pool if pool else {"last_updated": "", "total_candidates": 0, "candidates": []}
return pool if pool else {"last_updated": "", "total_candidates": 0, "candidates": []}
def save_pool(pool):
def save_pool(pool): pool["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M")
pool["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M") pool["total_candidates"] = len([c for c in pool.get("candidates", []) if not c.get("dropped")])
pool["total_candidates"] = len([c for c in pool.get("candidates", []) if not c.get("dropped")]) save_json(POOL_JSON, pool)
save_json(POOL_JSON, pool)
def add_or_update(pool, code, name, sector, score, reason, entry_range, stop_loss, target, vprice, vchg):
def add_or_update(pool, code, name, sector, score, reason, entry_range, stop_loss, target, vprice, vchg): now = datetime.now().strftime("%Y-%m-%d %H:%M")
now = datetime.now().strftime("%Y-%m-%d %H:%M") for c in pool.setdefault("candidates", []):
for c in pool.setdefault("candidates", []): if c["code"] == code and not c.get("dropped"):
if c["code"] == code and not c.get("dropped"): c.update({"last_updated": now, "num_observations": c.get("num_observations", 1) + 1,
c.update({"last_updated": now, "num_observations": c.get("num_observations", 1) + 1, "xiaoguo_score": score, "xiaoguo_reason": reason,
"xiaoguo_score": score, "xiaoguo_reason": reason, "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target},
"xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, "verified_price": vprice, "verified_change": vchg})
"verified_price": vprice, "verified_change": vchg}) c.setdefault("score_history", []).append({"date": now, "score": score})
c.setdefault("score_history", []).append({"date": now, "score": score}) hist = c["score_history"]
hist = c["score_history"] if len(hist) >= 3:
if len(hist) >= 3: recent = [h["score"] for h in hist[-3:]]
recent = [h["score"] for h in hist[-3:]] c["trend_warning"] = all(recent[i] > recent[i+1] for i in range(2))
c["trend_warning"] = all(recent[i] > recent[i+1] for i in range(2)) return False
return False pool["candidates"].append({"code": code, "name": name, "sector": sector,
pool["candidates"].append({"code": code, "name": name, "sector": sector, "xiaoguo_score": score, "xiaoguo_reason": reason,
"xiaoguo_score": score, "xiaoguo_reason": reason, "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target},
"xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, "verified_price": vprice, "verified_change": vchg,
"verified_price": vprice, "verified_change": vchg, "added_at": now, "last_updated": now, "num_observations": 1,
"added_at": now, "last_updated": now, "num_observations": 1, "score_history": [{"date": now, "score": score}],
"score_history": [{"date": now, "score": score}], "zhiwei_star": None, "zhiwei_reviewed": False, "zhiwei_reviewed_at": None,
"zhiwei_star": None, "zhiwei_reviewed": False, "zhiwei_reviewed_at": None, "promoted": False, "promoted_at": None, "dropped": False, "drop_reason": None,
"promoted": False, "promoted_at": None, "dropped": False, "drop_reason": None, "trend_warning": False, "trend_note": ""})
"trend_warning": False, "trend_note": ""}) return True
return True
def cleanup(pool):
def cleanup(pool): now = datetime.now()
now = datetime.now() for c in pool.get("candidates", []):
for c in pool.get("candidates", []): if c.get("dropped"): continue
if c.get("dropped"): continue hist = c.get("score_history", [])
hist = c.get("score_history", []) if len(hist) >= 3 and sum(h["score"] for h in hist[-3:]) / 3 < 5:
if len(hist) >= 3 and sum(h["score"] for h in hist[-3:]) / 3 < 5: c.update({"dropped": True, "drop_reason": "平均评分<5"}); continue
c.update({"dropped": True, "drop_reason": "平均评分<5"}); continue if len(hist) >= 3:
if len(hist) >= 3: recent = [h["score"] for h in hist[-3:]]
recent = [h["score"] for h in hist[-3:]] if all(recent[i] > recent[i+1] for i in range(2)):
if all(recent[i] > recent[i+1] for i in range(2)): if c.get("trend_warning"): c.update({"dropped": True, "drop_reason": "连续下降2轮"})
if c.get("trend_warning"): c.update({"dropped": True, "drop_reason": "连续下降2轮"}) else: c["trend_warning"] = True
else: c["trend_warning"] = True if c.get("last_updated"):
if c.get("last_updated"): try:
try: if (now - datetime.strptime(c["last_updated"], "%Y-%m-%d %H:%M")).days >= 7:
if (now - datetime.strptime(c["last_updated"], "%Y-%m-%d %H:%M")).days >= 7: c.update({"dropped": True, "drop_reason": "超7天未更新"})
c.update({"dropped": True, "drop_reason": "超7天未更新"}) except: pass
except: pass
# ── 第1步:大盘分析 ──
# ── 第1步:大盘分析 ── def step1(sectors, source):
def step1(sectors, source): normalized = []
normalized = [] for s in sectors:
for s in sectors: cp = (s.get("change", 0) / 100) if source == "eastmoney" else s.get("change", 0)
cp = (s.get("change", 0) / 100) if source == "eastmoney" else s.get("change", 0) parts = [f"{s['name']}({cp:+.2f}%)"]
parts = [f"{s['name']}({cp:+.2f}%)"] if s.get("lead_stock"): parts.append(f"领涨:{s['lead_stock']}")
if s.get("lead_stock"): parts.append(f"领涨:{s['lead_stock']}") if s.get("net_inflow"): parts.append(f"资金:{s['net_inflow']}亿")
if s.get("net_inflow"): parts.append(f"资金:{s['net_inflow']}亿") normalized.append((" | ".join(parts), cp))
normalized.append((" | ".join(parts), cp)) gainers = [x[0] for x in sorted(normalized, key=lambda x: x[1], reverse=True) if x[1] > 0][:10]
gainers = [x[0] for x in sorted(normalized, key=lambda x: x[1], reverse=True) if x[1] > 0][:10] losers = [x[0] for x in sorted(normalized, key=lambda x: x[1]) if x[1] <= 0][:5]
losers = [x[0] for x in sorted(normalized, key=lambda x: x[1]) if x[1] <= 0][:5] text = "领涨板块:\n" + "\n".join(gainers)
text = "板块:\n" + "\n".join(gainers) if losers: text += "\n板块:\n" + "\n".join(losers)
if losers: text += "\n领跌板块\n" + "\n".join(losers) prompt = f"分析以下A股板块,选出3-5个值得关注的行业(只看趋势,排除一日游)。\n\n{text}\n\n输出JSON{{\"market_verdict\":\"强势|中性|弱势\",\"hot_sectors\":[{{\"name\":\"板块\",\"reason\":\"理由\"}}],\"danger_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}]}}"
prompt = f"分析以下A股板块,选出3-5个值得关注的行业(只看趋势,排除一日游)。\n\n{text}\n\n输出JSON{{\"market_verdict\":\"强势|中性|弱势\",\"hot_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}],\"danger_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}]}}" print(f" 第1步:大盘分析", flush=True)
print(f" 第1步:大盘分析", flush=True) result = call_xiaoguo([{"role": "user", "content": prompt}])
result = call_xiaoguo([{"role": "user", "content": prompt}]) return extract_json(result) if result else None
return extract_json(result) if result else None
# ── 第2步:个股分析 ──
# ── 第2步:个股分析 ── def step2(sector, source):
def step2(sector, source): name = sector.get("name", "")
name = sector.get("name", "") cp = (sector.get("change", 0) / 100) if source == "eastmoney" else sector.get("change", 0)
cp = (sector.get("change", 0) / 100) if source == "eastmoney" else sector.get("change", 0) detail = []
detail = [] if sector.get("up_count"): detail.append(f"上涨{sector['up_count']}/{sector['up_count']+sector.get('down_count',0)}")
if sector.get("up_count"): detail.append(f"上涨{sector['up_count']}/{sector['up_count']+sector.get('down_count',0)}") if sector.get("net_inflow"): detail.append(f"资金净流入{sector['net_inflow']}亿")
if sector.get("net_inflow"): detail.append(f"资金净流入{sector['net_inflow']}亿") if sector.get("lead_stock"): detail.append(f"领涨{sector['lead_stock']}({sector.get('lead_stock_change',0):+.2f}%)")
if sector.get("lead_stock"): detail.append(f"领涨{sector['lead_stock']}({sector.get('lead_stock_change',0):+.2f}%)") d = " | ".join(detail) if detail else "无详细数据"
d = " | ".join(detail) if detail else "无详细数据" prompt = f"板块:{name}({cp:+.2f}%) | {d}\n推荐2-3只候选股,评分1-10,附入场区间/止损/目标。JSON:{{\"sector_judgment\":\"\",\"sector_reason\":\"\",\"candidates\":[{{\"code\":\"\",\"name\":\"\",\"score\":0,\"reason\":\"\",\"entry_range\":\"\",\"stop_loss\":\"\",\"target\":\"\"}}]}}"
prompt = f"板块:{name}({cp:+.2f}%) | {d}\n推荐2-3只候选股,评分1-10,附入场区间/止损/目标。JSON:{{\"sector_judgment\":\"\",\"sector_reason\":\"\",\"candidates\":[{{\"code\":\"\",\"name\":\"\",\"score\":0,\"reason\":\"\",\"entry_range\":\"\",\"stop_loss\":\"\",\"target\":\"\"}}]}}" print(f" 分析行业: {name}", flush=True)
print(f" 分析行业: {name}", flush=True) result = call_xiaoguo([{"role": "user", "content": prompt}])
result = call_xiaoguo([{"role": "user", "content": prompt}]) if not result: return None
if not result: return None parsed = extract_json(result)
parsed = extract_json(result) if not parsed: return None
if not parsed: return None valid = []
valid = [] for c in parsed.get("candidates", []):
for c in parsed.get("candidates", []): q = tencent_quote(c.get("code", ""))
q = tencent_quote(c.get("code", "")) if q and q["price"] > 0:
if q and q["price"] > 0: c["verified_price"], c["verified_change"] = q["price"], q["change_pct"]
c["verified_price"], c["verified_change"] = q["price"], q["change_pct"] valid.append(c)
valid.append(c) print(f" 候选{len(parsed.get('candidates',[]))}只, 验证通过{len(valid)}", flush=True)
print(f" 候选{len(parsed.get('candidates',[]))}只, 验证通过{len(valid)}", flush=True) return valid
return valid
def main():
def main(): # 从 DB 读大盘+板块数据(替代 market.json
market = load_json(MARKET_JSON) from mofin_db import get_conn
if not market or not market.get("sectors"): conn = get_conn()
print("market.json 无数据", flush=True); return market = {"sectors": [], "source": "db"}
sectors, source = market["sectors"], market.get("source", "ths") try:
print(f"板块: {len(sectors)}, 来源: {source}", flush=True) # latest snapshot
pool = load_pool() sr = conn.execute("SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone()
existing = {c["code"] for c in pool.get("candidates", []) if not c.get("dropped")} if sr:
cols = [d[0] for d in conn.execute("SELECT * FROM market_snapshots LIMIT 0").description]
# 第1步 snap = dict(zip(cols, sr))
view = step1(sectors, source) market["up_ratio"] = snap.get("up_ratio", 0)
if not view: market["mood"] = snap.get("mood", "neutral")
print("第1步失败", flush=True); return market["source"] = snap.get("source", "db")
market["market_verdict"] = view.get("market_verdict", "中性")
market["verdict_reason"] = view.get("verdict_reason", "") # sector data
market["hot_sectors"] = view.get("hot_sectors", []) sectors = conn.execute("""
market["danger_sectors"] = view.get("danger_sectors", []) SELECT s.name as name, s.change_pct as change_pct, s.lead_stock as lead_stock,
market["xiaoguo_scan_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M") s.up_count, s.down_count, s.net_inflow
market.setdefault("insights", []).append(f"[小果全市场] {view.get('market_verdict','中性')}") FROM sector_snapshots s
hot_names = [s["name"] for s in view.get("hot_sectors", [])] JOIN market_snapshots ms ON s.snapshot_id = ms.id
print(f"热门行业: {hot_names}", flush=True) WHERE ms.id = (SELECT MAX(id) FROM market_snapshots)
if not hot_names: ORDER BY s.change_pct DESC
save_pool(pool); save_json(MARKET_JSON, market); return """).fetchall()
if sectors:
# 第2步 cols = [d[0] for d in conn.execute("SELECT name, change_pct, lead_stock, up_count, down_count, net_inflow FROM sector_snapshots LIMIT 0").description]
hot_data = [s for s in sectors if s.get("name", "") in hot_names][:MAX_SECTORS] market["sectors"] = [dict(zip(cols, r)) for r in sectors]
new_count = 0 except Exception as e:
for s in hot_data: print(f"DB read error: {e}", flush=True)
cands = step2(s, source) conn.close()
if not cands: continue
for c in cands: if not market.get("sectors"):
if add_or_update(pool, c["code"], c.get("name",""), s["name"], print("market 无数据", flush=True); return
c.get("score",5), c.get("reason",""), sectors = market["sectors"]
c.get("entry_range",""), c.get("stop_loss",""), c.get("target",""), source = market.get("source", "db")
c.get("verified_price",0), c.get("verified_change",0)): print(f"板块: {len(sectors)}, 来源: {source}", flush=True)
new_count += 1 pool = load_pool()
print(f" + {c.get('name','')}({c['code']}) {c.get('score',0)}", flush=True) existing = {c["code"] for c in pool.get("candidates", []) if not c.get("dropped")}
cleanup(pool) # 第1步
save_pool(pool) view = step1(sectors, source)
save_json(MARKET_JSON, market) if not view:
print(f"完成: 新增{new_count}, 池内{pool['total_candidates']}活跃", flush=True) print("第1步失败", flush=True); return
market["market_verdict"] = view.get("market_verdict", "中性")
if __name__ == "__main__": market["verdict_reason"] = view.get("verdict_reason", "")
main() market["hot_sectors"] = view.get("hot_sectors", [])
market["danger_sectors"] = view.get("danger_sectors", [])
market["xiaoguo_scan_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
market.setdefault("insights", []).append(f"[小果全市场] {view.get('market_verdict','中性')}")
hot_names = [s["name"] for s in view.get("hot_sectors", [])]
print(f"热门行业: {hot_names}", flush=True)
if not hot_names:
save_pool(pool)
return
# 第2步
hot_data = [s for s in sectors if s.get("name", "") in hot_names][:MAX_SECTORS]
new_count = 0
for s in hot_data:
cands = step2(s, source)
if not cands: continue
for c in cands:
if add_or_update(pool, c["code"], c.get("name",""), s["name"],
c.get("score",5), c.get("reason",""),
c.get("entry_range",""), c.get("stop_loss",""), c.get("target",""),
c.get("verified_price",0), c.get("verified_change",0)):
new_count += 1
print(f" + {c.get('name','')}({c['code']}) {c.get('score',0)}", flush=True)
cleanup(pool)
save_pool(pool)
print(f"完成: 新增{new_count}, 池内{pool['total_candidates']}活跃", flush=True)
if __name__ == "__main__":
main()