#!/usr/bin/env python3
"""mofin_news.py — news collection (no_agent; no LLM required).

Reads the next unprocessed row of ``sector_signals``, searches related stock
news with akshare, de-duplicates it and writes the result into
``signal_news`` for 知微 (Zhiwei) to analyse.
"""
import json
import os
import re
import sqlite3
from contextlib import closing
from pathlib import Path

try:
    import akshare as ak
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
MAX_ARTICLES = 5

# Environment variables cleared before every akshare request.
_PROXY_ENV_VARS = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")

# Articles whose content contains any of these keywords are discarded;
# presumably they are fund-flow / data-table boilerplate rather than news.
_DIRTY_KEYWORDS = ("主力资金", "资金净流入", "代码", "简称")


def clean_proxy():
    """Remove HTTP(S) proxy settings from the process environment."""
    for key in _PROXY_ENV_VARS:
        os.environ.pop(key, None)


def get_conn():
    """Open the mofin SQLite database with ``sqlite3.Row`` (name-indexable) rows."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def _as_text(value):
    """Return *value* if it is a ``str``, otherwise ``""``.

    Missing cells in a pandas DataFrame are typically NaN floats, which are
    truthy and not iterable; normalising here keeps later string checks safe.
    """
    return value if isinstance(value, str) else ""


def search_akshare_news(code, max_results=3):
    """Search recent news (with full text) for one stock via akshare.

    Args:
        code: Stock code passed to ``ak.stock_news_em(symbol=...)``.
        max_results: Only the first ``max_results`` rows of the result are used.

    Returns:
        A list of ``{"title", "content", "url"}`` dicts (all strings). Rows
        whose title is not a string longer than 5 characters are skipped.
        Returns ``[]`` when akshare is not installed.

    Fetch errors are handled best-effort: they are reported and whatever was
    collected so far is returned, so one failing stock does not abort the run.
    """
    articles = []
    if not HAS_AKSHARE:
        return articles
    try:
        clean_proxy()
        df = ak.stock_news_em(symbol=code)
        for _, row in df.head(max_results).iterrows():
            title = _as_text(row.get('新闻标题', ''))
            if len(title) > 5:
                articles.append({
                    "title": title,
                    "content": _as_text(row.get('新闻内容', '')),
                    "url": _as_text(row.get('新闻链接', '')),
                })
    except Exception as exc:  # deliberate best-effort boundary around a network call
        print(f" 搜索 {code} 新闻失败: {exc!r}", flush=True)
    return articles


def _fetch_next_signal(conn):
    """Return the highest-severity (then lowest id) unprocessed signal as a dict, or None."""
    row = conn.execute(
        "SELECT * FROM sector_signals WHERE processed = 0 "
        "ORDER BY severity DESC, id ASC LIMIT 1"
    ).fetchone()
    return dict(row) if row is not None else None


def _collect_codes(conn, signal):
    """Build the ``{code: name}`` map of stocks to search for *signal*.

    Stocks from the signal's JSON columns (related, holdings, watchlist) are
    taken first, later entries overriding the name of an earlier same code.
    Up to 5 sector members from ``stock_sectors`` are then added only if
    their code is not already present. Entries that are not dicts with a
    truthy ``"code"`` are ignored.
    """
    codes = {}
    for column in ("related_stocks", "holdings_in_sector", "watchlist_in_sector"):
        for item in json.loads(signal[column] or "[]"):
            if isinstance(item, dict) and item.get("code"):
                codes[item["code"]] = item.get("name", "")
    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5",
        (signal["sector"],),
    ).fetchall()
    for member in members:
        codes.setdefault(member["code"], member["name"])
    return codes


def _dedupe_by_title(articles):
    """Return *articles* without repeated titles, keeping the first occurrence."""
    seen_titles = set()
    unique = []
    for article in articles:
        if article["title"] not in seen_titles:
            seen_titles.add(article["title"])
            unique.append(article)
    return unique


def _collect_articles(codes):
    """Search news for every stock in *codes* and return the title-deduplicated union."""
    found_all = []
    for code, name in codes.items():
        found = search_akshare_news(code, 3)
        found_all.extend(found)
        print(f" 搜 {name}({code}): {len(found)} 篇", flush=True)
    return _dedupe_by_title(found_all)


def _filter_articles(articles, limit=MAX_ARTICLES):
    """Drop articles whose content contains a dirty keyword; keep at most *limit*."""
    kept = []
    for article in articles:
        if len(kept) >= limit:
            break
        content = _as_text(article.get('content'))
        if any(kw in content for kw in _DIRTY_KEYWORDS):
            continue
        kept.append(article)
    return kept


def _mark_processed(conn, signal_id):
    """Flag a signal as handled so it is not picked again (caller commits)."""
    conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal_id,))


def main():
    """Process the single highest-priority unprocessed sector signal.

    Collects news for the signal's stocks, stores up to ``MAX_ARTICLES``
    filtered articles in ``signal_news`` with sentiment "待知微判断" for the
    downstream analyser, and marks the signal processed. A signal with no
    news found is marked processed without a ``signal_news`` row.
    """
    # sqlite3's own context manager only commits/rolls back; closing() also
    # guarantees the connection is closed (discarding uncommitted work) on error.
    with closing(get_conn()) as conn:
        signal = _fetch_next_signal(conn)
        if signal is None:
            print("无未处理的信号", flush=True)
            return

        sector = signal["sector"]
        print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True)

        codes = _collect_codes(conn, signal)
        all_articles = _collect_articles(codes)
        if not all_articles:
            print(" 未搜到新闻", flush=True)
            _mark_processed(conn, signal["id"])
            conn.commit()
            return

        batch = _filter_articles(all_articles)
        print(f" 共{len(all_articles)}篇,采集{len(batch)}篇,交由知微分析", flush=True)

        # dict.fromkeys de-duplicates while keeping a deterministic first-seen order.
        searched_names = list(dict.fromkeys(codes.values()))
        conn.execute(
            "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)",
            (signal["id"], sector, "待知微判断", "",
             json.dumps(batch, ensure_ascii=False),
             json.dumps(searched_names, ensure_ascii=False)),
        )
        _mark_processed(conn, signal["id"])
        conn.commit()
        print(f" 完成: {len(batch)} 篇新闻已入库,等知微分析", flush=True)


if __name__ == "__main__":
    main()