#!/usr/bin/env python3
"""xiaoguo_scanner.py — Xiaoguo's independent scanning line.

Runs one round every 5 minutes, proactively discovering potential targets from
market-wide ranking boards. It does not depend on trend_detector signals and
writes its findings independently into the ``signal_news`` table.
"""
import json
import os
import re
import sqlite3
import time
import urllib.request
from datetime import datetime  # noqa: F401 -- unused here; kept from the original imports
from pathlib import Path

try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
SCAN_INTERVAL = 3600  # seconds; the same stock is not searched again within this window
MAX_STOCKS_PER_RUN = 15  # cap on bullish candidates processed per round
ARTICLES_PER_STOCK = 3  # news items fetched per stock

# Tonghuashun (THS) bullish boards: used to dig up potential stocks.
BULLISH_BOARDS = [
    ("创新高", "stock_rank_cxg_ths"),
    ("量价齐升", "stock_rank_ljqs_ths"),
    ("向上突破", "stock_rank_xstp_ths"),
    ("连续上涨", "stock_rank_lxsz_ths"),
    ("持续放量", "stock_rank_cxfl_ths"),
    ("险资举牌", "stock_rank_xzjp_ths"),
]
# THS bearish boards: risk warnings for holdings / watchlist stocks.
BEARISH_BOARDS = [
    ("创新低", "stock_rank_cxd_ths"),
    ("持续缩量", "stock_rank_cxsl_ths"),
    ("量价齐跌", "stock_rank_ljqd_ths"),
    ("连续下跌", "stock_rank_lxxd_ths"),
    ("向下突破", "stock_rank_xxtp_ths"),
]
ALL_BOARDS = BULLISH_BOARDS + BEARISH_BOARDS
BULLISH_COUNT = len(BULLISH_BOARDS)

# Sector-leader scan (not rotated): leading stocks of hot sectors are read from
# market.json every round.
SECTOR_HOT_THRESHOLD = 2.5  # take a sector's leader when the sector is up >= 2.5%

# A run of exactly six digits, e.g. "000977" inside "SZ000977".
_CODE_RE = re.compile(r"(?<!\d)(\d{6})(?!\d)")
# A reasoning block emitted by thinking models; an unclosed block runs to the end.
_THINK_RE = re.compile(r"<think>.*?(?:</think>|$)", re.S)


def _normalize_code(raw):
    """Return a 6-digit A-share code extracted from ``raw``.

    Board APIs are not uniform: codes may arrive as ints (leading zeros lost),
    surrounded by whitespace, or with an exchange prefix such as "SZ000977".
    Returns the first standalone run of six digits if there is one, otherwise
    the stripped text zero-padded to 6 characters (callers later drop anything
    that is not 6 digits).
    """
    text = str(raw).strip()
    match = _CODE_RE.search(text)
    return match.group(1) if match else text.zfill(6)


def _as_text(value):
    """Return ``value`` if it is a str, otherwise "" (covers None and pandas NaN)."""
    return value if isinstance(value, str) else ""


def clean_proxy():
    """Remove HTTP(S) proxy environment variables for this process."""
    for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
        os.environ.pop(k, None)


def get_conn():
    """Open a connection to DB_PATH whose rows support access by column name."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def fetch_hot_board():
    """Fetch the East Money popularity ranking (top 30 rows).

    Returns a list of ``{"code", "name", "rank", "source"}`` dicts, or ``[]``
    when akshare is unavailable, the board is empty, or the fetch fails.
    """
    if not HAS_AKSHARE:
        return []
    try:
        clean_proxy()
        df = ak.stock_hot_rank_em()
        if df is None or len(df) == 0:
            return []
        # This board's column names change often: detect them heuristically and
        # fall back to positional columns.
        cols = list(df.columns)
        code_candidates = [c for c in cols if any(x in str(c) for x in ['代码', 'code', 'CODE'])]
        name_candidates = [c for c in cols if any(x in str(c) for x in ['简称', '名称', 'name', 'NAME'])]
        code_col = code_candidates[0] if code_candidates else cols[1]
        name_col = name_candidates[0] if name_candidates else cols[2]
        return [
            {
                "code": _normalize_code(r[code_col]),
                "name": str(r[name_col]).strip(),
                "rank": i + 1,
                "source": "东方财富热榜",
            }
            for i, (_, r) in enumerate(df.head(30).iterrows())
        ]
    except Exception as e:
        # Best effort: one failing board must not abort the whole round.
        print(f" 东方财富热榜失败: {e}", flush=True)
        return []


def fetch_rotating_board():
    """Fetch one THS board per round, cycling through ALL_BOARDS.

    The rotation index is persisted in ``state_meta`` under the key
    ``xiaoguo_board_round``. It is advanced *before* the fetch, so a failing
    board does not block the rotation.

    Returns:
        (stocks, is_bullish): up to 15 ``{"code", "name", "source"}`` dicts
        (``[]`` on failure), and whether this round's board is a bullish one.
        Without akshare, returns ``([], True)``.
    """
    if not HAS_AKSHARE:
        return [], True
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT val FROM state_meta WHERE key='xiaoguo_board_round'"
        ).fetchone()
        round_idx = (int(row[0]) if row else 0) % len(ALL_BOARDS)
        conn.execute(
            "INSERT OR REPLACE INTO state_meta (key, val) VALUES ('xiaoguo_board_round', ?)",
            (str((round_idx + 1) % len(ALL_BOARDS)),),
        )
        conn.commit()
    finally:
        conn.close()

    board_name, func_name = ALL_BOARDS[round_idx]
    is_bullish = round_idx < BULLISH_COUNT
    print(f" 同花顺榜: {board_name} {'📈看多' if is_bullish else '📉看空'}", flush=True)
    try:
        clean_proxy()
        df = getattr(ak, func_name)()
        if df is None or len(df) == 0:
            return [], is_bullish
        cols = list(df.columns)
        code_col = [c for c in cols if '代码' in str(c)][0]
        name_col = [c for c in cols if '简称' in str(c) or '名称' in str(c)][0]
        stocks = [
            {
                "code": _normalize_code(r[code_col]),
                "name": str(r[name_col]).strip(),
                "source": f"同花顺{board_name}",
            }
            for _, r in df.head(15).iterrows()
        ]
        return stocks, is_bullish
    except Exception as e:
        print(f" {board_name}失败: {e}", flush=True)
        return [], is_bullish


def _code_name_map_from_df(df):
    """Build a ``{stock name: 6-digit code}`` map from a code/name table.

    Accepts English headers (``code``/``name``, as returned by
    ``ak.stock_info_a_code_name``) or Chinese ones (``代码`` with ``简称``/``名称``).

    Raises:
        KeyError: if neither header style is present.
    """
    cols = list(df.columns)
    code_col = next((c for c in ("code", "代码") if c in cols), None)
    name_col = next((c for c in ("name", "简称", "名称") if c in cols), None)
    if code_col is None or name_col is None:
        raise KeyError(f"unexpected columns: {cols}")
    return {
        str(name).strip(): _normalize_code(code)
        for name, code in zip(df[name_col], df[code_col])
    }


def _load_name_to_code():
    """Return the stock name -> code map, building and caching it if needed.

    The map is cached in ``DATA_DIR/stock_name_code_cache.json`` so akshare is
    not called on every run. Returns ``{}`` if it can be neither read nor built.
    """
    cache_path = DATA_DIR / "stock_name_code_cache.json"
    name_to_code = {}
    if cache_path.exists():
        try:
            # json.loads on bytes auto-detects UTF-8/16/32.
            name_to_code = json.loads(cache_path.read_bytes())
        except (OSError, ValueError) as e:
            print(f" 名称代码缓存读取失败: {e}", flush=True)
        if not isinstance(name_to_code, dict):
            name_to_code = {}
    if not name_to_code and HAS_AKSHARE:
        try:
            name_to_code = _code_name_map_from_df(ak.stock_info_a_code_name())
            if name_to_code:
                cache_path.write_text(
                    json.dumps(name_to_code, ensure_ascii=False), encoding="utf-8"
                )
            print(f" 名称代码映射: {len(name_to_code)}只已缓存", flush=True)
        except Exception as e:
            print(f" 名称代码映射加载失败: {e}", flush=True)
    return name_to_code


def fetch_sector_leaders():
    """Return leading stocks of hot sectors listed in ``DATA_DIR/market.json``.

    A sector qualifies when its ``change`` is at least SECTOR_HOT_THRESHOLD
    and its ``lead_stock`` name resolves to a code. Each leader is reported
    once. Returns ``[]`` if market.json is missing or cannot be processed.
    """
    mkt_path = DATA_DIR / "market.json"
    if not mkt_path.exists():
        return []
    try:
        mkt = json.loads(mkt_path.read_bytes())
        sectors = mkt.get("sectors", []) or []
        name_to_code = _load_name_to_code()
        leaders = []
        seen = set()
        for s in sectors:
            try:
                chg = float(s.get("change", 0) or 0)
            except (TypeError, ValueError):
                continue
            lead_name = s.get("lead_stock", "")
            if chg < SECTOR_HOT_THRESHOLD or not lead_name or lead_name in seen:
                continue
            seen.add(lead_name)
            code = name_to_code.get(lead_name, "")
            if not code:
                continue
            leaders.append({
                "code": _normalize_code(code),
                "name": lead_name,
                # The format spec already adds the sign; no extra "+" literal.
                "source": f"行业领涨-{s.get('name', '')}{chg:+.1f}%",
            })
        return leaders
    except Exception as e:
        print(f" 行业领涨获取失败: {e}", flush=True)
        return []


def get_scanned_codes(conn):
    """Return the set of codes scanned within the last SCAN_INTERVAL seconds.

    ``mark_scanned`` stores ``last_scanned_at`` in *local* time, so the cutoff
    must be local time too. Comparing with UTC ``datetime('now', ...)`` would
    stretch the window by the UTC offset (9 h instead of 1 h at UTC+8).
    """
    rows = conn.execute(
        "SELECT code FROM xiaoguo_scan_tracker "
        "WHERE datetime(last_scanned_at) > datetime('now', 'localtime', ?)",
        (f"-{SCAN_INTERVAL} seconds",),
    ).fetchall()
    return {r[0] for r in rows}


def mark_scanned(conn, code, name, found):
    """Record a scan of ``code`` at the current local time and commit.

    ``found_count`` is incremented by 1 when ``found`` is truthy and otherwise
    carried over from the existing row (0 for a new row).
    """
    conn.execute(
        "INSERT OR REPLACE INTO xiaoguo_scan_tracker (code, name, last_scanned_at, found_count) "
        "VALUES (?, ?, datetime('now','localtime'), COALESCE((SELECT found_count FROM xiaoguo_scan_tracker WHERE code=?),0)+?)",
        (code, name, code, 1 if found else 0),
    )
    conn.commit()


def search_news(code, max_results=3):
    """Fetch up to ``max_results`` recent news items for ``code`` via akshare.

    Returns a list of ``{"title", "content"}`` dicts. Items whose title is not
    longer than 5 characters are skipped. Missing or NaN fields become "".
    On a fetch failure the error is logged and any items collected so far are
    returned.
    """
    articles = []
    if not HAS_AKSHARE:
        return articles
    try:
        clean_proxy()
        df = ak.stock_news_em(symbol=code)
        if df is None:
            return articles
        for _, r in df.head(max_results).iterrows():
            title = _as_text(r.get('新闻标题', ''))
            content = _as_text(r.get('新闻内容', ''))
            if len(title) > 5:
                articles.append({"title": title, "content": content})
    except Exception as e:
        print(f" {code}新闻获取失败: {e}", flush=True)
    return articles


def _parse_reply(reply):
    """Interpret the LLM answer about whether the listing relates to the news.

    Any ``<think>`` block is removed first, so the model's deliberation is not
    mistaken for its answer.

    Returns:
        ``(True, sentiment)``: sentiment is the first of 利好/利空/中性 found,
        defaulting to 中性.
        ``(None, None)``: when the answer does not indicate a relation.
    """
    answer = _THINK_RE.sub("", reply or "")
    if "有关" in answer or "利好" in answer or "利空" in answer:
        for s in ["利好", "利空", "中性"]:
            if s in answer:
                return True, s
        return True, "中性"
    return None, None


def check_stock(code, name, articles):
    """Ask the Xiaoguo LLM whether ``articles`` explain the stock's listing.

    All (at most 3) headlines go into one call.

    Returns:
        ``(None, None)``: no articles, or the answer says they are unrelated.
        ``(True, sentiment)``: they are related.
        ``(True, "unknown")``: the LLM request fails. This deliberately degrades
        instead of blocking the scan.
    """
    if not articles:
        return None, None
    lines = [f"{i+1}. {a['title']}" for i, a in enumerate(articles[:3])]
    news_text = "\n".join(lines)
    prompt = f"""以下是最新关于{name}({code})的新闻标题。
该股今日上了人气热榜/技术榜单。

新闻:
{news_text}

这只股上榜是否跟这些新闻有关?有关的话是利好还是利空?
回答格式:有关(利好|利空|中性) 或 无关
回答:"""
    payload = json.dumps({
        "model": XIAOGUO_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 100,
    }).encode()
    # Empty ProxyHandler: the LLM endpoint is reached directly, never via a proxy.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(
        XIAOGUO_API,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with opener.open(req, timeout=15) as resp:
            body = json.loads(resp.read())
        reply = body["choices"][0]["message"]["content"] or ""
    except Exception as e:
        # LLM unreachable or malformed response -> degrade to "unknown".
        print(f" ⚠️ 小果LLM不可达({str(e)[:30]}),降级为unknown", flush=True)
        return True, "unknown"
    return _parse_reply(reply)


def _load_watched_codes(conn):
    """Return zero-stripped codes of active holdings plus watchlist stocks."""
    codes = {
        str(r[0]).lstrip("0")
        for r in conn.execute("SELECT code FROM holdings WHERE is_active=1").fetchall()
    }
    codes.update(
        str(r[0]).lstrip("0")
        for r in conn.execute("SELECT code FROM watchlist_stocks").fetchall()
    )
    return codes


def _merge_stocks(stocks, watched=None):
    """Merge board entries by code, collecting every source label.

    When ``watched`` is given, only codes whose zero-stripped form is in it are
    kept. Dict order follows first appearance in ``stocks`` (priority order).
    """
    merged = {}
    for s in stocks:
        code = s["code"]
        if watched is not None and code.lstrip("0") not in watched:
            continue
        entry = merged.setdefault(code, {"code": code, "name": s["name"], "sources": []})
        entry["sources"].append(s["source"])
    return merged


def _insert_bullish_signal(conn, name, sentiment, sources, articles):
    """Insert a 'xiaoguo' signal_news row for a news-backed bullish-board hit."""
    key_articles = [
        {"title": a["title"], "sentiment": sentiment, "summary": (a.get("content") or "")[:100]}
        for a in articles[:3]
    ]
    conn.execute(
        "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) "
        "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo')",
        (
            f"扫描-{name}",
            sentiment,
            f"[{sources}] {articles[0]['title'][:80]}",
            json.dumps(key_articles, ensure_ascii=False),
            json.dumps([name], ensure_ascii=False),
        ),
    )


def _insert_risk_signal(conn, name, sources):
    """Insert a 'xiaoguo_risk' signal_news row for a held stock on a bearish board."""
    conn.execute(
        "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) "
        "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo_risk')",
        (
            f"预警-{name}",
            "偏空",
            f"[{sources}] {name}登上{sources}榜,需关注持仓风险",
            json.dumps([{"title": name, "sentiment": "偏空", "summary": f"上榜{sources}"}], ensure_ascii=False),
            json.dumps([name], ensure_ascii=False),
        ),
    )


def main():
    """Run one scan round and record signals into ``signal_news``.

    Bullish rounds: sector leaders, the hot board and the rotating THS board
    are merged. Up to MAX_STOCKS_PER_RUN codes not scanned recently are
    news-checked with the LLM.

    Bearish rounds: every holding/watchlist stock on the bearish THS board gets
    a risk signal. The database connection is always closed.
    """
    start_time = time.time()
    conn = get_conn()
    try:
        # 1. Pull the boards.
        hot = fetch_hot_board()
        rotating, is_bullish = fetch_rotating_board()
        leaders = fetch_sector_leaders()
        elapsed = time.time() - start_time
        print(f"榜单: 东方财富{len(hot)}只, 同花顺{len(rotating)}只, 行业领涨{len(leaders)}只 ({elapsed:.0f}s)", flush=True)
        if not hot and not rotating and not leaders:
            return

        # 2. Merge and de-duplicate.
        if is_bullish:
            # Sector leaders first: leaders of hot sectors get priority.
            all_stocks = _merge_stocks(leaders + hot + rotating)
        else:
            # Only the bearish THS board is a risk signal. Appearing on the
            # popularity board is not bearish. Only holdings/watchlist count.
            all_stocks = _merge_stocks(rotating, _load_watched_codes(conn))
        if not all_stocks:
            if is_bullish:
                print("榜单为空", flush=True)
            else:
                print("看空榜无持仓股命中", flush=True)
            return

        # 3. Drop recently scanned codes. Bearish rounds check everything every
        #    time, because risk must always be re-evaluated.
        scanned = get_scanned_codes(conn)
        valid = [s for code, s in all_stocks.items() if len(code) == 6 and code.isdigit()]
        if is_bullish:
            candidates = [s for s in valid if s["code"] not in scanned][:MAX_STOCKS_PER_RUN]
        else:
            candidates = valid
        if not candidates:
            print(f"无新候选(已有 {len(scanned)} 只已扫描)", flush=True)
            return

        label = '看多' if is_bullish else '看空'
        print(f"待扫描: {len(candidates)} 只({label}榜)", flush=True)

        # 4. Process each candidate.
        found_any = False
        for stock in candidates:
            code, name = stock["code"], stock["name"]
            sources = "|".join(stock["sources"])
            if is_bullish:
                articles = search_news(code, ARTICLES_PER_STOCK)
                if not articles:
                    mark_scanned(conn, code, name, False)
                    continue
                ok, sentiment = check_stock(code, name, articles)
                has_found = bool(ok)
                if has_found:
                    _insert_bullish_signal(conn, name, sentiment, sources, articles)
                    print(f" ✅ {name}({code}) [{sources}] {sentiment}: {articles[0]['title'][:50]}", flush=True)
            else:
                # Bearish board: write the risk signal directly.
                _insert_risk_signal(conn, name, sources)
                print(f" ⚠️ {name}({code}) [{sources}] 持仓风险信号", flush=True)
                has_found = True
            found_any = found_any or has_found
            # mark_scanned commits, which also persists the signal row above.
            mark_scanned(conn, code, name, has_found)

        total_time = time.time() - start_time
        print(f"完成: {len(candidates)}只{label}扫描, {'有发现' if found_any else '无发现'} ({total_time:.0f}s)", flush=True)
    finally:
        conn.close()


if __name__ == "__main__":
    main()