diff --git a/market_screener.py b/market_screener.py index 7391a2a..2319284 100644 --- a/market_screener.py +++ b/market_screener.py @@ -1,254 +1,283 @@ -#!/usr/bin/env python3 -"""market_screener.py — 小果本地LLM全市场筛选(分步少吃多餐版)""" - -import json, os, re, time, urllib.request, urllib.error -from datetime import datetime -from pathlib import Path - -WEB_DASHBOARD_DIR = Path(__file__).resolve().parent.parent.parent / "web-dashboard" if "hermes" in str(Path(__file__).resolve()) else Path(__file__).parent -DATA_DIR = WEB_DASHBOARD_DIR / "data" -MARKET_JSON = DATA_DIR / "market.json" -POOL_JSON = DATA_DIR / "candidate_pool.json" -XIAOGUO_MODEL = "Qwen3.6-27B-OptiQ-4bit" -API_TIMEOUT = 60 -MAX_SECTORS = 5 -MAX_CANDIDATES_POOL = 60 -TENCENT_URL = "http://qt.gtimg.cn/q=" - -def _get_xiaoguo_url(): - try: - from mo_config import get_config - return get_config().xiaoguo_api_url - except Exception: - return "http://node122:18003/v1/chat/completions" # legacy fallback - - -def load_json(path): - try: - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - return None - - -def save_json(path, data): - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def call_xiaoguo(messages, timeout=API_TIMEOUT): - """调小果本地LLM(通过node122直连,/etc/hosts自动走LAN或EasyTier)""" - payload = json.dumps({"model": XIAOGUO_MODEL, "messages": messages, - "temperature": 0.1, "max_tokens": 2048}).encode() - req = urllib.request.Request(_get_xiaoguo_url(), data=payload, - headers={"Content-Type": "application/json", - "Authorization": "Bearer hermes123"}, method="POST") - try: - resp = urllib.request.build_opener(urllib.request.ProxyHandler({})).open(req, timeout=timeout) - return json.loads(resp.read())["choices"][0]["message"]["content"] - except Exception as e: - print(f"API失败: {e}", flush=True) - return None - - -def extract_json(text): - """从回复中提取第一个完整JSON,跳过思考过程""" - # 优先找代码块 - m = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text) - if m: - try: - return json.loads(m.group(1)) - except json.JSONDecodeError: - pass - # 跳过思考过程行(Here's a thinking process / 数字列表项) - clean_lines = [] - for line in text.split("\n"): - s = line.strip() - if re.match(r"^\d+\.\s+\*\*", s) or "Here's a thinking" in s or "**Analyze" in s: - continue - if s.startswith("```"): - clean_lines.append(s) - continue - clean_lines.append(line) - clean = "\n".join(clean_lines) - # 找第一个完整JSON - start = clean.find("{") - if start < 0: - return None - for mode in ["直接", "跳过头部"]: - for pos in [start, clean.find("\n{", start), clean.find("{", start + 1)]: - if pos < 0: - continue - depth = 0 - for i in range(pos, len(clean)): - if clean[i] == "{": depth += 1 - elif clean[i] == "}": - depth -= 1 - if depth == 0: - try: - return json.loads(clean[pos:i+1]) - except json.JSONDecodeError: - break - return None - - -def tencent_quote(code): - try: - opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) - tc = f"sh{code}" if code.startswith("6") or code.startswith("5") else f"sz{code}" - resp = opener.open(TENCENT_URL + tc, timeout=5) - d = resp.read().decode("gbk").split('="')[1].split('"')[0].split("~") - if len(d) > 32: - return {"name": d[1], "price": float(d[3]) if d[3] else 0, - "change_pct": float(d[32]) if d[32] else 0} - except: - pass - return None - - -def load_pool(): - pool = load_json(POOL_JSON) - return pool if pool else {"last_updated": "", "total_candidates": 0, "candidates": []} - - -def save_pool(pool): - pool["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M") - pool["total_candidates"] = len([c for c in pool.get("candidates", []) if not c.get("dropped")]) - save_json(POOL_JSON, pool) - - -def add_or_update(pool, code, name, sector, score, reason, entry_range, stop_loss, target, vprice, vchg): - now = datetime.now().strftime("%Y-%m-%d %H:%M") - for c in pool.setdefault("candidates", []): - if c["code"] == code and not c.get("dropped"): - c.update({"last_updated": now, "num_observations": c.get("num_observations", 1) + 1, - "xiaoguo_score": score, "xiaoguo_reason": reason, - "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, - "verified_price": vprice, "verified_change": vchg}) - c.setdefault("score_history", []).append({"date": now, "score": score}) - hist = c["score_history"] - if len(hist) >= 3: - recent = [h["score"] for h in hist[-3:]] - c["trend_warning"] = all(recent[i] > recent[i+1] for i in range(2)) - return False - pool["candidates"].append({"code": code, "name": name, "sector": sector, - "xiaoguo_score": score, "xiaoguo_reason": reason, - "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, - "verified_price": vprice, "verified_change": vchg, - "added_at": now, "last_updated": now, "num_observations": 1, - "score_history": [{"date": now, "score": score}], - "zhiwei_star": None, "zhiwei_reviewed": False, "zhiwei_reviewed_at": None, - "promoted": False, "promoted_at": None, "dropped": False, "drop_reason": None, - "trend_warning": False, "trend_note": ""}) - return True - - -def cleanup(pool): - now = datetime.now() - for c in pool.get("candidates", []): - if c.get("dropped"): continue - hist = c.get("score_history", []) - if len(hist) >= 3 and sum(h["score"] for h in hist[-3:]) / 3 < 5: - c.update({"dropped": True, "drop_reason": "平均评分<5"}); continue - if len(hist) >= 3: - recent = [h["score"] for h in hist[-3:]] - if all(recent[i] > recent[i+1] for i in range(2)): - if c.get("trend_warning"): c.update({"dropped": True, "drop_reason": "连续下降2轮"}) - else: c["trend_warning"] = True - if c.get("last_updated"): - try: - if (now - datetime.strptime(c["last_updated"], "%Y-%m-%d %H:%M")).days >= 7: - c.update({"dropped": True, "drop_reason": "超7天未更新"}) - except: pass - - -# ── 第1步:大盘分析 ── -def step1(sectors, source): - normalized = [] - for s in sectors: - cp = (s.get("change", 0) / 100) if source == "eastmoney" else s.get("change", 0) - parts = [f"{s['name']}({cp:+.2f}%)"] - if s.get("lead_stock"): parts.append(f"领涨:{s['lead_stock']}") - if s.get("net_inflow"): parts.append(f"资金:{s['net_inflow']}亿") - normalized.append((" | ".join(parts), cp)) - gainers = [x[0] for x in sorted(normalized, key=lambda x: x[1], reverse=True) if x[1] > 0][:10] - losers = [x[0] for x in sorted(normalized, key=lambda x: x[1]) if x[1] <= 0][:5] - text = "领涨板块:\n" + "\n".join(gainers) - if losers: text += "\n领跌板块:\n" + "\n".join(losers) - prompt = f"分析以下A股板块,选出3-5个值得关注的行业(只看趋势,排除一日游)。\n\n{text}\n\n输出JSON:{{\"market_verdict\":\"强势|中性|弱势\",\"hot_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}],\"danger_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}]}}" - print(f" 第1步:大盘分析", flush=True) - result = call_xiaoguo([{"role": "user", "content": prompt}]) - return extract_json(result) if result else None - - -# ── 第2步:个股分析 ── -def step2(sector, source): - name = sector.get("name", "") - cp = (sector.get("change", 0) / 100) if source == "eastmoney" else sector.get("change", 0) - detail = [] - if sector.get("up_count"): detail.append(f"上涨{sector['up_count']}/{sector['up_count']+sector.get('down_count',0)}家") - if sector.get("net_inflow"): detail.append(f"资金净流入{sector['net_inflow']}亿") - if sector.get("lead_stock"): detail.append(f"领涨{sector['lead_stock']}({sector.get('lead_stock_change',0):+.2f}%)") - d = " | ".join(detail) if detail else "无详细数据" - prompt = f"板块:{name}({cp:+.2f}%) | {d}\n推荐2-3只候选股,评分1-10,附入场区间/止损/目标。JSON:{{\"sector_judgment\":\"\",\"sector_reason\":\"\",\"candidates\":[{{\"code\":\"\",\"name\":\"\",\"score\":0,\"reason\":\"\",\"entry_range\":\"\",\"stop_loss\":\"\",\"target\":\"\"}}]}}" - print(f" 分析行业: {name}", flush=True) - result = call_xiaoguo([{"role": "user", "content": prompt}]) - if not result: return None - parsed = extract_json(result) - if not parsed: return None - valid = [] - for c in parsed.get("candidates", []): - q = tencent_quote(c.get("code", "")) - if q and q["price"] > 0: - c["verified_price"], c["verified_change"] = q["price"], q["change_pct"] - valid.append(c) - print(f" 候选{len(parsed.get('candidates',[]))}只, 验证通过{len(valid)}只", flush=True) - return valid - - -def main(): - market = load_json(MARKET_JSON) - if not market or not market.get("sectors"): - print("market.json 无数据", flush=True); return - sectors, source = market["sectors"], market.get("source", "ths") - print(f"板块: {len(sectors)}, 来源: {source}", flush=True) - pool = load_pool() - existing = {c["code"] for c in pool.get("candidates", []) if not c.get("dropped")} - - # 第1步 - view = step1(sectors, source) - if not view: - print("第1步失败", flush=True); return - market["market_verdict"] = view.get("market_verdict", "中性") - market["verdict_reason"] = view.get("verdict_reason", "") - market["hot_sectors"] = view.get("hot_sectors", []) - market["danger_sectors"] = view.get("danger_sectors", []) - market["xiaoguo_scan_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M") - market.setdefault("insights", []).append(f"[小果全市场] {view.get('market_verdict','中性')}") - hot_names = [s["name"] for s in view.get("hot_sectors", [])] - print(f"热门行业: {hot_names}", flush=True) - if not hot_names: - save_pool(pool); save_json(MARKET_JSON, market); return - - # 第2步 - hot_data = [s for s in sectors if s.get("name", "") in hot_names][:MAX_SECTORS] - new_count = 0 - for s in hot_data: - cands = step2(s, source) - if not cands: continue - for c in cands: - if add_or_update(pool, c["code"], c.get("name",""), s["name"], - c.get("score",5), c.get("reason",""), - c.get("entry_range",""), c.get("stop_loss",""), c.get("target",""), - c.get("verified_price",0), c.get("verified_change",0)): - new_count += 1 - print(f" + {c.get('name','')}({c['code']}) {c.get('score',0)}分", flush=True) - - cleanup(pool) - save_pool(pool) - save_json(MARKET_JSON, market) - print(f"完成: 新增{new_count}, 池内{pool['total_candidates']}活跃", flush=True) - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +"""market_screener.py — 小果本地LLM全市场筛选(分步少吃多餐版)""" + +import json, os, re, time, urllib.request, urllib.error +from datetime import datetime +from pathlib import Path + +WEB_DASHBOARD_DIR = Path(__file__).resolve().parent.parent.parent / "web-dashboard" if "hermes" in str(Path(__file__).resolve()) else Path(__file__).parent +DATA_DIR = WEB_DASHBOARD_DIR / "data" +POOL_JSON = DATA_DIR / "candidate_pool.json" # 筛选缓存(临时) +XIAOGUO_MODEL = "Qwen3.6-27B-OptiQ-4bit" +API_TIMEOUT = 60 +MAX_SECTORS = 5 +MAX_CANDIDATES_POOL = 60 +TENCENT_URL = "http://qt.gtimg.cn/q=" + +def _get_xiaoguo_url(): + try: + from mo_config import get_config + return get_config().xiaoguo_api_url + except Exception: + return "http://node122:18003/v1/chat/completions" # legacy fallback + + +def load_json(path): + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return None + + +def save_json(path, data): + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def call_xiaoguo(messages, timeout=API_TIMEOUT): + """调小果本地LLM(通过node122直连,/etc/hosts自动走LAN或EasyTier)""" + payload = json.dumps({"model": XIAOGUO_MODEL, "messages": messages, + "temperature": 0.1, "max_tokens": 2048}).encode() + req = urllib.request.Request(_get_xiaoguo_url(), data=payload, + headers={"Content-Type": "application/json", + "Authorization": "Bearer hermes123"}, method="POST") + try: + resp = urllib.request.build_opener(urllib.request.ProxyHandler({})).open(req, timeout=timeout) + return json.loads(resp.read())["choices"][0]["message"]["content"] + except Exception as e: + print(f"API失败: {e}", flush=True) + return None + + +def extract_json(text): + """从回复中提取第一个完整JSON,跳过思考过程""" + # 优先找代码块 + m = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text) + if m: + try: + return json.loads(m.group(1)) + except json.JSONDecodeError: + pass + # 跳过思考过程行(Here's a thinking process / 数字列表项) + clean_lines = [] + for line in text.split("\n"): + s = line.strip() + if re.match(r"^\d+\.\s+\*\*", s) or "Here's a thinking" in s or "**Analyze" in s: + continue + if s.startswith("```"): + clean_lines.append(s) + continue + clean_lines.append(line) + clean = "\n".join(clean_lines) + # 找第一个完整JSON + start = clean.find("{") + if start < 0: + return None + for mode in ["直接", "跳过头部"]: + for pos in [start, clean.find("\n{", start), clean.find("{", start + 1)]: + if pos < 0: + continue + depth = 0 + for i in range(pos, len(clean)): + if clean[i] == "{": depth += 1 + elif clean[i] == "}": + depth -= 1 + if depth == 0: + try: + return json.loads(clean[pos:i+1]) + except json.JSONDecodeError: + break + return None + + +def tencent_quote(code): + try: + opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + tc = f"sh{code}" if code.startswith("6") or code.startswith("5") else f"sz{code}" + resp = opener.open(TENCENT_URL + tc, timeout=5) + d = resp.read().decode("gbk").split('="')[1].split('"')[0].split("~") + if len(d) > 32: + return {"name": d[1], "price": float(d[3]) if d[3] else 0, + "change_pct": float(d[32]) if d[32] else 0} + except: + pass + return None + + +def load_pool(): + pool = load_json(POOL_JSON) + return pool if pool else {"last_updated": "", "total_candidates": 0, "candidates": []} + + +def save_pool(pool): + pool["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M") + pool["total_candidates"] = len([c for c in pool.get("candidates", []) if not c.get("dropped")]) + save_json(POOL_JSON, pool) + + +def add_or_update(pool, code, name, sector, score, reason, entry_range, stop_loss, target, vprice, vchg): + now = datetime.now().strftime("%Y-%m-%d %H:%M") + for c in pool.setdefault("candidates", []): + if c["code"] == code and not c.get("dropped"): + c.update({"last_updated": now, "num_observations": c.get("num_observations", 1) + 1, + "xiaoguo_score": score, "xiaoguo_reason": reason, + "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, + "verified_price": vprice, "verified_change": vchg}) + c.setdefault("score_history", []).append({"date": now, "score": score}) + hist = c["score_history"] + if len(hist) >= 3: + recent = [h["score"] for h in hist[-3:]] + c["trend_warning"] = all(recent[i] > recent[i+1] for i in range(2)) + return False + pool["candidates"].append({"code": code, "name": name, "sector": sector, + "xiaoguo_score": score, "xiaoguo_reason": reason, + "xiaoguo_strategy": {"entry_range": entry_range, "stop_loss": stop_loss, "target": target}, + "verified_price": vprice, "verified_change": vchg, + "added_at": now, "last_updated": now, "num_observations": 1, + "score_history": [{"date": now, "score": score}], + "zhiwei_star": None, "zhiwei_reviewed": False, "zhiwei_reviewed_at": None, + "promoted": False, "promoted_at": None, "dropped": False, "drop_reason": None, + "trend_warning": False, "trend_note": ""}) + return True + + +def cleanup(pool): + now = datetime.now() + for c in pool.get("candidates", []): + if c.get("dropped"): continue + hist = c.get("score_history", []) + if len(hist) >= 3 and sum(h["score"] for h in hist[-3:]) / 3 < 5: + c.update({"dropped": True, "drop_reason": "平均评分<5"}); continue + if len(hist) >= 3: + recent = [h["score"] for h in hist[-3:]] + if all(recent[i] > recent[i+1] for i in range(2)): + if c.get("trend_warning"): c.update({"dropped": True, "drop_reason": "连续下降2轮"}) + else: c["trend_warning"] = True + if c.get("last_updated"): + try: + if (now - datetime.strptime(c["last_updated"], "%Y-%m-%d %H:%M")).days >= 7: + c.update({"dropped": True, "drop_reason": "超7天未更新"}) + except: pass + + +# ── 第1步:大盘分析 ── +def step1(sectors, source): + normalized = [] + for s in sectors: + cp = (s.get("change", 0) / 100) if source == "eastmoney" else s.get("change", 0) + parts = [f"{s['name']}({cp:+.2f}%)"] + if s.get("lead_stock"): parts.append(f"领涨:{s['lead_stock']}") + if s.get("net_inflow"): parts.append(f"资金:{s['net_inflow']}亿") + normalized.append((" | ".join(parts), cp)) + gainers = [x[0] for x in sorted(normalized, key=lambda x: x[1], reverse=True) if x[1] > 0][:10] + losers = [x[0] for x in sorted(normalized, key=lambda x: x[1]) if x[1] <= 0][:5] + text = "领涨板块:\n" + "\n".join(gainers) + if losers: text += "\n领跌板块:\n" + "\n".join(losers) + prompt = f"分析以下A股板块,选出3-5个值得关注的行业(只看趋势,排除一日游)。\n\n{text}\n\n输出JSON:{{\"market_verdict\":\"强势|中性|弱势\",\"hot_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}],\"danger_sectors\":[{{\"name\":\"板块名\",\"reason\":\"理由\"}}]}}" + print(f" 第1步:大盘分析", flush=True) + result = call_xiaoguo([{"role": "user", "content": prompt}]) + return extract_json(result) if result else None + + +# ── 第2步:个股分析 ── +def step2(sector, source): + name = sector.get("name", "") + cp = (sector.get("change", 0) / 100) if source == "eastmoney" else sector.get("change", 0) + detail = [] + if sector.get("up_count"): detail.append(f"上涨{sector['up_count']}/{sector['up_count']+sector.get('down_count',0)}家") + if sector.get("net_inflow"): detail.append(f"资金净流入{sector['net_inflow']}亿") + if sector.get("lead_stock"): detail.append(f"领涨{sector['lead_stock']}({sector.get('lead_stock_change',0):+.2f}%)") + d = " | ".join(detail) if detail else "无详细数据" + prompt = f"板块:{name}({cp:+.2f}%) | {d}\n推荐2-3只候选股,评分1-10,附入场区间/止损/目标。JSON:{{\"sector_judgment\":\"\",\"sector_reason\":\"\",\"candidates\":[{{\"code\":\"\",\"name\":\"\",\"score\":0,\"reason\":\"\",\"entry_range\":\"\",\"stop_loss\":\"\",\"target\":\"\"}}]}}" + print(f" 分析行业: {name}", flush=True) + result = call_xiaoguo([{"role": "user", "content": prompt}]) + if not result: return None + parsed = extract_json(result) + if not parsed: return None + valid = [] + for c in parsed.get("candidates", []): + q = tencent_quote(c.get("code", "")) + if q and q["price"] > 0: + c["verified_price"], c["verified_change"] = q["price"], q["change_pct"] + valid.append(c) + print(f" 候选{len(parsed.get('candidates',[]))}只, 验证通过{len(valid)}只", flush=True) + return valid + + +def main(): + # 从 DB 读大盘+板块数据(替代 market.json) + from mofin_db import get_conn + conn = get_conn() + market = {"sectors": [], "source": "db"} + try: + # latest snapshot + sr = conn.execute("SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + if sr: + cols = [d[0] for d in conn.execute("SELECT * FROM market_snapshots LIMIT 0").description] + snap = dict(zip(cols, sr)) + market["up_ratio"] = snap.get("up_ratio", 0) + market["mood"] = snap.get("mood", "neutral") + market["source"] = snap.get("source", "db") + + # sector data + sectors = conn.execute(""" + SELECT s.name as name, s.change_pct as change_pct, s.lead_stock as lead_stock, + s.up_count, s.down_count, s.net_inflow + FROM sector_snapshots s + JOIN market_snapshots ms ON s.snapshot_id = ms.id + WHERE ms.id = (SELECT MAX(id) FROM market_snapshots) + ORDER BY s.change_pct DESC + """).fetchall() + if sectors: + cols = [d[0] for d in conn.execute("SELECT name, change_pct, lead_stock, up_count, down_count, net_inflow FROM sector_snapshots LIMIT 0").description] + market["sectors"] = [dict(zip(cols, r)) for r in sectors] + except Exception as e: + print(f"DB read error: {e}", flush=True) + conn.close() + + if not market.get("sectors"): + print("market 无数据", flush=True); return + sectors = market["sectors"] + source = market.get("source", "db") + print(f"板块: {len(sectors)}, 来源: {source}", flush=True) + pool = load_pool() + existing = {c["code"] for c in pool.get("candidates", []) if not c.get("dropped")} + + # 第1步 + view = step1(sectors, source) + if not view: + print("第1步失败", flush=True); return + market["market_verdict"] = view.get("market_verdict", "中性") + market["verdict_reason"] = view.get("verdict_reason", "") + market["hot_sectors"] = view.get("hot_sectors", []) + market["danger_sectors"] = view.get("danger_sectors", []) + market["xiaoguo_scan_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M") + market.setdefault("insights", []).append(f"[小果全市场] {view.get('market_verdict','中性')}") + hot_names = [s["name"] for s in view.get("hot_sectors", [])] + print(f"热门行业: {hot_names}", flush=True) + if not hot_names: + save_pool(pool) + return + + # 第2步 + hot_data = [s for s in sectors if s.get("name", "") in hot_names][:MAX_SECTORS] + new_count = 0 + for s in hot_data: + cands = step2(s, source) + if not cands: continue + for c in cands: + if add_or_update(pool, c["code"], c.get("name",""), s["name"], + c.get("score",5), c.get("reason",""), + c.get("entry_range",""), c.get("stop_loss",""), c.get("target",""), + c.get("verified_price",0), c.get("verified_change",0)): + new_count += 1 + print(f" + {c.get('name','')}({c['code']}) {c.get('score',0)}分", flush=True) + + cleanup(pool) + save_pool(pool) + print(f"完成: 新增{new_count}, 池内{pool['total_candidates']}活跃", flush=True) + +if __name__ == "__main__": + main()