xiaoguo_news_processor: 小果LLM分析3篇/次,含摘要+情感,60秒跑通

This commit is contained in:
知微
2026-06-20 23:36:53 +08:00
parent 683bf39a46
commit 7814d1d492
+118 -66
View File
@@ -6,7 +6,7 @@
流程:
1. 读未 processed 的 signals(每次1条)
2. akshare 搜新闻(板块相关个股 + 持仓 + 自选)
3. 调小果 LLM 逐分析情感
3. 调小果 LLM 逐分析(每批3-5篇,给摘要+情感
4. 写入 signal_news
5. 标记 signal.processed = true
"""
@@ -14,7 +14,7 @@
import json
import os
import urllib.request
from datetime import datetime
import re
from pathlib import Path
try:
@@ -26,8 +26,13 @@ except ImportError:
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-AEON-Uncensored-4bit"
XIAOGUO_TIMEOUT = 120
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
MAX_ARTICLES = 3 # 每次最多分析篇数
def clean_proxy():
for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
os.environ.pop(k, None)
def get_conn():
@@ -43,8 +48,7 @@ def search_akshare_news(code, max_results=3):
if not HAS_AKSHARE:
return titles
try:
for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
os.environ.pop(k, None)
clean_proxy()
df = ak.stock_news_em(symbol=code)
for _, r in df.head(max_results).iterrows():
title = r.get('新闻标题', '')
@@ -55,30 +59,76 @@ def search_akshare_news(code, max_results=3):
return titles
def classify_sentiment(title):
"""基于关键词的快速情感分类(不调LLM,速度快)"""
title_lower = title.lower()
positive_kw = ['突破', '增长', '利好', '加单', '订单', '放量', '新高', '获批', '量产',
'超预期', '供应', '投产', '融资', '加仓', '增持', '回购', '降息',
'减税', '补贴', '国产替代', '自主可控', '准入']
negative_kw = ['管制', '限制', '制裁', '利空', '减持', '抛售', '下跌', '跌停',
'风险', '违约', '调查', '暂停', '取消', '下滑', '亏损', '裁员',
'诉讼', '退市', '做空', '关税', '禁令']
def extract_json(text):
"""从回复中提取JSON数组或对象"""
# 先找 ```json ... ``` 代码块
m = re.search(r'```(?:json)?\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```', text)
if m:
try:
return json.loads(m.group(1))
except:
pass
# 找第一个 [ 或 { 到最后一个 ] 或 }
for start_ch, end_ch in [('[', ']'), ('{', '}')]:
s = text.find(start_ch)
if s >= 0:
depth = 0
for i in range(s, len(text)):
if text[i] == start_ch:
depth += 1
elif text[i] == end_ch:
depth -= 1
if depth == 0:
try:
return json.loads(text[s:i+1])
except:
break
return None
pos_score = sum(1 for kw in positive_kw if kw in title)
neg_score = sum(1 for kw in negative_kw if kw in title)
if pos_score > neg_score:
return "利好"
elif neg_score > pos_score:
return "利空"
return "中性"
def call_xiaoguo(articles):
"""调小果LLM:给摘要+情感"""
lines = []
for a in articles:
# 清理标题:去掉股票代码(6位数字+前后空格)
title = re.sub(r'\b\d{6}\b', '', a['title']).strip()
title = re.sub(r'\s+', ' ', title)
lines.append(f"{len(lines)+1}. {title}")
prompt = "\n".join(lines) + "\n\n逐篇给一句话摘要和情感(positive/negative/neutral)。JSON数组。"
payload = json.dumps({
"model": XIAOGUO_MODEL,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 2048,
}).encode()
clean_proxy()
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
req = urllib.request.Request(
XIAOGUO_API, data=payload,
headers={"Content-Type": "application/json"}, method="POST"
)
try:
resp = opener.open(req, timeout=60)
data = json.loads(resp.read())
content = data["choices"][0]["message"]["content"]
result = extract_json(content)
if isinstance(result, list):
return result
except Exception as e:
print(f" 小果调用失败: {e}", flush=True)
return None
def translate_sentiment(s):
"""将英文情感转中文"""
m = {"positive": "利好", "negative": "利空", "neutral": "中性"}
return m.get(s.lower() if isinstance(s, str) else "", s)
def main():
conn = get_conn()
# 读未处理的 signals(每次1条)
signals = conn.execute(
"SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
).fetchall()
@@ -89,74 +139,76 @@ def main():
return
signal = dict(signals[0])
print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {signal['sector']}", flush=True)
# 从信号中提取需要搜索的股票代码
sector = signal["sector"]
related = json.loads(signal["related_stocks"] or "[]")
holdings = json.loads(signal["holdings_in_sector"] or "[]")
watchlist = json.loads(signal["watchlist_in_sector"] or "[]")
# 收集所有要搜的股票代码
codes_to_search = {}
for item in related + holdings + watchlist:
code = item.get("code", "")
name = item.get("name", "")
if code:
codes_to_search[code] = name
print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True)
codes = {}
for item in related + holdings + watchlist:
if item.get("code"):
codes[item["code"]] = item.get("name", "")
# 补充板块成分股
members = conn.execute(
"SELECT s.code, s.name FROM stocks s "
"JOIN stock_sectors ss ON s.code = ss.code "
"WHERE ss.sector_name = ? LIMIT 5", (sector,)
"SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5",
(sector,)
).fetchall()
for m in members:
if m["code"] not in codes_to_search:
codes_to_search[m["code"]] = m["name"]
if m["code"] not in codes:
codes[m["code"]] = m["name"]
# 搜新闻
all_articles = []
for code, name in codes_to_search.items():
articles = search_akshare_news(code, 3)
for a in articles:
for code, name in codes.items():
arts = search_akshare_news(code, 3)
for a in arts:
if a["title"] not in [x["title"] for x in all_articles]:
# 规则分类
a["sentiment"] = classify_sentiment(a["title"])
all_articles.append(a)
print(f"{name}({code}): {len(articles)}", flush=True)
print(f"{name}({code}): {len(arts)}", flush=True)
if not all_articles:
print(f" 未搜到相关新闻", flush=True)
conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],))
print(" 未搜到新闻", flush=True)
conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],))
conn.commit()
conn.close()
return
print(f" 共搜到 {len(all_articles)} 篇新闻(规则分类)", flush=True)
batch = all_articles[:MAX_ARTICLES]
print(f"{len(all_articles)}篇,送小果分析{len(batch)}", flush=True)
# 统计总体情感
sentiments = [a["sentiment"] for a in all_articles]
results = call_xiaoguo(batch)
if not results:
print(" 小果分析失败", flush=True)
conn.close()
return
# 合并结果(用索引位置匹配,不依赖标题文本)
for i, r in enumerate(results):
if i < len(batch):
batch[i]["sentiment"] = translate_sentiment(r.get("sentiment", ""))
batch[i]["summary"] = r.get("summary", "")
else:
# 超过批次的额外结果
break
# 汇总情感
sentiments = [a.get("sentiment", "中性") for a in batch if a.get("sentiment")]
pos = sentiments.count("利好")
neg = sentiments.count("利空")
overall = "利好" if pos > neg * 1.5 else "利空" if neg > pos * 1.5 else "中性"
summary = f"{sector}板块搜到{len(all_articles)}篇相关新闻,利好{pos}篇,利空{neg}篇,整体{overall}"
summaries = [a.get("summary", "") for a in batch if a.get("summary")]
combined = f"{sector}板块信号:{''.join(summaries[:3])}。总体{overall}"
# 写入 signal_news
searched_names = list(set(codes_to_search.values()))
conn.execute("""
INSERT INTO signal_news
(signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks)
VALUES (?, ?, ?, ?, ?, ?)
""", (
signal["id"], sector, overall, summary,
json.dumps(all_articles, ensure_ascii=False),
json.dumps(searched_names, ensure_ascii=False),
))
conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],))
searched_names = list(set(codes.values()))
conn.execute(
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)",
(signal["id"], sector, overall, combined, json.dumps(batch, ensure_ascii=False), json.dumps(searched_names, ensure_ascii=False))
)
conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],))
conn.commit()
print(f" 完成: {overall}{summary}", flush=True)
print(f" 完成: {overall}{combined[:100]}", flush=True)
conn.close()