#!/usr/bin/env python3
"""xiaoguo_news_processor.py — Xiaoguo news-intelligence processor.

Runs alongside trend_detector (every 30 minutes) and handles the rows of
``sector_signals`` that have not been processed yet.

Workflow:
1. Read the unprocessed signals (one per run).
2. Search news with akshare (sector-related stocks + holdings + watchlist).
3. Ask the Xiaoguo LLM to analyse the sentiment of each article.
4. Write the result into ``signal_news``.
5. Mark the signal as processed.
"""

import json
import os
import sqlite3
import urllib.request
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"

XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
XIAOGUO_TIMEOUT = 60

# Proxy variables removed from the environment before akshare is called.
_PROXY_ENV_VARS = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")
# Signal columns that hold JSON arrays of stock objects.
_STOCK_LIST_COLUMNS = ("related_stocks", "holdings_in_sector", "watchlist_in_sector")
# Headlines fetched per stock.
_NEWS_PER_STOCK = 3
# Only the first few headlines are analysed, to keep the LLM call from timing out.
_MAX_ARTICLES_PER_BATCH = 8


def get_conn() -> sqlite3.Connection:
    """Open the SQLite database at ``DB_PATH`` with name-addressable rows.

    Returns:
        A new connection whose ``row_factory`` is ``sqlite3.Row``. The caller
        owns the connection and must close it.
    """
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def search_akshare_news(code, max_results=3) -> List[Dict[str, Any]]:
    """Fetch recent news headlines for one stock through akshare.

    This is best effort: any failure is reported on stdout and whatever was
    collected so far is returned.

    Args:
        code: Stock code passed to ``ak.stock_news_em`` as ``symbol``.
        max_results: Maximum number of leading rows to inspect.

    Returns:
        A list of ``{"title": ..., "url": ...}`` dicts; empty when akshare is
        not installed or the lookup fails.
    """
    titles: List[Dict[str, Any]] = []
    if not HAS_AKSHARE:
        return titles

    # Drop proxy settings from the process environment before the request.
    for var in _PROXY_ENV_VARS:
        os.environ.pop(var, None)

    try:
        df = ak.stock_news_em(symbol=code)
        for _, row in df.head(max_results).iterrows():
            title = row.get('新闻标题', '')
            # A missing cell may not be a string (e.g. NaN); skip that row
            # instead of aborting the remaining ones.
            if isinstance(title, str) and len(title) > 5:
                titles.append({"title": title, "url": row.get('新闻链接', '')})
    except Exception as e:
        # Deliberately broad: the lookup is optional, but the failure should
        # be visible rather than silently swallowed.
        print(f" akshare 新闻获取失败 {code}: {e}", flush=True)
    return titles


def extract_json_object(content) -> Optional[Dict[str, Any]]:
    """Extract the JSON object contained in an LLM reply.

    The whole reply is parsed first, which is the only reliable approach when
    string values themselves contain braces. If that does not yield an object,
    the text is scanned backwards for the last balanced ``{...}`` span so that
    leading commentary or code fences around the object are ignored.

    Args:
        content: Raw reply text; anything that is not a non-empty string
            yields ``None``.

    Returns:
        The decoded object, or ``None`` when no balanced object is present.

    Raises:
        ValueError: If the balanced span found by the fallback scan is not
            valid JSON (``json.JSONDecodeError`` is a subclass).
    """
    if not isinstance(content, str) or not content:
        return None

    try:
        parsed = json.loads(content.strip())
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    depth = 0
    end = -1
    for i in range(len(content) - 1, -1, -1):
        ch = content[i]
        if ch == "}":
            if depth == 0:
                end = i + 1  # closing brace of the outermost candidate
            depth += 1
        elif ch == "{" and depth > 0:
            # An opening brace seen before any closing brace is stray text.
            depth -= 1
            if depth == 0:
                return json.loads(content[i:end])
    return None


def call_xiaoguo(articles_text, timeout=XIAOGUO_TIMEOUT) -> Optional[Dict[str, Any]]:
    """Ask the Xiaoguo LLM to classify the sentiment of news headlines.

    Args:
        articles_text: The headlines to analyse, already formatted as text.
        timeout: Timeout in seconds for the HTTP request.

    Returns:
        The JSON object decoded from the model reply, or ``None`` when the
        request fails or the reply contains no usable JSON object.
    """
    prompt = f"""分析以下新闻标题,对每篇给出情感分类和摘要,再加总体判断。

新闻:
{articles_text}

JSON格式:
{{"overall_sentiment":"利好|利空|中性","summary":"总体判断","articles":[{{"title":"","sentiment":"","summary":"","reason":""}}]}}"""

    payload = json.dumps({
        "model": XIAOGUO_MODEL,
        "messages": [
            {"role": "system", "content": "你只输出JSON。"},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.1,
        "max_tokens": 2000,
    }).encode()

    # An empty ProxyHandler bypasses any system proxy configuration.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(
        XIAOGUO_API,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    try:
        # The context manager closes the response even if reading fails.
        with opener.open(req, timeout=timeout) as resp:
            result = json.loads(resp.read())
        content = result["choices"][0]["message"]["content"]
        return extract_json_object(content)
    except Exception as e:
        print(f" 小果调用失败: {e}", flush=True)
    return None


def _load_stock_list(raw, label) -> List[Dict[str, Any]]:
    """Decode one JSON stock-list column, tolerating bad data.

    Returns only the dict entries of a JSON array; empty, malformed or
    non-array values yield an empty list.
    """
    if not raw:
        return []
    try:
        value = json.loads(raw)
    except (TypeError, ValueError) as e:
        # Raising here would leave the signal unprocessed, and the same row
        # would be selected again on every later run.
        print(f" 忽略无法解析的 {label}: {e}", flush=True)
        return []
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _collect_stocks(conn, signal) -> List[Dict[str, Any]]:
    """Gather the distinct stocks to search for a signal, in first-seen order."""
    stocks: List[Dict[str, Any]] = []
    seen_codes = set()

    # Stocks listed on the signal itself.
    for column in _STOCK_LIST_COLUMNS:
        for item in _load_stock_list(signal[column], column):
            code = item.get("code", "")
            if code and code not in seen_codes:
                seen_codes.add(code)
                stocks.append(item)

    # Also include sector constituents recorded in stock_sectors, if any.
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s "
        "JOIN stock_sectors ss ON s.code = ss.code "
        "WHERE ss.sector_name = ? "
        "LIMIT 5", (signal["sector"],)
    ).fetchall()
    for member in members:
        if member["code"] not in seen_codes:
            seen_codes.add(member["code"])
            stocks.append({"code": member["code"], "name": member["name"]})
    return stocks


def _collect_articles(stocks) -> List[Dict[str, Any]]:
    """Search news for every stock and return the headlines, unique by title."""
    collected: List[Dict[str, Any]] = []
    seen_titles = set()
    for item in stocks:
        code = item.get("code", "")
        name = item.get("name", "")
        if not code:
            continue
        found = search_akshare_news(code, _NEWS_PER_STOCK)
        for article in found:
            if article["title"] not in seen_titles:
                seen_titles.add(article["title"])
                collected.append(article)
        print(f" 搜 {name}({code}): {len(found)} 篇", flush=True)
    return collected


def _mark_processed(conn, signal_id) -> None:
    """Flag a signal as processed (the caller commits)."""
    conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal_id,))


def _process_next_signal(conn) -> None:
    """Process the single highest-priority unprocessed signal, if there is one."""
    # Read the unprocessed signals (one per run).
    row = conn.execute(
        "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
    ).fetchone()
    if row is None:
        print("无未处理的信号", flush=True)
        return

    signal = dict(row)
    print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {signal['sector']}", flush=True)

    sector = signal["sector"]
    stocks = _collect_stocks(conn, signal)
    all_articles = _collect_articles(stocks)

    if not all_articles:
        # Nothing to analyse: mark the signal done so it is not picked again.
        print(" 未搜到相关新闻", flush=True)
        _mark_processed(conn, signal["id"])
        conn.commit()
        return

    print(f" 共搜到 {len(all_articles)} 篇新闻,取前{_MAX_ARTICLES_PER_BATCH}篇分析", flush=True)
    batch = all_articles[:_MAX_ARTICLES_PER_BATCH]

    articles_text = "\n".join(f"{i + 1}. {a['title']}" for i, a in enumerate(batch))
    result = call_xiaoguo(articles_text)
    if not result:
        # The signal stays unprocessed, so a later run selects it again.
        print(" 小果分析失败", flush=True)
        return

    # dict.fromkeys removes duplicates while keeping a deterministic order.
    searched_names = list(dict.fromkeys(
        item.get("name", "") for item in stocks if item.get("name")
    ))
    conn.execute("""
        INSERT INTO signal_news
            (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks)
        VALUES (?, ?, ?, ?, ?, ?)
    """, (
        signal["id"],
        sector,
        result.get("overall_sentiment", "中性"),
        result.get("summary", ""),
        json.dumps(result.get("articles", []), ensure_ascii=False),
        json.dumps(searched_names, ensure_ascii=False),
    ))
    _mark_processed(conn, signal["id"])
    conn.commit()
    print(f" 完成: {result.get('overall_sentiment', '?')} — {str(result.get('summary', ''))[:80]}", flush=True)


def main():
    """Entry point: process at most one pending signal, then close the database."""
    # closing() guarantees the connection is released even when a step raises.
    with closing(get_conn()) as conn:
        _process_next_signal(conn)


if __name__ == "__main__":
    main()