diff --git a/docs/market-screening-pipeline.md b/docs/market-screening-pipeline.md index 4a27610..5cbb64b 100644 --- a/docs/market-screening-pipeline.md +++ b/docs/market-screening-pipeline.md @@ -46,7 +46,7 @@ 每轮流程(全部在一个cron内完成,~2-3分钟): ① market_watch → 拉90个行业板块数据 ② trend_detector → SQL检测17种信号 - ③ xiaoguo_news_processor → 搜新闻+LLM分析 + ③ mofin_news → 搜新闻(原文入库) ④ 我(知微)→ 判断信号 → 更新候选池 → 推报告/紧急消息 ``` @@ -136,23 +136,16 @@ --- -## 四、小果情报处理(:48,no_agent) +## 四、新闻采集(:48,no_agent) -### xiaoguo_news_processor.py +### mofin_news.py -读取未处理 signal(每次1条),做3件事: +读取未处理 signal(每次1条),用 akshare 搜新闻: +- 搜索范围:领涨股 + 成分股 + 持仓股 + 自选股 +- 去重后取前5篇,含标题 + 正文全文 +- 写入 signal_news,标记「待知微判断」 -**第1步:搜新闻** -用 akshare 搜 signal 涉及的每只股票的最新新闻。 -搜索范围:领涨股 + 成分股 + 持仓股 + 自选股 -去重后取前5篇,含标题 + 正文全文。 - -**第2步:小果 LLM 分析** -模型:Qwen3.6-27B-MTPLX-Optimized-Speed(192.168.1.122:18003) -每次送5篇,逐篇给完整摘要 + 情感(positive/negative/neutral)。 -单次调用约 10-15 秒,直接输出 JSON。 - -**第3步:写入 signal_news** +**不做情感分析,不调LLM。** 新闻分析由知微在下一轮cron中完成。 | 字段 | 说明 | |------|------| diff --git a/mofin_collect.py b/mofin_collect.py index 3fac27d..1159d2d 100644 --- a/mofin_collect.py +++ b/mofin_collect.py @@ -15,7 +15,7 @@ BASE = Path(__file__).parent.parent if "hermes" in str(Path(__file__).resolve()) SCRIPTS = [ ("market_watch.py", 60), ("trend_detector.py", 10), - ("xiaoguo_news_processor.py", 60), + ("mofin_news.py", 50), ] for script, timeout in SCRIPTS: diff --git a/mofin_collector.py b/mofin_collector.py new file mode 100644 index 0000000..6a9c28e --- /dev/null +++ b/mofin_collector.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +"""mofin_collector.py — 数据采集链:采集+检测+情报 + +在一个脚本内顺序跑完三件事: + 1. market_watch — 拉90个板块实时数据 + 2. trend_detector — 检测17种信号 + 3. xiaoguo_news_processor — 搜新闻+LLM分析 +""" + +import os +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +BASE = Path(__file__).parent + +def run_script(name): + path = BASE / name + print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始: {name}", flush=True) + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, text=True, timeout=120 + ) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if line: + print(f" {line}", flush=True) + print(f" ✅ {name} 完成", flush=True) + else: + print(f" ❌ {name} 失败: {result.stderr[:200]}", flush=True) + return result.returncode + +def main(): + # 跳过午休 + now = datetime.now() + hour = now.hour + minute = now.minute + if hour == 12 or (hour == 11 and minute > 30): + print("午休时间,跳过", flush=True) + return + + run_script("market_watch.py") + run_script("trend_detector.py") + run_script("xiaoguo_news_processor.py") + +if __name__ == "__main__": + main() diff --git a/mofin_news.py b/mofin_news.py new file mode 100644 index 0000000..4da6ed7 --- /dev/null +++ b/mofin_news.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +"""mofin_news.py — 新闻采集(no_agent,无需LLM) + +读未处理的 sector_signals,用 akshare 搜相关新闻, +去重后写入 signal_news 供知微分析。 +""" + +import json +import os +import re +from pathlib import Path + +try: + import akshare as ak + HAS_AKSHARE = True +except ImportError: + HAS_AKSHARE = False + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" +MAX_ARTICLES = 5 + + +def clean_proxy(): + for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: + os.environ.pop(k, None) + + +def get_conn(): + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +def search_akshare_news(code, max_results=3): + """用 akshare 搜个股新闻(含全文)""" + articles = [] + if not HAS_AKSHARE: + return articles + try: + clean_proxy() + df = ak.stock_news_em(symbol=code) + for _, r in df.head(max_results).iterrows(): + title = r.get('新闻标题', '') + content = r.get('新闻内容', '') + if title and len(title) > 5: + articles.append({ + "title": title, + "content": content, + "url": r.get('新闻链接', ''), + }) + except: + pass + return articles + + +def main(): + conn = get_conn() + signals = conn.execute( + "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1" + ).fetchall() + + if not signals: + print("无未处理的信号", flush=True) + conn.close() + return + + signal = dict(signals[0]) + sector = signal["sector"] + related = json.loads(signal["related_stocks"] or "[]") + holdings = json.loads(signal["holdings_in_sector"] or "[]") + watchlist = json.loads(signal["watchlist_in_sector"] or "[]") + + print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {sector}", flush=True) + + codes = {} + for item in related + holdings + watchlist: + if item.get("code"): + codes[item["code"]] = item.get("name", "") + + members = conn.execute( + "SELECT s.code, s.name FROM stocks s JOIN stock_sectors ss ON s.code=ss.code WHERE ss.sector_name=? LIMIT 5", + (sector,) + ).fetchall() + for m in members: + if m["code"] not in codes: + codes[m["code"]] = m["name"] + + all_articles = [] + for code, name in codes.items(): + arts = search_akshare_news(code, 3) + for a in arts: + if a["title"] not in [x["title"] for x in all_articles]: + all_articles.append(a) + print(f" 搜 {name}({code}): {len(arts)} 篇", flush=True) + + if not all_articles: + print(" 未搜到新闻", flush=True) + conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],)) + conn.commit() + conn.close() + return + + # 过滤脏数据,取前5篇 + filtered = [] + for a in all_articles: + c = a.get('content', '') or '' + if any(kw in c for kw in ['主力资金', '资金净流入', '代码', '简称']): + continue + filtered.append(a) + if len(filtered) >= MAX_ARTICLES: + break + batch = filtered[:MAX_ARTICLES] + print(f" 共{len(all_articles)}篇,采集{len(batch)}篇,交由知微分析", flush=True) + + searched_names = list(set(codes.values())) + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) VALUES (?, ?, ?, ?, ?, ?)", + (signal["id"], sector, "待知微判断", "", json.dumps(batch, ensure_ascii=False), json.dumps(searched_names, ensure_ascii=False)) + ) + conn.execute("UPDATE sector_signals SET processed=1 WHERE id=?", (signal["id"],)) + conn.commit() + print(f" 完成: {len(batch)} 篇新闻已入库,等知微分析", flush=True) + conn.close() + + +if __name__ == "__main__": + main()