From 7814d1d4923b510bcd379769f7efc570e75e32b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Sat, 20 Jun 2026 23:36:53 +0800 Subject: [PATCH] =?UTF-8?q?xiaoguo=5Fnews=5Fprocessor:=20=E5=B0=8F?= =?UTF-8?q?=E6=9E=9CLLM=E5=88=86=E6=9E=903=E7=AF=87/=E6=AC=A1=EF=BC=8C?= =?UTF-8?q?=E5=90=AB=E6=91=98=E8=A6=81+=E6=83=85=E6=84=9F=EF=BC=8C60?= =?UTF-8?q?=E7=A7=92=E8=B7=91=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- xiaoguo_news_processor.py | 184 ++++++++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 66 deletions(-) diff --git a/xiaoguo_news_processor.py b/xiaoguo_news_processor.py index 4311ff8..121a03b 100644 --- a/xiaoguo_news_processor.py +++ b/xiaoguo_news_processor.py @@ -6,7 +6,7 @@ 流程: 1. 读未 processed 的 signals(每次1条) 2. akshare 搜新闻(板块相关个股 + 持仓 + 自选) - 3. 调小果 LLM 逐篇分析情感 + 3. 调小果 LLM 逐批分析(每批3-5篇,给摘要+情感) 4. 写入 signal_news 5. 标记 signal.processed = true """ @@ -14,7 +14,7 @@ import json import os import urllib.request -from datetime import datetime +import re from pathlib import Path try: @@ -26,8 +26,13 @@ except ImportError: DATA_DIR = Path(__file__).parent / "data" DB_PATH = DATA_DIR / "mofin.db" XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions" -XIAOGUO_MODEL = "Qwen3.6-27B-AEON-Uncensored-4bit" -XIAOGUO_TIMEOUT = 120 +XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed" +MAX_ARTICLES = 3 # 每次最多分析篇数 + + +def clean_proxy(): + for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: + os.environ.pop(k, None) def get_conn(): @@ -43,8 +48,7 @@ def search_akshare_news(code, max_results=3): if not HAS_AKSHARE: return titles try: - for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: - os.environ.pop(k, None) + clean_proxy() df = ak.stock_news_em(symbol=code) for _, r in df.head(max_results).iterrows(): title = r.get('新闻标题', '') @@ -55,30 +59,76 @@ def search_akshare_news(code, max_results=3): return titles -def classify_sentiment(title): - """基于关键词的快速情感分类(不调LLM,速度快)""" - title_lower = title.lower() - positive_kw = ['突破', '增长', '利好', '加单', '订单', '放量', '新高', '获批', '量产', - '超预期', '供应', '投产', '融资', '加仓', '增持', '回购', '降息', - '减税', '补贴', '国产替代', '自主可控', '准入'] - negative_kw = ['管制', '限制', '制裁', '利空', '减持', '抛售', '下跌', '跌停', - '风险', '违约', '调查', '暂停', '取消', '下滑', '亏损', '裁员', - '诉讼', '退市', '做空', '关税', '禁令'] +def extract_json(text): + """从回复中提取JSON数组或对象""" + # 先找 ```json ... ``` 代码块 + m = re.search(r'```(?:json)?\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```', text) + if m: + try: + return json.loads(m.group(1)) + except: + pass + # 找第一个 [ 或 { 到最后一个 ] 或 } + for start_ch, end_ch in [('[', ']'), ('{', '}')]: + s = text.find(start_ch) + if s >= 0: + depth = 0 + for i in range(s, len(text)): + if text[i] == start_ch: + depth += 1 + elif text[i] == end_ch: + depth -= 1 + if depth == 0: + try: + return json.loads(text[s:i+1]) + except: + break + return None - pos_score = sum(1 for kw in positive_kw if kw in title) - neg_score = sum(1 for kw in negative_kw if kw in title) - if pos_score > neg_score: - return "利好" - elif neg_score > pos_score: - return "利空" - return "中性" +def call_xiaoguo(articles): + """调小果LLM:给摘要+情感""" + lines = [] + for a in articles: + # 清理标题:去掉股票代码(6位数字+前后空格) + title = re.sub(r'\b\d{6}\b', '', a['title']).strip() + title = re.sub(r'\s+', ' ', title) + lines.append(f"{len(lines)+1}. {title}") + prompt = "\n".join(lines) + "\n\n逐篇给一句话摘要和情感(positive/negative/neutral)。JSON数组。" + + payload = json.dumps({ + "model": XIAOGUO_MODEL, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1, + "max_tokens": 2048, + }).encode() + + clean_proxy() + opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + req = urllib.request.Request( + XIAOGUO_API, data=payload, + headers={"Content-Type": "application/json"}, method="POST" + ) + try: + resp = opener.open(req, timeout=60) + data = json.loads(resp.read()) + content = data["choices"][0]["message"]["content"] + result = extract_json(content) + if isinstance(result, list): + return result + except Exception as e: + print(f" 小果调用失败: {e}", flush=True) + return None + + +def translate_sentiment(s): + """将英文情感转中文""" + m = {"positive": "利好", "negative": "利空", "neutral": "中性"} + return m.get(s.lower() if isinstance(s, str) else "", s) def main(): conn = get_conn() - - # 读未处理的 signals(每次1条) signals = conn.execute( "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1" ).fetchall() @@ -89,74 +139,76 @@ def main(): return signal = dict(signals[0]) - print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {signal['sector']}", flush=True) - - # 从信号中提取需要搜索的股票代码 sector = signal["sector"] related = json.loads(signal["related_stocks"] or "[]") holdings = json.loads(signal["holdings_in_sector"] or "[]") watchlist = json.loads(signal["watchlist_in_sector"] or "[]") - # 收集所有要搜的股票代码 - codes_to_search = {} - for item in related + holdings + watchlist: - code = item.get("code", "") - name = item.get("name", "") - if code: - codes_to_search[code] = name + print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True) + + codes = {} + for item in related + holdings + watchlist: + if item.get("code"): + codes[item["code"]] = item.get("name", "") - # 补充板块成分股 members = conn.execute( - "SELECT s.code, s.name FROM stocks s " - "JOIN stock_sectors ss ON s.code = ss.code " - "WHERE ss.sector_name = ? LIMIT 5", (sector,) + "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5", + (sector,) ).fetchall() for m in members: - if m["code"] not in codes_to_search: - codes_to_search[m["code"]] = m["name"] + if m["code"] not in codes: + codes[m["code"]] = m["name"] - # 搜新闻 all_articles = [] - for code, name in codes_to_search.items(): - articles = search_akshare_news(code, 3) - for a in articles: + for code, name in codes.items(): + arts = search_akshare_news(code, 3) + for a in arts: if a["title"] not in [x["title"] for x in all_articles]: - # 规则分类 - a["sentiment"] = classify_sentiment(a["title"]) all_articles.append(a) - print(f" 搜 {name}({code}): {len(articles)} 篇", flush=True) + print(f" 搜 {name}({code}): {len(arts)} 篇", flush=True) if not all_articles: - print(f" 未搜到相关新闻", flush=True) - conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],)) + print(" 未搜到新闻", flush=True) + conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],)) conn.commit() conn.close() return - print(f" 共搜到 {len(all_articles)} 篇新闻(规则分类)", flush=True) + batch = all_articles[:MAX_ARTICLES] + print(f" 共{len(all_articles)}篇,送小果分析{len(batch)}篇", flush=True) - # 统计总体情感 - sentiments = [a["sentiment"] for a in all_articles] + results = call_xiaoguo(batch) + if not results: + print(" 小果分析失败", flush=True) + conn.close() + return + + # 合并结果(用索引位置匹配,不依赖标题文本) + for i, r in enumerate(results): + if i < len(batch): + batch[i]["sentiment"] = translate_sentiment(r.get("sentiment", "")) + batch[i]["summary"] = r.get("summary", "") + else: + # 超过批次的额外结果 + break + + # 汇总情感 + sentiments = [a.get("sentiment", "中性") for a in batch if a.get("sentiment")] pos = sentiments.count("利好") neg = sentiments.count("利空") overall = "利好" if pos > neg * 1.5 else "利空" if neg > pos * 1.5 else "中性" - summary = f"{sector}板块搜到{len(all_articles)}篇相关新闻,利好{pos}篇,利空{neg}篇,整体{overall}。" + summaries = [a.get("summary", "") for a in batch if a.get("summary")] + combined = f"{sector}板块信号:{'|'.join(summaries[:3])}。总体{overall}。" - # 写入 signal_news - searched_names = list(set(codes_to_search.values())) - conn.execute(""" - INSERT INTO signal_news - (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) - VALUES (?, ?, ?, ?, ?, ?) - """, ( - signal["id"], sector, overall, summary, - json.dumps(all_articles, ensure_ascii=False), - json.dumps(searched_names, ensure_ascii=False), - )) - conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],)) + searched_names = list(set(codes.values())) + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)", + (signal["id"], sector, overall, combined, json.dumps(batch, ensure_ascii=False), json.dumps(searched_names, ensure_ascii=False)) + ) + conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],)) conn.commit() - print(f" 完成: {overall} — {summary}", flush=True) + print(f" 完成: {overall} — {combined[:100]}", flush=True) conn.close()