Files
MoFin/xiaoguo_news_processor.py
T

260 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""xiaoguo_news_processor.py — 小果新闻情报处理
配合 trend_detector(每30分)运行,处理未处理的 sector_signals。
流程:
1. 读未 processed 的 signals(每次1条)
2. akshare 搜新闻(板块相关个股 + 持仓 + 自选)
3. 调小果 LLM 逐批分析(每批3-5篇,给摘要+情感)
4. 写入 signal_news
5. 标记 signal.processed = true
"""
import json
import os
import urllib.request
import re
from pathlib import Path
# akshare is optional: when it is missing, search_akshare_news() returns no articles.
try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False
# SQLite database stored in ./data next to this script.
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
# "Xiaoguo" LLM endpoint and model name; call_xiaoguo() posts a
# /v1/chat/completions-style request and reads choices[0].message.content.
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
MAX_ARTICLES = 5  # Max articles analysed per run (measured: 5 articles take ~12 s).
def clean_proxy():
    """Drop HTTP(S) proxy settings from the process environment.

    Both the lower-case and upper-case spellings of ``http_proxy`` and
    ``https_proxy`` are removed; variables that are not set are ignored.
    Called before the akshare lookup and the LLM request so that neither
    goes through a proxy.
    """
    proxy_vars = ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY')
    for var_name in proxy_vars:
        if var_name in os.environ:
            del os.environ[var_name]
def get_conn():
    """Open a connection to the MoFin SQLite database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read by
    name (e.g. ``row["sector"]``). The caller is responsible for closing the
    connection.
    """
    import sqlite3

    db_location = str(DB_PATH)
    connection = sqlite3.connect(db_location)
    connection.row_factory = sqlite3.Row
    return connection
def search_akshare_news(code, max_results=3):
    """Fetch up to ``max_results`` news articles (with body text) for one stock.

    Calls ``ak.stock_news_em(symbol=code)`` and reads the title (新闻标题),
    body (新闻内容) and link (新闻链接) columns of the first ``max_results``
    rows. Rows whose title is 5 characters or shorter are dropped.

    Args:
        code: stock code passed to akshare as ``symbol``.
        max_results: number of rows to inspect from the top of the result.

    Returns:
        A list of ``{"title", "content", "url"}`` dicts whose values are always
        ``str``. Missing or non-text cells become ``""``. The list is empty when
        akshare is not installed or the lookup fails. This is a best-effort
        helper: failures are logged, not raised, and any articles collected
        before the failure are kept.
    """
    articles = []
    if not HAS_AKSHARE:
        return articles

    def _cell(row, column):
        # Missing cells may come back as NaN (a float). Treat anything that is
        # not a string as empty so callers can safely slice/concatenate/search.
        value = row.get(column, '')
        return value if isinstance(value, str) else ''

    try:
        clean_proxy()
        df = ak.stock_news_em(symbol=code)
        for _, r in df.head(max_results).iterrows():
            title = _cell(r, '新闻标题').strip()
            if len(title) > 5:
                articles.append({
                    "title": title,
                    "content": _cell(r, '新闻内容'),
                    "url": _cell(r, '新闻链接'),
                })
    except Exception as e:  # network/akshare boundary: log and keep what we have
        print(f"  akshare新闻获取失败 {code}: {e}", flush=True)
    return articles
def extract_json(text):
    """Extract the first JSON array or object from an LLM reply.

    Any ``<think>…</think>`` sections are removed first, so brackets in the
    model's reasoning are not mistaken for the answer. The search order is:

    1. a fenced ```json … ``` (or bare ```) code block;
    2. the first ``[`` from which a complete JSON value can be decoded;
    3. likewise, the first decodable ``{``.

    Decoding uses ``json.JSONDecoder.raw_decode``, so brackets inside JSON
    strings (e.g. ``"a]b"``) and trailing prose after the value are handled.

    Args:
        text: the raw reply text. May be ``None`` or empty.

    Returns:
        The decoded JSON value, or ``None`` if ``text`` is empty or contains
        nothing decodable.
    """
    if not text:
        return None
    text = re.sub(r'<think>[\s\S]*?</think>', '', text)
    # 1. Prefer an explicit ```json ... ``` code block.
    m = re.search(r'```(?:json)?\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```', text)
    if m:
        try:
            return json.loads(m.group(1))
        except ValueError:
            pass
    # 2./3. Try each candidate opening bracket in turn until one decodes.
    decoder = json.JSONDecoder()
    for start_ch in '[{':
        pos = text.find(start_ch)
        while pos >= 0:
            try:
                value, _ = decoder.raw_decode(text[pos:])
                return value
            except ValueError:
                pos = text.find(start_ch, pos + 1)
    return None
def call_xiaoguo(articles):
    """Ask the Xiaoguo LLM for a summary and sentiment of each article.

    Builds a single numbered prompt from the articles and POSTs it to
    ``XIAOGUO_API``. Each article contributes its title and, if present, its
    body. The request bypasses any configured proxy.

    Args:
        articles: list of dicts with a ``"title"`` key and optional ``"content"``.

    Returns:
        The JSON array parsed from the model's reply (the prompt asks for one
        entry per article, in prompt order). Returns ``[]`` when ``articles``
        is empty, and ``None`` when the request fails or the reply contains no
        JSON array.
    """
    if not articles:
        return []  # nothing to analyse; skip the LLM round-trip
    lines = []
    for n, a in enumerate(articles, 1):
        # Remove standalone 6-digit numbers (presumably stock codes) from the
        # title. NOTE(review): \b does not match between a CJK character and a
        # digit, so codes glued directly to Chinese text are kept.
        title = re.sub(r'\b\d{6}\b', '', a['title']).strip()
        title = re.sub(r'\s+', ' ', title)
        content = a.get('content') or ''
        if not isinstance(content, str):
            content = ''  # e.g. NaN from a missing DataFrame cell
        # akshare bodies often have no punctuation, which makes the model's
        # reasoning stall, so insert a comma every 20 characters.
        # (Joining the chunks with '' would just rebuild the original text.)
        if content and not any(c in content for c in '。，,！!？?；;'):
            content = '，'.join(content[i:i + 20] for i in range(0, len(content), 20))
        if content:
            lines.append(f"{n}. {title}\n {content}")
        else:
            lines.append(f"{n}. {title}")
    prompt = "\n".join(lines) + "\n\n逐篇分析:给摘要(概括核心内容)和情感(positive/negative/neutral)。JSON数组。"
    payload = json.dumps({
        "model": XIAOGUO_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 2048,
    }).encode()
    clean_proxy()
    # An empty ProxyHandler mapping disables proxies for this opener.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(
        XIAOGUO_API, data=payload,
        headers={"Content-Type": "application/json"}, method="POST"
    )
    try:
        with opener.open(req, timeout=60) as resp:  # closes the socket
            data = json.loads(resp.read())
        reply = data["choices"][0]["message"]["content"] or ''
        result = extract_json(reply)
    except Exception as e:  # network / protocol boundary: degrade to None
        print(f" 小果调用失败: {e}", flush=True)
        return None
    return result if isinstance(result, list) else None
def translate_sentiment(s):
    """Translate an English sentiment label into Chinese.

    Matching is case-insensitive: "positive" -> "利好", "negative" -> "利空",
    "neutral" -> "中性". Any other input, including unknown strings and
    non-string values such as ``None``, is returned unchanged.
    """
    if not isinstance(s, str):
        return s
    lowered = s.lower()
    if lowered == "positive":
        return "利好"
    if lowered == "negative":
        return "利空"
    if lowered == "neutral":
        return "中性"
    return s
def fallback_classify(batch):
    """Label articles by keyword counting (used when the Xiaoguo API is unavailable).

    Each article's text is its title plus its content. Every keyword that
    occurs in the text counts once. If positive hits outnumber negative hits
    the label is "利好"; if negative hits outnumber positive it is "利空";
    a tie gives "中性". The summary is the first 80 characters of the title.

    Args:
        batch: list of article dicts with a "title" key and optional "content".

    Returns:
        ``batch`` itself, with "sentiment" and "summary" set on every item.
    """
    bullish_words = (
        '突破', '增长', '利好', '加单', '订单', '放量', '新高', '获批', '量产',
        '超预期', '投产', '融资', '增持', '回购', '降息', '减税', '补贴',
        '国产替代', '自主可控', '准入',
    )
    bearish_words = (
        '管制', '限制', '制裁', '利空', '减持', '抛售', '下跌', '跌停',
        '风险', '违约', '调查', '暂停', '取消', '下滑', '亏损', '裁员',
        '诉讼', '退市', '做空', '关税', '禁令',
    )
    for article in batch:
        haystack = article['title'] + (article.get('content') or '')
        balance = (sum(word in haystack for word in bullish_words)
                   - sum(word in haystack for word in bearish_words))
        if balance > 0:
            verdict = '利好'
        elif balance < 0:
            verdict = '利空'
        else:
            verdict = '中性'
        article['sentiment'] = verdict
        article['summary'] = article['title'][:80]
    return batch
def _collect_codes(conn, sector, listed):
    """Return {code: name} for the listed stocks plus up to 5 DB members of *sector*.

    Listed entries are applied in order, so a later duplicate code updates its
    name. Sector members only add codes that are not already present.
    """
    codes = {}
    for item in listed:
        # Entries are read as {"code": ..., "name": ...}; anything else is
        # skipped instead of crashing the run.
        if isinstance(item, dict) and item.get("code"):
            codes[item["code"]] = item.get("name", "")
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5",
        (sector,),
    ).fetchall()
    for m in members:
        codes.setdefault(m["code"], m["name"])
    return codes


def _fetch_articles(codes):
    """Search news for every code and return the articles, de-duplicated by title."""
    articles = []
    seen_titles = set()
    for code, name in codes.items():
        arts = search_akshare_news(code, 3)
        for a in arts:
            if a["title"] not in seen_titles:
                seen_titles.add(a["title"])
                articles.append(a)
        print(f"{name}({code}): {len(arts)}", flush=True)
    return articles


def _select_batch(articles):
    """Return the first MAX_ARTICLES articles whose body does not contain table data."""
    # Bodies containing any of these words are treated as dirty table data.
    table_markers = ('主力资金', '资金净流入', '代码', '简称')
    batch = []
    for a in articles:
        if len(batch) >= MAX_ARTICLES:
            break
        body = a.get('content')
        if not isinstance(body, str):
            body = ''
        if any(kw in body for kw in table_markers):
            continue
        batch.append(a)
    return batch


def _analyze_batch(batch):
    """Set "sentiment" and "summary" on every article: LLM first, keyword fallback."""
    results = call_xiaoguo(batch)
    if not results:
        print(" 小果API不可用,降级到关键词分类", flush=True)
        fallback_classify(batch)
        return
    # Results are matched to articles by position; surplus entries are ignored.
    for article, r in zip(batch, results):
        if not isinstance(r, dict):
            continue  # e.g. a bare string/number inside the array
        article["sentiment"] = translate_sentiment(r.get("sentiment", r.get("情感", "")))
        summary = r.get("summary", r.get("摘要", ""))
        article["summary"] = summary if isinstance(summary, str) else ""
    # Articles the LLM left without a sentiment (short array, non-object item,
    # empty label) get a keyword label, so every article is counted.
    for article in batch:
        if not article.get("sentiment"):
            llm_summary = article.get("summary")
            fallback_classify([article])
            if llm_summary:
                article["summary"] = llm_summary  # keep the LLM summary if given
        elif not article.get("summary"):
            article["summary"] = article["title"][:80]


def _summarize_batch(sector, batch):
    """Return (overall, combined): the overall label and a combined summary text.

    overall is "利好" when positives exceed 1.5x negatives, "利空" when
    negatives exceed 1.5x positives, and "中性" otherwise. combined joins at
    most three article summaries.
    """
    sentiments = [a.get("sentiment") for a in batch]
    pos = sentiments.count("利好")
    neg = sentiments.count("利空")
    if pos > neg * 1.5:
        overall = "利好"
    elif neg > pos * 1.5:
        overall = "利空"
    else:
        overall = "中性"
    summaries = []
    for a in batch:
        text = a.get("summary")
        if isinstance(text, str):
            # Drop trailing full stops so the joined text has no "。；"/"。。".
            text = text.strip().rstrip("。.")
            if text:
                summaries.append(text)
    # Separate the summaries; joining with "" would run them together.
    combined = f"{sector}板块信号:{'；'.join(summaries[:3])}。总体{overall}"
    return overall, combined


def _mark_processed(conn, signal_id):
    """Flag a signal as processed and commit (this also commits any pending insert)."""
    conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal_id,))
    conn.commit()


def _process_next_signal(conn):
    """Run the full pipeline for the next unprocessed signal, if there is one."""
    row = conn.execute(
        "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
    ).fetchone()
    if row is None:
        print("无未处理的信号", flush=True)
        return
    signal = dict(row)
    sector = signal["sector"]
    listed = []
    for column in ("related_stocks", "holdings_in_sector", "watchlist_in_sector"):
        parsed = json.loads(signal[column] or "[]")
        if isinstance(parsed, list):
            listed.extend(parsed)
    print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True)

    codes = _collect_codes(conn, sector, listed)
    all_articles = _fetch_articles(codes)
    if not all_articles:
        print(" 未搜到新闻", flush=True)
        _mark_processed(conn, signal["id"])
        return

    batch = _select_batch(all_articles)
    print(f"{len(all_articles)}篇,送小果分析{len(batch)}", flush=True)
    if not batch:
        # Every article was table data: nothing worth analysing or storing.
        print(" 无可分析的新闻", flush=True)
        _mark_processed(conn, signal["id"])
        return

    _analyze_batch(batch)
    overall, combined = _summarize_batch(sector, batch)
    # dict.fromkeys de-duplicates while keeping a stable order (set() does not).
    searched_names = list(dict.fromkeys(codes.values()))
    conn.execute(
        "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)",
        (signal["id"], sector, overall, combined,
         json.dumps(batch, ensure_ascii=False),
         json.dumps(searched_names, ensure_ascii=False)),
    )
    _mark_processed(conn, signal["id"])  # commits the insert and the flag together
    print(f" 完成: {overall}{combined[:100]}", flush=True)


def main():
    """Process one unprocessed sector signal (highest severity first, then lowest id).

    The run works as follows:
    1. Gather the signal's related, held and watch-listed stocks plus sector members.
    2. Search their news.
    3. Analyse up to MAX_ARTICLES articles with the Xiaoguo LLM, falling back
       to keyword labels if it is unavailable.
    4. Store the result in signal_news and mark the signal processed.

    The database connection is always closed, even if a step raises.
    """
    conn = get_conn()
    try:
        _process_next_signal(conn)
    finally:
        conn.close()
# Script entry point: each invocation processes at most one signal (the module
# docstring says this runs alongside trend_detector every 30 minutes).
if __name__ == "__main__":
    main()