#!/usr/bin/env python3
"""xiaoguo_news_processor.py — news-intelligence processing via the Xiaoguo LLM.

Meant to run alongside trend_detector (every 30 minutes); it handles rows of
``sector_signals`` that are not yet processed.

Pipeline (one signal per run):
1. Read the highest-severity unprocessed signal.
2. Search akshare news for the sector's related stocks, holdings, watchlist
   entries and up to 5 sector members.
3. Send up to MAX_ARTICLES articles to the Xiaoguo LLM in a single request,
   asking for a summary and a sentiment for each article.
4. Insert the aggregated result into ``signal_news``.
5. Mark the signal as processed.
"""
import json
import os
import re
import urllib.request
from pathlib import Path

try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
MAX_ARTICLES = 5  # Max articles analysed per run (measured: 5 articles took ~12 s).

# Article bodies containing any of these keywords are treated as table-data
# noise rather than news, and are skipped before analysis.
_DIRTY_CONTENT_KEYWORDS = ('主力资金', '资金净流入', '代码', '简称')

# If a body contains any of these (ASCII or full-width) punctuation marks it is
# sent as-is; otherwise it is artificially split (see _build_prompt).
_PUNCTUATION = '。，！？；,!?;'

# A standalone 6-digit stock code. `\b` is deliberately not used: CJK characters
# are word characters in Python's Unicode regexes, so "茅台600519" has no word
# boundary before the digits and `\b\d{6}\b` would never match there.
_STOCK_CODE_RE = re.compile(r'(?<![A-Za-z\d])\d{6}(?![A-Za-z\d])')

# A JSON array/object wrapped in a ``` or ```json fenced code block.
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```')

_SENTIMENT_ZH = {"positive": "利好", "negative": "利空", "neutral": "中性"}


def clean_proxy():
    """Remove HTTP(S) proxy variables from the process environment."""
    for k in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
        os.environ.pop(k, None)


def get_conn():
    """Open the SQLite database at DB_PATH with name-addressable rows."""
    import sqlite3
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def _as_text(value):
    """Return *value* if it is a str, otherwise "".

    Guards against non-string cell values (e.g. NaN from a pandas DataFrame),
    which would otherwise break `len()`, `in` and slicing further down.
    """
    return value if isinstance(value, str) else ""


def search_akshare_news(code, max_results=3):
    """Fetch up to *max_results* news articles for one stock via akshare.

    Returns a list of {"title", "content", "url"} dicts (all str). Titles of
    5 characters or fewer are skipped. Best-effort: on any error the failure is
    logged and whatever was collected so far is returned.
    """
    articles = []
    if not HAS_AKSHARE:
        return articles
    try:
        clean_proxy()
        df = ak.stock_news_em(symbol=code)
        for _, row in df.head(max_results).iterrows():
            title = _as_text(row.get('新闻标题', ''))
            if len(title) > 5:
                articles.append({
                    "title": title,
                    "content": _as_text(row.get('新闻内容', '')),
                    "url": _as_text(row.get('新闻链接', '')),
                })
    except Exception as exc:  # best-effort: one failing stock must not abort the run
        print(f" akshare 搜索失败 {code}: {exc}", flush=True)
    return articles


def extract_json(text):
    """Extract a JSON array or object from an LLM reply.

    Tries, in order: a fenced ```json block; the value starting at the first
    "["; the value starting at the first "{". Returns the parsed value, or
    None if nothing parses (or *text* is not a str).
    """
    if not isinstance(text, str):
        return None
    fenced = _FENCED_JSON_RE.search(text)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass
    # raw_decode parses one complete JSON value and ignores trailing text. Unlike
    # bracket counting, it is not confused by brackets inside string values.
    decoder = json.JSONDecoder()
    for open_ch in '[{':
        start = text.find(open_ch)
        if start < 0:
            continue
        try:
            return decoder.raw_decode(text, start)[0]
        except json.JSONDecodeError:
            continue
    return None


def _build_prompt(articles):
    """Build the user prompt for call_xiaoguo from a list of article dicts.

    Each article becomes a numbered entry. The entry is the title, with
    standalone 6-digit codes removed and whitespace collapsed. When the article
    has a body, up to 200 characters of it follow on the next line.
    """
    lines = []
    for number, article in enumerate(articles, start=1):
        title = _STOCK_CODE_RE.sub('', _as_text(article.get('title'))).strip()
        title = re.sub(r'\s+', ' ', title)
        content = _as_text(article.get('content'))[:200]
        # akshare bodies can lack punctuation entirely, which makes the model's
        # reasoning stall; in that case insert "。" every 20 characters.
        if content and not any(ch in content for ch in _PUNCTUATION):
            content = '。'.join(content[i:i + 20] for i in range(0, len(content), 20))
        if content:
            lines.append(f"{number}. {title}\n {content}")
        else:
            lines.append(f"{number}. {title}")
    return "\n".join(lines) + "\n\n逐篇分析:给摘要(概括核心内容)和情感(positive/negative/neutral)。JSON数组。"


def call_xiaoguo(articles):
    """Ask the Xiaoguo LLM for a summary and sentiment for each article.

    Returns the list parsed from the model's reply, or None on any
    HTTP/parse failure or when the reply does not contain a JSON array.
    """
    payload = json.dumps({
        "model": XIAOGUO_MODEL,
        "messages": [{"role": "user", "content": _build_prompt(articles)}],
        "temperature": 0.1,
        "max_tokens": 2048,
    }).encode()
    clean_proxy()
    # Empty ProxyHandler: the LLM endpoint is on the LAN and must be reached directly.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(
        XIAOGUO_API, data=payload,
        headers={"Content-Type": "application/json"}, method="POST",
    )
    try:
        with opener.open(req, timeout=60) as resp:  # closes the socket on all paths
            data = json.loads(resp.read())
        result = extract_json(data["choices"][0]["message"]["content"])
        if isinstance(result, list):
            return result
    except Exception as e:
        print(f" 小果调用失败: {e}", flush=True)
    return None


def translate_sentiment(s):
    """Map an English sentiment label (case/whitespace-insensitive) to Chinese.

    Unknown strings and non-string values are returned unchanged.
    """
    if isinstance(s, str):
        return _SENTIMENT_ZH.get(s.strip().lower(), s)
    return s


def _load_json_list(raw):
    """Decode a JSON list column; NULL/empty or a non-list value yields []."""
    value = json.loads(raw or "[]")
    return value if isinstance(value, list) else []


def _collect_codes(conn, signal):
    """Return {code: name} for every stock whose news should be searched.

    Entries come first from the signal's related/holdings/watchlist JSON
    columns; a later entry with the same code overwrites the name. Then up to
    5 sector members are added from stocks/stock_sectors, without overriding
    codes already present.
    """
    codes = {}
    for column in ("related_stocks", "holdings_in_sector", "watchlist_in_sector"):
        for item in _load_json_list(signal[column]):
            if isinstance(item, dict) and item.get("code"):
                codes[item["code"]] = item.get("name", "")
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5",
        (signal["sector"],),
    ).fetchall()
    for member in members:
        codes.setdefault(member["code"], member["name"])
    return codes


def _gather_articles(codes):
    """Search news for each code; return articles deduplicated by title, in first-seen order."""
    articles = []
    seen_titles = set()
    for code, name in codes.items():
        found = search_akshare_news(code, 3)
        for article in found:
            if article["title"] not in seen_titles:
                seen_titles.add(article["title"])
                articles.append(article)
        print(f" 搜 {name}({code}): {len(found)} 篇", flush=True)
    return articles


def _select_batch(articles, limit=MAX_ARTICLES):
    """Return the first *limit* articles whose body has no table-noise keyword."""
    batch = []
    for article in articles:
        if len(batch) >= limit:
            break
        body = _as_text(article.get('content'))
        if any(kw in body for kw in _DIRTY_CONTENT_KEYWORDS):
            continue
        batch.append(article)
    return batch


def _merge_results(batch, results):
    """Copy sentiment/summary from LLM *results* onto *batch* in place, matched by position.

    Accepts English or Chinese keys; non-dict entries are ignored.
    """
    for article, result in zip(batch, results):
        if not isinstance(result, dict):
            continue
        article["sentiment"] = translate_sentiment(result.get("sentiment", result.get("情感", "")))
        article["summary"] = result.get("summary", result.get("摘要", ""))


def _summarize(sector, batch):
    """Return (overall_sentiment, combined_summary) for an analysed batch.

    The overall label is "利好"/"利空" only when that side outnumbers the other
    by more than 1.5x; otherwise it is "中性". Up to 3 summaries are joined.
    """
    sentiments = [a["sentiment"] for a in batch if a.get("sentiment")]
    pos = sentiments.count("利好")
    neg = sentiments.count("利空")
    if pos > neg * 1.5:
        overall = "利好"
    elif neg > pos * 1.5:
        overall = "利空"
    else:
        overall = "中性"
    summaries = [a["summary"] for a in batch if isinstance(a.get("summary"), str) and a["summary"]]
    combined = f"{sector}板块信号:{'|'.join(summaries[:3])}。总体{overall}。"
    return overall, combined


def _mark_processed(conn, signal_id):
    """Set processed=1 on the signal and commit (including any pending insert)."""
    conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal_id,))
    conn.commit()


def main():
    """Process the single highest-priority unprocessed sector signal.

    If no news (or only table-noise news) is found, the signal is marked
    processed without inserting a row. If the LLM call fails, the signal is
    left unprocessed so the next run retries it.
    NOTE(review): a signal whose analysis fails on every run will keep blocking
    lower-priority signals, because only one signal is taken per run.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is None:
            print("无未处理的信号", flush=True)
            return
        signal = dict(row)
        sector = signal["sector"]
        print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True)

        codes = _collect_codes(conn, signal)
        all_articles = _gather_articles(codes)
        if not all_articles:
            print(" 未搜到新闻", flush=True)
            _mark_processed(conn, signal["id"])
            return

        batch = _select_batch(all_articles)
        if not batch:
            # Every article was table noise: analysing nothing would "fail" on
            # every run and block the queue, so close the signal out instead.
            print(" 无可分析的新闻(均为表格类内容)", flush=True)
            _mark_processed(conn, signal["id"])
            return

        print(f" 共{len(all_articles)}篇,送小果分析{len(batch)}篇", flush=True)
        results = call_xiaoguo(batch)
        if not results:
            print(" 小果分析失败", flush=True)
            return

        _merge_results(batch, results)
        overall, combined = _summarize(sector, batch)
        searched_names = list(dict.fromkeys(codes.values()))  # dedup, stable order
        conn.execute(
            "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)",
            (signal["id"], sector, overall, combined,
             json.dumps(batch, ensure_ascii=False),
             json.dumps(searched_names, ensure_ascii=False)),
        )
        # Insert and status update are committed together.
        _mark_processed(conn, signal["id"])
        print(f" 完成: {overall} — {combined[:100]}", flush=True)
    finally:
        conn.close()


if __name__ == "__main__":
    main()