Files
MoFin/xiaoguo_news_processor.py
T

165 lines
5.4 KiB
Python

#!/usr/bin/env python3
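"""xiaoguo_news_processor.py — Xiaoguo (小果) news-intelligence processing.
Runs alongside trend_detector (every 30 min) and handles unprocessed sector_signals.
Flow:
1. Read one signal with processed = 0 per run (highest severity first, then lowest id).
2. Search news via akshare for the signal's related stocks, holdings and
   watchlist stocks, plus up to 5 constituent stocks of the sector (stock_sectors).
3. Classify each headline's sentiment with keyword rules (classify_sentiment);
   the Xiaoguo LLM settings (XIAOGUO_*) are defined but not called in this file.
4. Write the aggregate result to signal_news (only when at least one article was found).
5. Mark the signal processed (processed = 1).
"""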
import json
import os
import urllib.request
from datetime import datetime
from pathlib import Path
try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    # akshare is optional: without it, search_akshare_news returns no results.
    HAS_AKSHARE = False

# SQLite database next to this script.
# NOTE(review): presumably the same DB that trend_detector writes
# sector_signals into — confirm.
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"

# Xiaoguo LLM endpoint settings. The host-specific values can be overridden via
# environment variables; when unset (or empty) the original defaults apply.
# NOTE(review): these are not referenced anywhere in the code visible here —
# sentiment is currently computed by keyword rules (classify_sentiment).
XIAOGUO_API = (
    os.environ.get("XIAOGUO_API")
    or "http://192.168.1.122:18003/v1/chat/completions"
)
XIAOGUO_MODEL = os.environ.get("XIAOGUO_MODEL") or "Qwen3.6-27B-AEON-Uncensored-4bit"
XIAOGUO_TIMEOUT = 120  # presumably seconds for the HTTP request — confirm when it is used
def get_conn():
    """Open and return a new SQLite connection to DB_PATH.

    Rows are produced as ``sqlite3.Row`` objects, so columns can be read by
    name (``row["sector"]``) or converted with ``dict(row)``. The caller owns
    the connection and must close it.
    """
    from sqlite3 import Row, connect

    db = connect(str(DB_PATH))
    db.row_factory = Row
    return db
def search_akshare_news(code, max_results=3):
    """Fetch recent news headlines for one stock via akshare.

    Args:
        code: Stock code, passed as ``symbol`` to ``ak.stock_news_em``.
        max_results: Number of leading rows of the akshare result to inspect.
            Rows whose title is missing, not a string, or 5 characters or
            shorter are skipped, so fewer items may be returned.

    Returns:
        A list of ``{"title": str, "url": str}`` dicts. The list is empty when
        akshare is not installed, the lookup fails, or nothing is found. This
        is best-effort: lookup errors are logged, not raised.
    """
    titles = []
    if not HAS_AKSHARE:
        return titles
    # Remove proxy settings before calling akshare. This mutates os.environ
    # for the rest of the process, not just this call.
    # NOTE(review): presumably the proxy breaks the news endpoint — confirm.
    for k in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
        os.environ.pop(k, None)
    try:
        df = ak.stock_news_em(symbol=code)
    except Exception as e:  # best-effort: one failed stock must not abort the run
        print(f"  akshare 新闻搜索失败 {code}: {e}", flush=True)
        return titles
    if df is None or df.empty:
        return titles
    for _, r in df.head(max_results).iterrows():
        title = r.get('新闻标题', '')
        # Missing cells come back as NaN (a float); len() on it would raise.
        if not isinstance(title, str) or len(title) <= 5:
            continue
        url = r.get('新闻链接', '')
        titles.append({"title": title, "url": url if isinstance(url, str) else ""})
    return titles
# Keyword lists for the rule-based headline classifier. Matching is plain
# substring containment; each keyword counts at most once per title.
# NOTE(review): pure substring rules cannot see negation, e.g. "取消限制"
# (lifting a restriction) matches two negative keywords.
_POSITIVE_KEYWORDS = (
    '突破', '增长', '利好', '加单', '订单', '放量', '新高', '获批', '量产',
    '超预期', '供应', '投产', '融资', '加仓', '增持', '回购', '降息',
    '减税', '补贴', '国产替代', '自主可控', '准入',
)
_NEGATIVE_KEYWORDS = (
    '管制', '限制', '制裁', '利空', '减持', '抛售', '下跌', '跌停',
    '风险', '违约', '调查', '暂停', '取消', '下滑', '亏损', '裁员',
    '诉讼', '退市', '做空', '关税', '禁令',
)


def classify_sentiment(title):
    """Classify a news headline as "利好" (bullish), "利空" (bearish) or "中性" (neutral).

    This is a fast keyword heuristic that makes no LLM call. It counts how many
    positive and how many negative keywords occur in ``title`` and returns the
    side with the higher count. Ties, including no matches, are "中性".

    Args:
        title: Headline text. ``None`` or "" is treated as neutral.

    Returns:
        One of "利好", "利空", "中性".
    """
    if not title:
        return "中性"
    pos_score = sum(kw in title for kw in _POSITIVE_KEYWORDS)
    neg_score = sum(kw in title for kw in _NEGATIVE_KEYWORDS)
    if pos_score > neg_score:
        return "利好"
    if neg_score > pos_score:
        return "利空"
    return "中性"
def _load_stock_list(raw, column):
    """Decode one JSON stock-list column of a sector_signals row.

    Returns [] for NULL or empty values. It also returns [] (after logging) for
    malformed JSON or a non-list payload. A bad row must not raise here: the
    signal would stay unprocessed and be re-selected on every run.
    """
    if not raw:
        return []
    try:
        value = json.loads(raw)
    except (TypeError, ValueError) as e:  # json.JSONDecodeError subclasses ValueError
        print(f"  {column} 解析失败,按空列表处理: {e}", flush=True)
        return []
    return value if isinstance(value, list) else []


def _collect_codes(conn, signal):
    """Return an ordered {code: name} map of stocks whose news should be searched.

    Stocks come from the signal's related stocks, holdings and watchlist
    columns, in that order. A later duplicate code overwrites the name but
    keeps its position. These are followed by up to 5 constituents of the
    signal's sector from stock_sectors that are not already present.
    """
    codes = {}
    for column in ("related_stocks", "holdings_in_sector", "watchlist_in_sector"):
        for item in _load_stock_list(signal[column], column):
            # NOTE(review): entries are expected to be {"code": ..., "name": ...}
            # dicts; anything else is skipped instead of crashing the run.
            if not isinstance(item, dict):
                continue
            code = item.get("code", "")
            if code:
                codes[code] = item.get("name", "")
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s "
        "JOIN stock_sectors ss ON s.code = ss.code "
        "WHERE ss.sector_name = ? LIMIT 5", (signal["sector"],)
    ).fetchall()
    for m in members:
        codes.setdefault(m["code"], m["name"])
    return codes


def _search_news(codes):
    """Search news for each code and dedupe by title across stocks.

    Each kept article dict gets a "sentiment" key from classify_sentiment.
    """
    articles = []
    seen_titles = set()
    for code, name in codes.items():
        found = search_akshare_news(code, 3)
        for article in found:
            if article["title"] in seen_titles:
                continue
            seen_titles.add(article["title"])
            article["sentiment"] = classify_sentiment(article["title"])
            articles.append(article)
        # Logs the raw hit count for this stock (before cross-stock dedupe).
        print(f"{name}({code}): {len(found)}", flush=True)
    return articles


def _summarize(sector, articles):
    """Return (overall, summary) for sentiment-tagged articles.

    overall is "利好" when pos > 1.5 * neg, "利空" when neg > 1.5 * pos,
    otherwise "中性".
    """
    sentiments = [a["sentiment"] for a in articles]
    pos = sentiments.count("利好")
    neg = sentiments.count("利空")
    overall = "利好" if pos > neg * 1.5 else "利空" if neg > pos * 1.5 else "中性"
    summary = f"{sector}板块搜到{len(articles)}篇相关新闻,利好{pos}篇,利空{neg}篇,整体{overall}"
    return overall, summary


def main():
    """Process the single highest-priority unprocessed sector signal.

    Steps:
    1. Pick one sector_signals row with processed = 0 (severity DESC, id ASC).
    2. Search news for its stocks and classify each headline with keyword rules.
    3. Write one signal_news row, only when at least one article was found.
    4. Mark the signal processed = 1. The insert and this update are
       committed together.

    The connection is always closed. On an exception, nothing is committed.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is None:
            print("无未处理的信号", flush=True)
            return
        signal = dict(row)
        print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {signal['sector']}", flush=True)
        sector = signal["sector"]

        codes_to_search = _collect_codes(conn, signal)
        all_articles = _search_news(codes_to_search)

        if not all_articles:
            print(" 未搜到相关新闻", flush=True)
            conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],))
            conn.commit()
            return
        print(f" 共搜到 {len(all_articles)} 篇新闻(规则分类)", flush=True)

        overall, summary = _summarize(sector, all_articles)
        # Order-preserving dedupe; list(set(...)) stored a run-dependent order.
        searched_names = list(dict.fromkeys(codes_to_search.values()))
        conn.execute(
            """
            INSERT INTO signal_news
            (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                signal["id"], sector, overall, summary,
                json.dumps(all_articles, ensure_ascii=False),
                json.dumps(searched_names, ensure_ascii=False),
            ),
        )
        conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],))
        conn.commit()
        print(f" 完成: {overall}{summary}", flush=True)
    finally:
        conn.close()
if "__main__" == __name__:
    # Script entry point: each invocation handles at most one pending signal.
    main()