#!/usr/bin/env python3 """xiaoguo_scanner.py — 小果独立扫描线 每5分钟跑一轮,全市场排行榜主动发现潜在标的。 不依赖 trend_detector 信号,独立产出到 signal_news。 """ import json, os, re, time, urllib.request from pathlib import Path from datetime import datetime try: import akshare as ak HAS_AKSHARE = True except ImportError: HAS_AKSHARE = False DATA_DIR = Path(__file__).parent / "data" DB_PATH = DATA_DIR / "mofin.db" XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions" XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed" SCAN_INTERVAL = 3600 # 同一只股1小时内不重复搜 MAX_STOCKS_PER_RUN = 15 ARTICLES_PER_STOCK = 3 # 同花顺轮流榜 ROTATING_BOARDS = [ ("创新高", "stock_rank_cxg_ths"), ("量价齐升", "stock_rank_ljqs_ths"), ("向上突破", "stock_rank_xstp_ths"), ("连续上涨", "stock_rank_cxd_ths"), ("连续放量", "stock_rank_cxfl_ths"), ] def clean_proxy(): for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']: os.environ.pop(k, None) def get_conn(): import sqlite3 conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row return conn def fetch_hot_board(): """东方财富热榜""" if not HAS_AKSHARE: return [] try: clean_proxy() df = ak.stock_hot_rank_em() if df is None or len(df) == 0: return [] # 东方财富热榜列名变化较大,自动检测 cols = list(df.columns) code_candidates = [c for c in cols if any(x in c for x in ['代码', 'code', 'CODE'])] name_candidates = [c for c in cols if any(x in c for x in ['简称', '名称', 'name', 'NAME'])] code_col = code_candidates[0] if code_candidates else cols[1] name_col = name_candidates[0] if name_candidates else cols[2] return [{"code": str(r[code_col]).zfill(6).strip(), "name": str(r[name_col]).strip(), "rank": i+1, "source": "东方财富热榜"} for i, (_, r) in enumerate(df.head(30).iterrows())] except Exception as e: print(f" 热榜失败: {e}", flush=True) return [] def fetch_rotating_board(): """同花顺轮流榜(每轮一个)""" if not HAS_AKSHARE: return [] conn = get_conn() row = conn.execute("SELECT val FROM state_meta WHERE key='xiaoguo_board_round'").fetchone() round_idx = (int(row[0]) if row else 0) % len(ROTATING_BOARDS) conn.execute("INSERT OR REPLACE INTO state_meta (key, val) VALUES ('xiaoguo_board_round', ?)", (str((round_idx + 1) % len(ROTATING_BOARDS)),)) conn.commit() conn.close() board_name, func_name = ROTATING_BOARDS[round_idx] print(f" 同花顺榜: {board_name}", flush=True) try: clean_proxy() fn = getattr(ak, func_name) df = fn() cols = list(df.columns) code_col = [c for c in cols if '代码' in c][0] name_col = [c for c in cols if '简称' in c or '名称' in c][0] return [{"code": str(r[code_col]).zfill(6), "name": str(r[name_col]).strip(), "source": f"同花顺{board_name}"} for _, r in df.head(15).iterrows()] except Exception as e: print(f" {board_name}失败: {e}", flush=True) return [] def get_scanned_codes(conn): """取1小时内已扫描过的代码""" rows = conn.execute( "SELECT code FROM xiaoguo_scan_tracker WHERE datetime(last_scanned_at) > datetime('now', '-1 hour')" ).fetchall() return {r[0] for r in rows} def mark_scanned(conn, code, name, found): conn.execute( "INSERT OR REPLACE INTO xiaoguo_scan_tracker (code, name, last_scanned_at, found_count) " "VALUES (?, ?, datetime('now','localtime'), COALESCE((SELECT found_count FROM xiaoguo_scan_tracker WHERE code=?),0)+?)", (code, name, code, 1 if found else 0) ) conn.commit() def search_news(code, max_results=3): """akshare搜个股新闻""" articles = [] if not HAS_AKSHARE: return articles try: clean_proxy() df = ak.stock_news_em(symbol=code) for _, r in df.head(max_results).iterrows(): title = r.get('新闻标题', '') content = r.get('新闻内容', '') if title and len(title) > 5: articles.append({"title": title, "content": content}) except: pass return articles def check_stock(code, name, articles): """小果LLM判断这只股票是否有料(一次调用判断所有文章)""" if not articles: return None, None lines = [f"{i+1}. {a['title']}" for i, a in enumerate(articles[:3])] prompt = f"""以下是最新关于{name}({code})的新闻标题。 该股今日上了人气热榜/技术榜单。 新闻: {chr(10).join(lines)} 这只股上榜是否跟这些新闻有关?有关的话是利好还是利空? 回答格式:有关(利好|利空|中性) 或 无关 回答:""" payload = json.dumps({ "model": XIAOGUO_MODEL, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 100, }).encode() opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) req = urllib.request.Request(XIAOGUO_API, data=payload, headers={"Content-Type": "application/json"}, method="POST") try: resp = opener.open(req, timeout=30) reply = json.loads(resp.read())["choices"][0]["message"]["content"] if "有关" in reply or "利好" in reply or "利空" in reply: for s in ["利好", "利空", "中性"]: if s in reply: return True, s return True, "中性" except: pass return None, None def main(): start_time = time.time() conn = get_conn() # 1. 拉榜 hot = fetch_hot_board() rotating = fetch_rotating_board() elapsed = time.time() - start_time print(f"榜单: 东方财富{len(hot)}只, 同花顺{len(rotating)}只 ({elapsed:.0f}s)", flush=True) if not hot and not rotating: conn.close() return # 2. 合并去重 all_stocks = {} for s in hot + rotating: code = s["code"] if code not in all_stocks: all_stocks[code] = {"code": code, "name": s["name"], "sources": []} all_stocks[code]["sources"].append(s["source"]) # 3. 排除已搜索过的 scanned = get_scanned_codes(conn) candidates = [s for code, s in all_stocks.items() if code not in scanned and len(code) == 6 and code.isdigit()][:MAX_STOCKS_PER_RUN] if not candidates: print(f"无新候选(已有 {len(scanned)} 只已扫描)", flush=True) conn.close() return print(f"待扫描: {len(candidates)} 只(跳过 {len(all_stocks)-len(candidates)} 只已扫过)", flush=True) # 4. 逐只搜新闻+判断 found_any = False for stock in candidates: code, name = stock["code"], stock["name"] sources = "|".join(stock["sources"]) articles = search_news(code, ARTICLES_PER_STOCK) if not articles: mark_scanned(conn, code, name, False) continue has_found = False ok, sentiment = check_stock(code, name, articles) if ok: has_found = True found_any = True sources = "|".join(stock["sources"]) conn.execute( "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) " "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo')", (f"扫描-{name}", sentiment, f"[{sources}] {articles[0]['title'][:80]}", json.dumps([{"title": a["title"], "sentiment": sentiment, "summary": (a.get("content") or "")[:100]} for a in articles[:3]], ensure_ascii=False), json.dumps([name], ensure_ascii=False)) ) print(f" ✅ {name}({code}) [{sources}] {sentiment}: {articles[0]['title'][:50]}", flush=True) mark_scanned(conn, code, name, has_found) total_time = time.time() - start_time print(f"完成: {len(candidates)}只扫描, {'有发现' if found_any else '无发现'} ({total_time:.0f}s)", flush=True) conn.close() if __name__ == "__main__": main()