From 790b0f9accee28d3bdde756443cdbd38bf382013 Mon Sep 17 00:00:00 2001 From: hmo Date: Tue, 30 Jun 2026 01:57:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20mo=5Falphasift=5Fbridge=20v2=20?= =?UTF-8?q?=E2=80=94=20async=20mode=20+=20source=20tracking=20(date/strate?= =?UTF-8?q?gy/notes)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mo_alphasift_bridge.py | 223 ++++++++++++++++++++++++++--------------- 1 file changed, 140 insertions(+), 83 deletions(-) diff --git a/mo_alphasift_bridge.py b/mo_alphasift_bridge.py index 9c75d6c..05c3816 100644 --- a/mo_alphasift_bridge.py +++ b/mo_alphasift_bridge.py @@ -3,20 +3,19 @@ mo_alphasift_bridge.py — AlphaSift 选股 → MoFin 自选池自动对接 流程: -1. 调用 DSA AlphaSift API → 获取候选股列表 -2. 过滤(评分阈值 + 去重自选池/持仓) -3. 写入 MoFin watchlist.json -4. 调用 regenerate_all() 生成策略 → decisions.json -5. price_monitor 自动接管监控 +1. 提交 AlphaSift 异步选股任务 → 轮询结果 +2. 过滤(评分阈值 + 去重) +3. 写入 MoFin watchlist.json(含来源/日期/策略备注) +4. 调用 regenerate_all() 生成策略 +5. price_monitor 自动接管 用法: - python3 mo_alphasift_bridge.py # 默认策略 balanced_alpha - python3 mo_alphasift_bridge.py --strategy dual_low # 指定策略 - python3 mo_alphasift_bridge.py --dry-run # 只看不写 - python3 mo_alphasift_bridge.py --min-score 6 # 最低评分阈值 + python3 mo_alphasift_bridge.py # 默认策略 + python3 mo_alphasift_bridge.py --dry-run # 只看不写 + python3 mo_alphasift_bridge.py --min-score 6 # 最低评分 + python3 mo_alphasift_bridge.py --strategy list # 列出所有策略 -集成到 cron: - 0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py +cron: 0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py """ import sys, os, json, argparse, urllib.request, time @@ -33,8 +32,10 @@ PORTFOLIO_PATH = MOFIN_DATA / "portfolio.json" DEFAULT_STRATEGY = "balanced_alpha" DEFAULT_MARKET = "cn" DEFAULT_MAX = 15 -MIN_SCORE = 5 # 最低评分 -MAX_ADD = 5 # 每次最多加几只 +MIN_SCORE = 5 +MAX_ADD = 5 +POLL_INTERVAL = 5 +POLL_TIMEOUT = 300 def load_json(path): @@ -49,135 +50,191 @@ def save_json(path, data): path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") -def call_api(endpoint, method="GET", body=None): +def api(endpoint, method="GET", body=None): url = f"{DSA_API}{endpoint}" data = json.dumps(body).encode() if body else None req = urllib.request.Request(url, data=data, method=method, headers={"Content-Type": "application/json"}) try: - with urllib.request.urlopen(req, timeout=120) as r: + with urllib.request.urlopen(req, timeout=30) as r: return json.loads(r.read()) except Exception as e: - print(f"API 错误: {e}") + print(f" API 错误: {e}") return None -# ── 核心逻辑 ───────────────────────────────────────────────────────── +def get_existing_codes(): + codes = set() + for path in [WATCHLIST_PATH, PORTFOLIO_PATH]: + data = load_json(path) + if not data: + continue + key = "stocks" if "watchlist" in str(path) else "holdings" + for item in data.get(key, []): + c = str(item.get("code", "")).strip() + if c: + codes.add(c) + return codes + def run_screen(strategy, market, max_results, dry_run=False): - """运行 AlphaSift 选股并写入 MoFin 自选池""" + now = datetime.now() + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%Y-%m-%d %H:%M") - # 1. AlphaSift 选股 - print(f"🔍 AlphaSift 选股: 策略={strategy} 市场={market}", flush=True) - result = call_api("/api/v1/alphasift/screen", "POST", { + # 1. 提交异步任务 + print(f"AlphaSift 选股: {strategy} (异步)", flush=True) + task = api("/api/v1/alphasift/screen/tasks", "POST", { "strategy": strategy, "market": market, "max_results": max_results }) - - if not result or not result.get("candidates"): - print(" ❌ 无候选股或 AlphaSift 返回空") + if not task or not task.get("task_id"): + print(" FAIL: 提交任务失败") return - candidates = result["candidates"] - print(f" AlphaSift 返回 {len(candidates)} 只候选股") - if result.get("llm_market_view"): - print(f" 市场观点: {result['llm_market_view'][:80]}...") + task_id = task["task_id"] + print(f" 任务ID: {task_id[:12]}...", flush=True) - # 2. 加载现有自选池和持仓,去重 - wl = load_json(WATCHLIST_PATH) or {"stocks": []} - pf = load_json(PORTFOLIO_PATH) or {"holdings": []} + # 2. 轮询 + waited = 0 + result = None + while waited < POLL_TIMEOUT: + time.sleep(POLL_INTERVAL) + waited += POLL_INTERVAL + status = api(f"/api/v1/alphasift/screen/tasks/{task_id}") + if not status: + continue + s = status.get("status", "") + if s == "completed": + print(f" 完成 ({waited}s)") + result = status.get("result", {}) + break + elif s == "failed": + print(f" FAIL: {status.get('error', '')}") + return + else: + pct = status.get("progress", 0) + print(f" ...{s} ({pct}%)", flush=True) + else: + print(f" FAIL: 超时 ({POLL_TIMEOUT}s)") + return - existing_codes = set() - for s in wl.get("stocks", []): - existing_codes.add(str(s.get("code", "")).strip()) - for h in pf.get("holdings", []): - existing_codes.add(str(h.get("code", "")).strip()) + candidates = result.get("candidates", []) + if not candidates: + print(" 无候选股") + return - # 3. 过滤:评分达标 + 不在现有自选/持仓中 + market_view = result.get("llm_market_view", "") + print(f" 返回 {len(candidates)} 只候选股") + if market_view: + print(f" 市场观点: {market_view[:100]}...") + + # 3. 过滤 + 去重 + existing = get_existing_codes() new_stocks = [] + skipped = 0 + for c in candidates: code = str(c.get("code", "")).strip() score = c.get("score", 0) or c.get("llm_score", 0) or 0 - name = c.get("name", "") or c.get("llm_thesis", "")[:20] or code + name = c.get("name", "") or c.get("title", "") or code + reason = c.get("reason", "") or c.get("llm_thesis", "") + catalysts = c.get("llm_catalysts", "") + risks = c.get("llm_risks", "") if score < MIN_SCORE: + skipped += 1 continue - if code in existing_codes: - print(f" ⏭ {code} {name} (已在自选/持仓中)") + if code in existing: continue - entry = { + notes_parts = [f"AlphaSift/{strategy} 评分{score}"] + if reason: + notes_parts.append(reason[:150]) + if catalysts: + notes_parts.append(f"催化剂:{catalysts[:100]}") + + new_stocks.append({ "code": code, "name": name, "price": c.get("price", 0), - "tag": f"alpha_sift_{strategy}", "source": "alpha_sift", - "added_at": datetime.now().strftime("%Y-%m-%d %H:%M"), - "alpha_score": score, - "alpha_reason": (c.get("reason", "") or c.get("llm_thesis", ""))[:200], + "source_detail": { + "strategy": strategy, + "score": score, + "date": date_str, + "reason": reason[:300], + "catalysts": catalysts[:200] if catalysts else "", + "risks": risks[:200] if risks else "", + "market_view": market_view[:300] if market_view else "", + }, + "notes": " | ".join(notes_parts), + "added_at": time_str, + "added_by": "AlphaSift", "analysis": {}, - } - new_stocks.append(entry) - existing_codes.add(code) + }) + existing.add(code) + + print(f" 过滤: {len(new_stocks)} 新标的 (跳过{skipped}只评分不足)") if not new_stocks: - print(" ℹ️ 没有符合条件的新标的") + print(" 无符合条件的新标的") return - # 限制数量 new_stocks = new_stocks[:MAX_ADD] - print(f"\n ✅ 新增 {len(new_stocks)} 只到自选池:") + + print(f"\n 新增 {len(new_stocks)} 只到自选池:") for s in new_stocks: - print(f" {s['code']} {s['name']} (评分{s['alpha_score']})") + print(f" {s['code']} {s['name']} (评分{s['source_detail']['score']})") + if s['source_detail'].get('reason'): + print(f" {s['source_detail']['reason'][:120]}") if dry_run: - print("\n [DRY RUN] 未实际写入") + print("\n [DRY RUN] 未写入") return - # 4. 写入 watchlist.json + # 4. 写入 + wl = load_json(WATCHLIST_PATH) or {"stocks": []} wl["stocks"].extend(new_stocks) - wl["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M") + wl["updated_at"] = time_str save_json(WATCHLIST_PATH, wl) - print(f" 已写入 {WATCHLIST_PATH}") + print(f"\n 已写入 {WATCHLIST_PATH}") - # 5. 触发策略生成 - print("\n🔄 调用 regenerate_all() 生成策略...") + # 5. 策略生成 + print("\n 调用 regenerate_all() 生成策略...") try: sys.path.insert(0, str(MOFIN_DATA.parent)) from strategy_lifecycle import regenerate_all - result = regenerate_all(stdout=True) - if result: - print(f" 策略生成完成: total={result.get('total',0)} ok={result.get('ok',0)}") + r = regenerate_all(stdout=True) + if r: + print(f" 完成: {r.get('ok',0)}/{r.get('total',0)} 只策略已生成") + print(f" price_monitor 将在 2 分钟内自动开始监控") except Exception as e: - print(f" ⚠️ regenerate_all() 失败: {e}") - print(" 新股票已加入自选池,下次 cron 会自动生成策略") + print(f" WARN: regenerate_all 失败: {e}") + print(f" 股票已加入自选池,下次 cron 会自动处理") -# ── CLI ────────────────────────────────────────────────────────────── +def list_strategies(): + r = api("/api/v1/alphasift/strategies") + if r and r.get("strategies"): + print("AlphaSift 策略:") + for s in r["strategies"]: + print(f" {s['id']:20s} {s.get('name','?'):15s} {s.get('description','')[:60]}") + def main(): - parser = argparse.ArgumentParser(description="AlphaSift → MoFin 自动对接") - parser.add_argument("--strategy", default=DEFAULT_STRATEGY, - help=f"选股策略 (默认: {DEFAULT_STRATEGY})") - parser.add_argument("--market", default=DEFAULT_MARKET, - help=f"市场 (默认: {DEFAULT_MARKET})") - parser.add_argument("--max", type=int, default=DEFAULT_MAX, - help=f"AlphaSift 最多返回 (默认: {DEFAULT_MAX})") - parser.add_argument("--min-score", type=int, default=MIN_SCORE, - help=f"最低评分 (默认: {MIN_SCORE})") - parser.add_argument("--dry-run", action="store_true", - help="只看不写") - + global MIN_SCORE + parser = argparse.ArgumentParser(description="AlphaSift → MoFin") + parser.add_argument("--strategy", default=DEFAULT_STRATEGY) + parser.add_argument("--market", default=DEFAULT_MARKET) + parser.add_argument("--max", type=int, default=DEFAULT_MAX) + parser.add_argument("--min-score", type=int, default=MIN_SCORE) + parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() + MIN_SCORE = args.min_score - # 列出可用策略 if args.strategy == "list": - r = call_api("/api/v1/alphasift/strategies") - if r and r.get("strategies"): - print("可用策略:") - for s in r["strategies"]: - print(f" {s['id']}: {s.get('name','?')} - {s.get('description','')[:60]}") + list_strategies() return - run_screen(args.strategy, args.market, args.max, args.dry_run)