diff --git a/mo_alphasift_bridge.py b/mo_alphasift_bridge.py index 05c3816..f372e526 100644 --- a/mo_alphasift_bridge.py +++ b/mo_alphasift_bridge.py @@ -1,35 +1,27 @@ #!/usr/bin/env python3 """ -mo_alphasift_bridge.py — AlphaSift 选股 → MoFin 自选池自动对接 +mo_alphasift_bridge.py — AlphaSift 多策略并行选股 → MoFin 自选池 -流程: -1. 提交 AlphaSift 异步选股任务 → 轮询结果 -2. 过滤(评分阈值 + 去重) -3. 写入 MoFin watchlist.json(含来源/日期/策略备注) -4. 调用 regenerate_all() 生成策略 -5. price_monitor 自动接管 +支持同时跑多个策略,合并去重后写入自选池。 +默认三策略: balanced_alpha + dual_low + quality_value -用法: - python3 mo_alphasift_bridge.py # 默认策略 - python3 mo_alphasift_bridge.py --dry-run # 只看不写 - python3 mo_alphasift_bridge.py --min-score 6 # 最低评分 - python3 mo_alphasift_bridge.py --strategy list # 列出所有策略 - -cron: 0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py +用法: + python3 mo_alphasift_bridge.py # 默认三策略 + python3 mo_alphasift_bridge.py --strategy dual_low # 单策略 + python3 mo_alphasift_bridge.py --dry-run # 只看不写 + python3 mo_alphasift_bridge.py --strategy list # 列出所有策略 """ import sys, os, json, argparse, urllib.request, time from datetime import datetime from pathlib import Path -# ── 配置 ───────────────────────────────────────────────────────────── - DSA_API = "http://127.0.0.1:8001" MOFIN_DATA = Path("/home/hmo/web-dashboard/data") WATCHLIST_PATH = MOFIN_DATA / "watchlist.json" PORTFOLIO_PATH = MOFIN_DATA / "portfolio.json" -DEFAULT_STRATEGY = "balanced_alpha" +DEFAULT_STRATEGIES = "balanced_alpha,dual_low,quality_value" DEFAULT_MARKET = "cn" DEFAULT_MAX = 15 MIN_SCORE = 5 @@ -39,17 +31,13 @@ POLL_TIMEOUT = 300 def load_json(path): - try: - return json.loads(path.read_text(encoding="utf-8")) - except: - return None - + try: return json.loads(path.read_text(encoding="utf-8")) + except: return None def save_json(path, data): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") - def api(endpoint, method="GET", body=None): url = f"{DSA_API}{endpoint}" data = json.dumps(body).encode() if body else None @@ -59,99 +47,119 @@ def api(endpoint, method="GET", body=None): with urllib.request.urlopen(req, timeout=30) as r: return json.loads(r.read()) except Exception as e: - print(f" API 错误: {e}") + print(f" API错误: {e}") return None - def get_existing_codes(): codes = set() for path in [WATCHLIST_PATH, PORTFOLIO_PATH]: data = load_json(path) - if not data: - continue + if not data: continue key = "stocks" if "watchlist" in str(path) else "holdings" for item in data.get(key, []): c = str(item.get("code", "")).strip() - if c: - codes.add(c) + if c: codes.add(c) return codes -def run_screen(strategy, market, max_results, dry_run=False): - now = datetime.now() - date_str = now.strftime("%Y-%m-%d") - time_str = now.strftime("%Y-%m-%d %H:%M") +def run_one_strategy(strategy, market, max_results): + """跑单个策略,返回候选股列表""" + print(f"\n{'='*50}") + print(f"策略: {strategy}") + print(f"{'='*50}") - # 1. 提交异步任务 - print(f"AlphaSift 选股: {strategy} (异步)", flush=True) task = api("/api/v1/alphasift/screen/tasks", "POST", { "strategy": strategy, "market": market, "max_results": max_results }) if not task or not task.get("task_id"): - print(" FAIL: 提交任务失败") - return + print(f" FAIL: 提交失败") + return [] task_id = task["task_id"] - print(f" 任务ID: {task_id[:12]}...", flush=True) + print(f" 任务: {task_id[:12]}...", flush=True) - # 2. 轮询 waited = 0 - result = None while waited < POLL_TIMEOUT: time.sleep(POLL_INTERVAL) waited += POLL_INTERVAL status = api(f"/api/v1/alphasift/screen/tasks/{task_id}") - if not status: - continue + if not status: continue s = status.get("status", "") if s == "completed": print(f" 完成 ({waited}s)") result = status.get("result", {}) - break + candidates = result.get("candidates", []) + print(f" 候选: {len(candidates)} 只") + if candidates: + for c in candidates[:3]: + print(f" {c.get('code','?')} {c.get('name',c.get('title','?'))} 评分{c.get('score','?'):.1f}") + return candidates elif s == "failed": - print(f" FAIL: {status.get('error', '')}") - return + print(f" FAIL: {status.get('error','')}") + return [] else: - pct = status.get("progress", 0) - print(f" ...{s} ({pct}%)", flush=True) - else: - print(f" FAIL: 超时 ({POLL_TIMEOUT}s)") + if waited % 60 == 0: + print(f" ...{s} ({status.get('progress',0)}%)", flush=True) + + print(f" FAIL: 超时") + return [] + + +def run_all(strategies_str, market, max_results, dry_run=False): + """多策略并行 → 合并去重 → MoFin 自选池""" + strategies = [s.strip() for s in strategies_str.split(",") if s.strip()] + now = datetime.now() + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%Y-%m-%d %H:%M") + + print(f"AlphaSift 多策略选股: {', '.join(strategies)}") + print(f"开始: {time_str}") + + # 逐个跑策略,汇总 + all_candidates = [] + seen = set() + for strategy in strategies: + candidates = run_one_strategy(strategy, market, max_results) + for c in candidates: + code = str(c.get("code", "")).strip() + if code in seen: continue + seen.add(code) + c["_strategy"] = strategy + all_candidates.append(c) + + if not all_candidates: + print("\n无候选股") return - candidates = result.get("candidates", []) - if not candidates: - print(" 无候选股") - return + print(f"\n汇总: {len(all_candidates)} 只候选股 (去重后)") + for s in strategies: + cnt = sum(1 for c in all_candidates if c.get("_strategy") == s) + print(f" {s}: {cnt} 只") - market_view = result.get("llm_market_view", "") - print(f" 返回 {len(candidates)} 只候选股") - if market_view: - print(f" 市场观点: {market_view[:100]}...") - - # 3. 过滤 + 去重 + # 过滤 existing = get_existing_codes() new_stocks = [] - skipped = 0 + skipped_score = 0 + skipped_dup = 0 - for c in candidates: + for c in all_candidates: code = str(c.get("code", "")).strip() score = c.get("score", 0) or c.get("llm_score", 0) or 0 + if score < MIN_SCORE: + skipped_score += 1; continue + if code in existing: + skipped_dup += 1; continue + name = c.get("name", "") or c.get("title", "") or code reason = c.get("reason", "") or c.get("llm_thesis", "") - catalysts = c.get("llm_catalysts", "") - risks = c.get("llm_risks", "") + src = c.get("_strategy", "unknown") - if score < MIN_SCORE: - skipped += 1 - continue - if code in existing: - continue + factors = c.get("factor_scores", {}) + factor_note = ", ".join(f"{k}={v:.0f}" for k,v in list(factors.items())[:3]) if factors else "" - notes_parts = [f"AlphaSift/{strategy} 评分{score}"] - if reason: - notes_parts.append(reason[:150]) - if catalysts: - notes_parts.append(f"催化剂:{catalysts[:100]}") + notes = f"AlphaSift/{src} 评分{score:.0f}" + if factor_note: notes += f" [{factor_note}]" + if reason: notes += f" | {reason[:120]}" new_stocks.append({ "code": code, @@ -159,83 +167,74 @@ def run_screen(strategy, market, max_results, dry_run=False): "price": c.get("price", 0), "source": "alpha_sift", "source_detail": { - "strategy": strategy, + "strategy": src, + "strategies_run": strategies, "score": score, + "factor_scores": factors, "date": date_str, "reason": reason[:300], - "catalysts": catalysts[:200] if catalysts else "", - "risks": risks[:200] if risks else "", - "market_view": market_view[:300] if market_view else "", }, - "notes": " | ".join(notes_parts), + "notes": notes, "added_at": time_str, "added_by": "AlphaSift", "analysis": {}, }) existing.add(code) - print(f" 过滤: {len(new_stocks)} 新标的 (跳过{skipped}只评分不足)") + print(f"\n过滤: {len(new_stocks)} 新标的 (评分不足{skipped_score} + 重复{skipped_dup})") if not new_stocks: - print(" 无符合条件的新标的") + print("无符合条件的新标的") return new_stocks = new_stocks[:MAX_ADD] - print(f"\n 新增 {len(new_stocks)} 只到自选池:") + print(f"\n新增 {len(new_stocks)} 只到自选池:") for s in new_stocks: - print(f" {s['code']} {s['name']} (评分{s['source_detail']['score']})") - if s['source_detail'].get('reason'): - print(f" {s['source_detail']['reason'][:120]}") + sd = s["source_detail"] + print(f" {s['code']} {s['name']} ({sd['strategy']} 评分{sd['score']:.0f})") if dry_run: - print("\n [DRY RUN] 未写入") + print("\n[DRY RUN] 未写入") return - # 4. 写入 + # 写入 wl = load_json(WATCHLIST_PATH) or {"stocks": []} wl["stocks"].extend(new_stocks) wl["updated_at"] = time_str save_json(WATCHLIST_PATH, wl) - print(f"\n 已写入 {WATCHLIST_PATH}") + print(f"\n已写入 {WATCHLIST_PATH}") - # 5. 策略生成 - print("\n 调用 regenerate_all() 生成策略...") + # 策略生成 + print("\n调用 regenerate_all()...") try: sys.path.insert(0, str(MOFIN_DATA.parent)) from strategy_lifecycle import regenerate_all r = regenerate_all(stdout=True) - if r: - print(f" 完成: {r.get('ok',0)}/{r.get('total',0)} 只策略已生成") - print(f" price_monitor 将在 2 分钟内自动开始监控") + if r: print(f"完成: {r.get('ok',0)}/{r.get('total',0)} 只策略已生成") except Exception as e: - print(f" WARN: regenerate_all 失败: {e}") - print(f" 股票已加入自选池,下次 cron 会自动处理") + print(f"WARN: {e}") def list_strategies(): r = api("/api/v1/alphasift/strategies") if r and r.get("strategies"): - print("AlphaSift 策略:") for s in r["strategies"]: - print(f" {s['id']:20s} {s.get('name','?'):15s} {s.get('description','')[:60]}") + print(f" {s['id']:22s} {s.get('name','?'):10s} {s.get('description','')[:60]}") def main(): global MIN_SCORE - parser = argparse.ArgumentParser(description="AlphaSift → MoFin") - parser.add_argument("--strategy", default=DEFAULT_STRATEGY) - parser.add_argument("--market", default=DEFAULT_MARKET) - parser.add_argument("--max", type=int, default=DEFAULT_MAX) - parser.add_argument("--min-score", type=int, default=MIN_SCORE) - parser.add_argument("--dry-run", action="store_true") - args = parser.parse_args() + p = argparse.ArgumentParser(description="AlphaSift → MoFin") + p.add_argument("--strategy", default=DEFAULT_STRATEGIES) + p.add_argument("--market", default=DEFAULT_MARKET) + p.add_argument("--max", type=int, default=DEFAULT_MAX) + p.add_argument("--min-score", type=int, default=MIN_SCORE) + p.add_argument("--dry-run", action="store_true") + args = p.parse_args() MIN_SCORE = args.min_score - - if args.strategy == "list": - list_strategies() - return - run_screen(args.strategy, args.market, args.max, args.dry_run) + if args.strategy == "list": list_strategies() + else: run_all(args.strategy, args.market, args.max, args.dry_run) if __name__ == "__main__":