130 lines
3.9 KiB
Python
130 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
||
"""mofin_news.py — 新闻采集(no_agent,无需LLM)
|
||
|
||
读未处理的 sector_signals,用 akshare 搜相关新闻,
|
||
去重后写入 signal_news 供知微分析。
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
from pathlib import Path
|
||
|
||
try:
|
||
import akshare as ak
|
||
HAS_AKSHARE = True
|
||
except ImportError:
|
||
HAS_AKSHARE = False
|
||
|
||
# Runtime data sits beside this script: <script dir>/data/mofin.db.
DATA_DIR = Path(__file__).parent.joinpath("data")
DB_PATH = DATA_DIR.joinpath("mofin.db")
# Upper bound on the number of articles main() stores per signal.
MAX_ARTICLES = 5
|
||
|
||
|
||
def clean_proxy():
    """Remove HTTP(S) proxy variables (both lower- and upper-case) from os.environ.

    Missing variables are ignored, so calling this repeatedly is harmless.
    """
    proxy_vars = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")
    for var_name in proxy_vars:
        if var_name in os.environ:
            del os.environ[var_name]
|
||
|
||
|
||
def get_conn():
    """Open a connection to the SQLite database at DB_PATH.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name
    (``row["code"]``). The caller is responsible for closing the connection.
    """
    import sqlite3

    connection = sqlite3.connect(database=str(DB_PATH))
    connection.row_factory = sqlite3.Row
    return connection
|
||
|
||
|
||
def _cell_text(value):
    """Return *value* if it is a str, otherwise '' (covers None and NaN cells)."""
    return value if isinstance(value, str) else ""


def search_akshare_news(code, max_results=3):
    """Fetch news items (including full text) for one stock via akshare.

    Calls ``ak.stock_news_em(symbol=code)`` and inspects the first
    *max_results* rows. Only rows whose title is longer than 5 characters
    are kept.

    Args:
        code: stock code passed to akshare as ``symbol``.
        max_results: number of leading result rows to inspect.

    Returns:
        A list of ``{"title", "content", "url"}`` dicts whose values are
        always str. Missing cells (None / NaN) become ''. The list is empty
        when akshare is not installed. The fetch is best-effort: on failure
        a message is printed and whatever was collected so far is returned.
    """
    articles = []
    if not HAS_AKSHARE:
        return articles
    try:
        # NOTE(review): presumably clears env proxies so akshare connects
        # directly — confirm intent.
        clean_proxy()
        df = ak.stock_news_em(symbol=code)
        for _, row in df.head(max_results).iterrows():
            # A pandas row yields NaN (a float) for a missing cell. Normalise to
            # str so len() here and substring checks downstream cannot raise.
            title = _cell_text(row.get('新闻标题'))
            if len(title) > 5:
                articles.append({
                    "title": title,
                    "content": _cell_text(row.get('新闻内容')),
                    "url": _cell_text(row.get('新闻链接')),
                })
    except Exception as exc:
        # Best-effort per stock: report and carry on, rather than silently
        # swallowing (or catching KeyboardInterrupt/SystemExit like a bare except).
        print(f" akshare 获取 {code} 新闻失败: {exc}", flush=True)
    return articles
|
||
|
||
|
||
def _collect_codes(conn, sector, stock_items):
    """Build a {code: name} map from the signal's stock items plus sector members.

    Items from the signal come first. Up to 5 members of *sector* from the
    ``stocks``/``stock_sectors`` tables are then added without overriding
    names already present.
    """
    codes = {}
    for item in stock_items:
        if item.get("code"):
            codes[item["code"]] = item.get("name", "")
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5",
        (sector,)
    ).fetchall()
    for member in members:
        codes.setdefault(member["code"], member["name"])
    return codes


def _gather_articles(codes):
    """Fetch news for each code, keeping only the first article seen per title."""
    articles = []
    seen_titles = set()  # O(1) membership instead of rescanning the list
    for code, name in codes.items():
        fetched = search_akshare_news(code, 3)
        for article in fetched:
            if article["title"] not in seen_titles:
                seen_titles.add(article["title"])
                articles.append(article)
        print(f" 搜 {name}({code}): {len(fetched)} 篇", flush=True)
    return articles


def _filter_articles(articles, limit):
    """Drop articles whose content looks like fund-flow boilerplate; keep at most *limit*."""
    noise_keywords = ('主力资金', '资金净流入', '代码', '简称')
    kept = []
    for article in articles:
        if len(kept) >= limit:
            break
        content = article.get('content')
        if not isinstance(content, str):
            content = ''  # e.g. None or a NaN cell; `in` on a float would raise
        if any(kw in content for kw in noise_keywords):
            continue
        kept.append(article)
    return kept


def main():
    """Collect news for the highest-priority unprocessed sector signal.

    Steps:
      1. Take one row from ``sector_signals`` with ``processed = 0``, ordered
         by ``severity`` descending and then ``id`` ascending.
      2. Gather stock codes from its JSON stock lists plus up to 5 sector
         members, and fetch news for each via search_akshare_news().
      3. Drop duplicate titles and boilerplate, then store up to MAX_ARTICLES
         articles in ``signal_news`` for the downstream "Zhiwei" analysis.

    The signal is marked processed even when no news is found. The database
    connection is always closed, including when an error is raised.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is None:
            print("无未处理的信号", flush=True)
            return

        signal = dict(row)
        sector = signal["sector"]
        # Stock lists are stored as JSON text; NULL/empty means "no entries".
        related = json.loads(signal["related_stocks"] or "[]")
        holdings = json.loads(signal["holdings_in_sector"] or "[]")
        watchlist = json.loads(signal["watchlist_in_sector"] or "[]")

        print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True)

        codes = _collect_codes(conn, sector, related + holdings + watchlist)
        all_articles = _gather_articles(codes)

        if not all_articles:
            print(" 未搜到新闻", flush=True)
            conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],))
            conn.commit()
            return

        batch = _filter_articles(all_articles, MAX_ARTICLES)
        print(f" 共{len(all_articles)}篇,采集{len(batch)}篇,交由知微分析", flush=True)

        # Deterministic, de-duplicated name list (set() order varies between
        # runs); blank names carry no information.
        searched_names = [n for n in dict.fromkeys(codes.values()) if n]
        conn.execute(
            "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)",
            (signal["id"], sector, "待知微判断", "", json.dumps(batch, ensure_ascii=False), json.dumps(searched_names, ensure_ascii=False))
        )
        conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],))
        # Single commit: the insert and the processed flag persist together.
        conn.commit()
        print(f" 完成: {len(batch)} 篇新闻已入库,等知微分析", flush=True)
    finally:
        conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: each run handles one pending sector signal (see main()).
    main()
|