Files
MoFin/market_screener.py
T
知微 (MoFin) aa0f740381 MoFin 初始提交
完整数据采集+分析管道:
- market_watch.py:90行业板块采集(同花顺/东方财富)
- 市场精选推荐 cron:全市场分析+候选池+星级推荐
- price_monitor.py:持仓/自选高频价格监控
- refresh_mtf_cache.py:多周期K线缓存
- 策略评估/知识萃取管道

文档:docs/ 含完整需求+架构设计
注意:尚未配置 git remote,笑笑接手后自行配置
2026-06-20 12:04:21 +08:00

249 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""market_screener.py — 小果本地LLM全市场筛选(分步少吃多餐版)"""
import json, os, re, time, urllib.request, urllib.error
from datetime import datetime
from pathlib import Path
# Locate the dashboard root: when this script's resolved path contains
# "hermes", use the "web-dashboard" directory three levels up; otherwise use
# the directory that holds this script.
if "hermes" in str(Path(__file__).resolve()):
    WEB_DASHBOARD_DIR = Path(__file__).resolve().parent.parent.parent / "web-dashboard"
else:
    WEB_DASHBOARD_DIR = Path(__file__).parent

# JSON files read and written by the pipeline.
DATA_DIR = WEB_DASHBOARD_DIR / "data"
MARKET_JSON = DATA_DIR / "market.json"
POOL_JSON = DATA_DIR / "candidate_pool.json"

# Local LLM endpoint settings.
# NOTE(review): XIAOGUO_API / XIAOGUO_MODEL are not referenced by the code
# visible in this file (call_xiaoguo posts to a localhost gateway instead).
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-OptiQ-4bit"

# Default request timeout (seconds) for chat-completion calls.
API_TIMEOUT = 60
# Upper bound on the number of hot sectors analysed per run.
MAX_SECTORS = 5
# NOTE(review): MAX_CANDIDATES_POOL is not referenced by the visible code.
MAX_CANDIDATES_POOL = 60

# Base URL of the Tencent quote endpoint; the exchange-prefixed code is appended.
TENCENT_URL = "http://qt.gtimg.cn/q="
def load_json(path):
    """Read a UTF-8 JSON file and return the parsed value.

    Returns None when the file does not exist or its content is not valid
    JSON; any other I/O error propagates to the caller.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            raw_text = handle.read()
        return json.loads(raw_text)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
def save_json(path, data):
    """Write *data* as pretty-printed UTF-8 JSON to *path* atomically.

    Args:
        path: ``pathlib.Path`` of the destination file; missing parent
            directories are created.
        data: JSON-serializable value.

    Raises:
        TypeError / ValueError: if *data* cannot be serialized. The existing
            file, if any, is left untouched in that case.
        OSError: if the file cannot be written.
    """
    # Serialize before touching the disk so a bad payload cannot truncate an
    # existing file (load_json would then return None and the state be lost).
    text = json.dumps(data, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling temp file and swap it in, so readers never observe a
    # half-written file even if the process dies mid-write.
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
    os.replace(tmp_path, path)
def call_xiaoguo(messages, timeout=API_TIMEOUT):
    """Send a chat-completion request and return the reply text.

    Despite the name, the request goes to the primary model through the
    Hermes gateway on localhost; per the original note, the local Xiaoguo LLM
    is too cut-down to cope with whole-market analysis.

    Args:
        messages: list of chat messages (``{"role": ..., "content": ...}``).
        timeout: socket timeout in seconds for the HTTP request.

    Returns:
        The ``choices[0].message.content`` string of the response, or None if
        the request fails or the response does not have that shape (the error
        is printed).
    """
    payload = json.dumps({"model": "hermes-agent", "messages": messages,
                          "temperature": 0.1, "max_tokens": 2048}).encode()
    # NOTE(review): the gateway URL and bearer token are hard-coded here;
    # consider moving them to configuration.
    req = urllib.request.Request("http://localhost:8643/v1/chat/completions", data=payload,
                                 headers={"Content-Type": "application/json",
                                          "Authorization": "Bearer hermes123"}, method="POST")
    try:
        # An empty ProxyHandler mapping bypasses any configured proxies.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        # Close the response deterministically instead of leaking the socket.
        with opener.open(req, timeout=timeout) as resp:
            body = json.loads(resp.read())
        return body["choices"][0]["message"]["content"]
    except Exception as e:
        # Top-level boundary for the call: report and let the caller skip.
        print(f"API失败: {e}", flush=True)
        return None
def extract_json(text):
    """Extract the first complete JSON object from a model reply.

    Strategy:
      1. If the reply contains a fenced code block wrapping an object, parse
         that.
      2. Otherwise drop lines that look like "thinking process" chatter and
         return the first ``{...}`` span that decodes as JSON, scanning left
         to right.

    Args:
        text: raw reply text; may be empty or None.

    Returns:
        The decoded object (a dict), or None if no JSON object is found.
    """
    if not text:
        return None

    # Prefer a fenced code block.
    fenced = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass

    # Skip thinking-process lines ("Here's a thinking process", numbered
    # bold list items, "**Analyze ..." headings).
    kept = []
    for line in text.split("\n"):
        stripped = line.strip()
        if (re.match(r"^\d+\.\s+\*\*", stripped)
                or "Here's a thinking" in stripped
                or "**Analyze" in stripped):
            continue
        kept.append(stripped if stripped.startswith("```") else line)
    clean = "\n".join(kept)

    # Try every "{" as a potential start. raw_decode uses the real JSON
    # grammar, so braces inside string values do not confuse it the way manual
    # depth counting would.
    decoder = json.JSONDecoder()
    pos = clean.find("{")
    while pos != -1:
        try:
            obj, _end = decoder.raw_decode(clean, pos)
        except json.JSONDecodeError:
            pos = clean.find("{", pos + 1)
        else:
            return obj
    return None
def tencent_quote(code):
    """Fetch a quote for *code* from the Tencent endpoint (best effort).

    Codes starting with "6" or "5" are prefixed with "sh"; all others with
    "sz". The response is decoded as GBK and the quoted payload is split on
    "~"; fields 1, 3 and 32 are used as name, price and percentage change.

    Args:
        code: stock code string (without exchange prefix).

    Returns:
        ``{"name", "price", "change_pct"}`` on success, or None if *code* is
        empty / not a string, the request fails, or the payload is too short
        or malformed.
    """
    # Nothing to look up: avoid a pointless network round-trip.
    if not isinstance(code, str) or not code:
        return None
    symbol = ("sh" if code.startswith(("6", "5")) else "sz") + code
    try:
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        with opener.open(TENCENT_URL + symbol, timeout=5) as resp:
            raw = resp.read().decode("gbk")
        fields = raw.split('="')[1].split('"')[0].split("~")
        if len(fields) > 32:
            return {"name": fields[1],
                    "price": float(fields[3]) if fields[3] else 0,
                    "change_pct": float(fields[32]) if fields[32] else 0}
    except Exception:
        # Deliberately best-effort: any network or parse failure means "no
        # verified quote". Unlike a bare except, this lets KeyboardInterrupt
        # and SystemExit propagate.
        return None
    return None
def load_pool():
    """Load the candidate pool from POOL_JSON.

    Falls back to a fresh, empty pool structure when the file is missing,
    unparsable, or holds an empty value.
    """
    stored = load_json(POOL_JSON)
    if stored:
        return stored
    return {"last_updated": "", "total_candidates": 0, "candidates": []}
def save_pool(pool):
    """Stamp the pool with the current time and active count, then persist it.

    ``total_candidates`` counts only entries whose ``dropped`` flag is falsy.
    """
    pool["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M")
    active = 0
    for entry in pool.get("candidates", []):
        if not entry.get("dropped"):
            active += 1
    pool["total_candidates"] = active
    save_json(POOL_JSON, pool)
def add_or_update(pool, code, name, sector, score, reason, entry_range, stop_loss, target, vprice, vchg):
    """Record an observation of a candidate in the pool.

    If a non-dropped entry with the same ``code`` exists, its score, reason,
    strategy and verified quote are refreshed, the observation counter is
    bumped and the score is appended to its history; once at least three
    scores exist, ``trend_warning`` is set to whether the last three are
    strictly decreasing. Otherwise a new entry is appended.

    Returns:
        True if a new entry was added, False if an existing one was updated.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    strategy = {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}
    entries = pool.setdefault("candidates", [])

    # First active (non-dropped) entry for this code, if any.
    found = next(
        (e for e in entries if e["code"] == code and not e.get("dropped")),
        None,
    )

    if found is None:
        entries.append({
            "code": code, "name": name, "sector": sector,
            "xiaoguo_score": score, "xiaoguo_reason": reason,
            "xiaoguo_strategy": strategy,
            "verified_price": vprice, "verified_change": vchg,
            "added_at": stamp, "last_updated": stamp, "num_observations": 1,
            "score_history": [{"date": stamp, "score": score}],
            "zhiwei_star": None, "zhiwei_reviewed": False, "zhiwei_reviewed_at": None,
            "promoted": False, "promoted_at": None, "dropped": False, "drop_reason": None,
            "trend_warning": False, "trend_note": "",
        })
        return True

    # Refresh the existing entry (name and sector are left as first recorded).
    found["last_updated"] = stamp
    found["num_observations"] = found.get("num_observations", 1) + 1
    found["xiaoguo_score"] = score
    found["xiaoguo_reason"] = reason
    found["xiaoguo_strategy"] = strategy
    found["verified_price"] = vprice
    found["verified_change"] = vchg

    history = found.setdefault("score_history", [])
    history.append({"date": stamp, "score": score})
    if len(history) >= 3:
        first, second, third = (item["score"] for item in history[-3:])
        found["trend_warning"] = first > second > third
    return False
def cleanup(pool):
    """Mark weak, declining or stale candidates in *pool* as dropped (in place).

    For each candidate that is not already dropped:
      * if the mean of its last three scores is below 5, drop it;
      * if its last three scores are strictly decreasing, drop it when
        ``trend_warning`` is already set, otherwise set ``trend_warning``;
      * if ``last_updated`` is at least 7 days old, drop it.

    An unparsable ``last_updated`` value is ignored rather than raising.
    """
    now = datetime.now()
    for c in pool.get("candidates", []):
        if c.get("dropped"):
            continue

        hist = c.get("score_history", [])
        if len(hist) >= 3:
            recent = [h["score"] for h in hist[-3:]]
            if sum(recent) / 3 < 5:
                c.update({"dropped": True, "drop_reason": "平均评分<5"})
                continue
            if recent[0] > recent[1] > recent[2]:
                # NOTE(review): add_or_update also sets trend_warning when the
                # last three scores decline, so a candidate updated earlier in
                # the same run arrives here with the flag already set and is
                # dropped immediately — confirm this is the intended policy.
                if c.get("trend_warning"):
                    c.update({"dropped": True, "drop_reason": "连续下降2轮"})
                else:
                    c["trend_warning"] = True

        last_updated = c.get("last_updated")
        if last_updated:
            try:
                age = now - datetime.strptime(last_updated, "%Y-%m-%d %H:%M")
            except (ValueError, TypeError):
                # Malformed or non-string timestamp: skip the staleness check
                # only, instead of hiding every possible error.
                continue
            if age.days >= 7:
                c.update({"dropped": True, "drop_reason": "超7天未更新"})
# ── 第1步:大盘分析 ──
def step1(sectors, source):
    """Step 1: ask the model for an overall market view from sector data.

    Builds a text summary of up to 10 rising and up to 5 falling/flat
    sectors, sends it to the model and parses the JSON reply.

    Args:
        sectors: list of sector dicts (``name``, ``change`` and optionally
            ``lead_stock`` / ``net_inflow``).
        source: data source tag; for "eastmoney" the ``change`` value is
            divided by 100 before use.

    Returns:
        The parsed JSON object from the reply, or None if the call fails or
        no JSON can be extracted.
    """
    rows = []
    for sec in sectors:
        raw_change = sec.get("change", 0)
        change = raw_change / 100 if source == "eastmoney" else raw_change
        fragments = [f"{sec['name']}({change:+.2f}%)"]
        if sec.get("lead_stock"):
            fragments.append(f"领涨:{sec['lead_stock']}")
        if sec.get("net_inflow"):
            fragments.append(f"资金:{sec['net_inflow']}亿")
        rows.append((" | ".join(fragments), change))

    def by_change(row):
        return row[1]

    descending = sorted(rows, key=by_change, reverse=True)
    ascending = sorted(rows, key=by_change)
    gainers = [label for label, change in descending if change > 0][:10]
    losers = [label for label, change in ascending if change <= 0][:5]

    text = "领涨板块:\n" + "\n".join(gainers)
    if losers:
        text += "\n领跌板块:\n" + "\n".join(losers)

    prompt = f"分析以下A股板块,选出3-5个值得关注的行业(只看趋势,排除一日游)。\n\n{text}\n\n输出JSON{{\"market_verdict\":\"强势|中性|弱势\",\"hot_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}],\"danger_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}]}}"
    print(" 第1步:大盘分析", flush=True)
    reply = call_xiaoguo([{"role": "user", "content": prompt}])
    if not reply:
        return None
    return extract_json(reply)
# ── 第2步:个股分析 ──
def step2(sector, source):
    """Step 2: ask the model for candidate stocks in one sector and verify them.

    Each candidate returned by the model is checked against a live quote;
    only those with a quote whose price is positive are kept, and they are
    annotated with ``verified_price`` and ``verified_change``.

    Args:
        sector: sector dict (``name``, ``change`` and optional detail fields).
        source: data source tag; for "eastmoney" the ``change`` value is
            divided by 100 before use.

    Returns:
        List of verified candidate dicts (possibly empty), or None if the
        model call fails or its reply contains no JSON.
    """
    name = sector.get("name", "")
    raw_change = sector.get("change", 0)
    cp = raw_change / 100 if source == "eastmoney" else raw_change

    facts = []
    if sector.get("up_count"):
        facts.append(f"上涨{sector['up_count']}/{sector['up_count']+sector.get('down_count',0)}")
    if sector.get("net_inflow"):
        facts.append(f"资金净流入{sector['net_inflow']}亿")
    if sector.get("lead_stock"):
        facts.append(f"领涨{sector['lead_stock']}({sector.get('lead_stock_change',0):+.2f}%)")
    d = " | ".join(facts) if facts else "无详细数据"

    prompt = f"板块:{name}({cp:+.2f}%) | {d}\n推荐2-3只候选股,评分1-10,附入场区间/止损/目标。JSON:{{\"sector_judgment\":\"\",\"sector_reason\":\"\",\"candidates\":[{{\"code\":\"\",\"name\":\"\",\"score\":0,\"reason\":\"\",\"entry_range\":\"\",\"stop_loss\":\"\",\"target\":\"\"}}]}}"
    print(f" 分析行业: {name}", flush=True)

    reply = call_xiaoguo([{"role": "user", "content": prompt}])
    if not reply:
        return None
    parsed = extract_json(reply)
    if not parsed:
        return None

    proposed = parsed.get("candidates", [])
    verified = []
    for cand in proposed:
        quote = tencent_quote(cand.get("code", ""))
        if not (quote and quote["price"] > 0):
            continue
        cand["verified_price"] = quote["price"]
        cand["verified_change"] = quote["change_pct"]
        verified.append(cand)

    print(f" 候选{len(proposed)}只, 验证通过{len(verified)}", flush=True)
    return verified
def main():
    """Run one screening pass: market view, per-sector candidates, pool upkeep.

    Reads MARKET_JSON, asks the model for a market verdict and hot sectors
    (step 1), asks for candidates in up to MAX_SECTORS of those sectors
    (step 2), merges them into the candidate pool, prunes the pool and writes
    both the pool and the annotated market data back to disk.
    """
    market = load_json(MARKET_JSON)
    if not market or not market.get("sectors"):
        print("market.json 无数据", flush=True)
        return
    sectors, source = market["sectors"], market.get("source", "ths")
    print(f"板块: {len(sectors)}, 来源: {source}", flush=True)
    pool = load_pool()

    # Step 1: overall market view.
    view = step1(sectors, source)
    if not view:
        print("第1步失败", flush=True)
        return
    market["market_verdict"] = view.get("market_verdict", "中性")
    # NOTE(review): step1's prompt does not ask for "verdict_reason", so this
    # is normally empty unless the model volunteers it.
    market["verdict_reason"] = view.get("verdict_reason", "")
    market["hot_sectors"] = view.get("hot_sectors", [])
    market["danger_sectors"] = view.get("danger_sectors", [])
    market["xiaoguo_scan_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
    market.setdefault("insights", []).append(f"[小果全市场] {view.get('market_verdict','中性')}")

    # The reply is model-generated: tolerate a null list and entries that are
    # not dicts or lack a name instead of crashing before anything is saved.
    hot_entries = view.get("hot_sectors") or []
    hot_names = [s["name"] for s in hot_entries if isinstance(s, dict) and s.get("name")]
    print(f"热门行业: {hot_names}", flush=True)
    if not hot_names:
        save_pool(pool)
        save_json(MARKET_JSON, market)
        return

    # Step 2: per-sector candidate analysis, capped at MAX_SECTORS.
    hot_lookup = set(hot_names)
    hot_data = [s for s in sectors if s.get("name", "") in hot_lookup][:MAX_SECTORS]
    new_count = 0
    for s in hot_data:
        cands = step2(s, source)
        if not cands:
            continue
        for c in cands:
            added = add_or_update(pool, c["code"], c.get("name", ""), s["name"],
                                  c.get("score", 5), c.get("reason", ""),
                                  c.get("entry_range", ""), c.get("stop_loss", ""), c.get("target", ""),
                                  c.get("verified_price", 0), c.get("verified_change", 0))
            if added:
                new_count += 1
                print(f" + {c.get('name','')}({c['code']}) {c.get('score',0)}", flush=True)

    cleanup(pool)
    save_pool(pool)
    save_json(MARKET_JSON, market)
    print(f"完成: 新增{new_count}, 池内{pool['total_candidates']}活跃", flush=True)
# Run the screening pass only when executed as a script, not on import.
if __name__ == "__main__":
    main()