Files
MoFin/mo_alphasift_bridge.py
T

242 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
mo_alphasift_bridge.py — AlphaSift 多策略并行选股 → MoFin 自选池
支持同时跑多个策略,合并去重后写入自选池。
默认三策略: balanced_alpha + dual_low + quality_value
用法:
python3 mo_alphasift_bridge.py # 默认三策略
python3 mo_alphasift_bridge.py --strategy dual_low # 单策略
python3 mo_alphasift_bridge.py --dry-run # 只看不写
python3 mo_alphasift_bridge.py --strategy list # 列出所有策略
"""
import sys, os, json, argparse, urllib.request, time
from datetime import datetime
from pathlib import Path
# Base URL that api() prefixes to every endpoint path.
DSA_API = "http://127.0.0.1:8001"
# Directory holding the MoFin JSON data files; its parent is added to sys.path
# by run_all() before importing strategy_lifecycle.
MOFIN_DATA = Path("/home/hmo/web-dashboard/data")
WATCHLIST_PATH = MOFIN_DATA / "watchlist.json"  # read, extended and rewritten by run_all()
PORTFOLIO_PATH = MOFIN_DATA / "portfolio.json"  # only read, to skip codes already held
# Comma-separated strategy ids used when --strategy is not given.
DEFAULT_STRATEGIES = "balanced_alpha,dual_low,quality_value"
DEFAULT_MARKET = "cn"  # default for --market
DEFAULT_MAX = 15  # default for --max; sent as max_results with each screening task
MIN_SCORE = 5  # candidates scoring below this are skipped; main() overwrites it from --min-score
MAX_ADD = 5  # at most this many new entries are appended to the watchlist per run
POLL_INTERVAL = 5  # seconds slept between two task-status polls
POLL_TIMEOUT = 300  # accumulated sleep seconds after which a task is reported as timed out
def load_json(path):
    """Read *path* as UTF-8 JSON and return the decoded value.

    Args:
        path: A ``pathlib.Path`` pointing at the JSON file.

    Returns:
        The decoded JSON value, or ``None`` when the file is missing or
        unreadable, or when its content is not valid UTF-8 / JSON.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError is the common base of
        # json.JSONDecodeError and UnicodeDecodeError. Everything else
        # (KeyboardInterrupt, SystemExit, programming errors) now propagates
        # instead of being swallowed by a bare except.
        return None
def save_json(path, data):
    """Write *data* to *path* as UTF-8 JSON (2-space indent, non-ASCII kept as-is).

    Any missing parent directories of *path* are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
def api(endpoint, method="GET", body=None):
    """Call the HTTP API rooted at ``DSA_API`` and return the decoded JSON reply.

    Args:
        endpoint: Path appended verbatim to ``DSA_API``.
        method: HTTP method name passed to ``urllib.request.Request``.
        body: Optional JSON-serializable payload. Any value other than
            ``None`` is sent, including empty containers such as ``{}``.

    Returns:
        The parsed JSON value, or ``None`` if the request or the decoding
        failed (the error is printed, not raised).
    """
    url = f"{DSA_API}{endpoint}"
    # Compare against None instead of relying on truthiness so that an empty
    # payload ({} or []) is still transmitted rather than silently turning the
    # call into a body-less request.
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return json.loads(r.read())
    except Exception as e:
        # Deliberately broad: this is the script's best-effort network
        # boundary, and callers only distinguish "got JSON" from "got nothing".
        print(f" API错误: {e}")
        return None
def get_existing_codes():
    """Collect the stock codes already present in the watchlist or the portfolio.

    Returns:
        A set of non-empty, whitespace-stripped code strings taken from the
        ``"stocks"`` list of the watchlist file and the ``"holdings"`` list of
        the portfolio file. Files that cannot be loaded, or whose content does
        not have the expected dict/list shape, contribute nothing.
    """
    # Pair each file with its list key explicitly instead of deriving the key
    # from a substring match on the full path, which would pick the wrong key
    # if a parent directory name ever contained "watchlist".
    sources = ((WATCHLIST_PATH, "stocks"), (PORTFOLIO_PATH, "holdings"))
    codes = set()
    for path, key in sources:
        data = load_json(path)
        if not isinstance(data, dict):
            continue
        # `or []` tolerates an explicit null stored under the key.
        for item in data.get(key) or []:
            if not isinstance(item, dict):
                continue
            code = str(item.get("code", "")).strip()
            if code:
                codes.add(code)
    return codes
def _format_preview_score(value):
    """Render a score with one decimal place, or '?' when it is not a number."""
    if isinstance(value, (int, float)):
        return f"{value:.1f}"
    return "?"


def run_one_strategy(strategy, market, max_results):
    """Submit one screening task and poll it until it finishes.

    Args:
        strategy: Strategy id sent to the screening endpoint.
        market: Market code sent with the task.
        max_results: Maximum number of candidates requested from the task.

    Returns:
        The candidate list reported by a completed task. An empty list is
        returned when submission fails, the task reports ``failed``, or the
        accumulated polling sleep reaches ``POLL_TIMEOUT`` seconds.
    """
    print(f"\n{'='*50}")
    print(f"策略: {strategy}")
    print(f"{'='*50}")
    task = api("/api/v1/alphasift/screen/tasks", "POST", {
        "strategy": strategy, "market": market, "max_results": max_results
    })
    if not task or not task.get("task_id"):
        print(f" FAIL: 提交失败")
        return []
    task_id = task["task_id"]
    # str() so that a non-string id can still be shortened for display.
    print(f" 任务: {str(task_id)[:12]}...", flush=True)
    waited = 0
    while waited < POLL_TIMEOUT:
        time.sleep(POLL_INTERVAL)
        waited += POLL_INTERVAL
        status = api(f"/api/v1/alphasift/screen/tasks/{task_id}")
        if not status:
            # A failed poll is not fatal; keep trying until the timeout.
            continue
        s = status.get("status", "")
        if s == "completed":
            print(f" 完成 ({waited}s)")
            # `or` fallbacks guard against explicit nulls in the response.
            result = status.get("result") or {}
            candidates = result.get("candidates") or []
            print(f" 候选: {len(candidates)}")
            for c in candidates[:3]:
                code = c.get("code", "?")
                name = c.get("name", c.get("title", "?"))
                # The score may be missing or null; applying ".1f" directly to
                # the fallback would raise, so format it defensively.
                score = _format_preview_score(c.get("score"))
                print(f" {code} {name} 评分{score}")
            return candidates
        if s == "failed":
            print(f" FAIL: {status.get('error','')}")
            return []
        # Still running: report progress once per minute of waiting.
        if waited % 60 == 0:
            print(f" ...{s} ({status.get('progress',0)}%)", flush=True)
    print(f" FAIL: 超时")
    return []
def _build_watchlist_entry(c, code, score, strategies, date_str, time_str):
    """Turn one accepted candidate dict into the record appended to the watchlist."""
    name = c.get("name", "") or c.get("title", "") or code
    # The trailing `or ""` keeps `reason` a string even when both fields are
    # null in the response; it is sliced unconditionally further down.
    reason = c.get("reason", "") or c.get("llm_thesis", "") or ""
    src = c.get("_strategy", "unknown")
    factors = c.get("factor_scores") or {}
    # Only the first three factors go into the note. Values that are not
    # numbers are left out because they cannot take the ".0f" format.
    leading = list(factors.items())[:3] if isinstance(factors, dict) else []
    factor_note = ", ".join(
        f"{k}={v:.0f}" for k, v in leading if isinstance(v, (int, float))
    )
    notes = f"AlphaSift/{src} 评分{score:.0f}"
    if factor_note:
        notes += f" [{factor_note}]"
    if reason:
        notes += f" | {reason[:120]}"
    return {
        "code": code,
        "name": name,
        "price": c.get("price", 0),
        "source": "alpha_sift",
        "source_detail": {
            "strategy": src,
            "strategies_run": strategies,
            "score": score,
            "factor_scores": factors,
            "date": date_str,
            "reason": reason[:300],
        },
        "notes": notes,
        "added_at": time_str,
        "added_by": "AlphaSift",
        "analysis": {},
    }


def run_all(strategies_str, market, max_results, dry_run=False):
    """Run each strategy in turn, merge the candidates and add new ones to the watchlist.

    Strategies are executed one after another. Candidates are de-duplicated
    by code (the first strategy that reports a code owns it), filtered by
    ``MIN_SCORE`` and by the codes already in the watchlist/portfolio, then
    capped at ``MAX_ADD``. Unless *dry_run* is set, the survivors are appended
    to the watchlist file and ``strategy_lifecycle.regenerate_all`` is called.

    Args:
        strategies_str: Comma-separated strategy ids.
        market: Market code forwarded to every screening task.
        max_results: Per-strategy candidate limit forwarded to every task.
        dry_run: When true, print what would be added but write nothing.

    Returns:
        None. All results are reported through printed output and the
        watchlist file.
    """
    strategies = [s.strip() for s in strategies_str.split(",") if s.strip()]
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%Y-%m-%d %H:%M")
    print(f"AlphaSift 多策略选股: {', '.join(strategies)}")
    print(f"开始: {time_str}")

    # Run the strategies one by one and merge their candidates.
    all_candidates = []
    seen = set()
    for strategy in strategies:
        for c in run_one_strategy(strategy, market, max_results):
            code = str(c.get("code", "")).strip()
            # A candidate without a code cannot be tracked; skip it so that an
            # empty-code entry never reaches the watchlist.
            if not code or code in seen:
                continue
            seen.add(code)
            c["_strategy"] = strategy
            all_candidates.append(c)
    if not all_candidates:
        print("\n无候选股")
        return
    print(f"\n汇总: {len(all_candidates)} 只候选股 (去重后)")
    for s in strategies:
        cnt = sum(1 for c in all_candidates if c.get("_strategy") == s)
        print(f" {s}: {cnt}")

    # Filter by score threshold and by codes that are already tracked.
    existing = get_existing_codes()
    new_stocks = []
    skipped_score = 0
    skipped_dup = 0
    for c in all_candidates:
        code = str(c.get("code", "")).strip()
        score = c.get("score", 0) or c.get("llm_score", 0) or 0
        if score < MIN_SCORE:
            skipped_score += 1
            continue
        if code in existing:
            skipped_dup += 1
            continue
        new_stocks.append(
            _build_watchlist_entry(c, code, score, strategies, date_str, time_str)
        )
        existing.add(code)
    print(f"\n过滤: {len(new_stocks)} 新标的 (评分不足{skipped_score} + 重复{skipped_dup})")
    if not new_stocks:
        print("无符合条件的新标的")
        return

    # NOTE(review): the cap keeps the first MAX_ADD entries in strategy order,
    # not the highest-scoring ones — confirm this is the intended selection.
    new_stocks = new_stocks[:MAX_ADD]
    print(f"\n新增 {len(new_stocks)} 只到自选池:")
    for s in new_stocks:
        sd = s["source_detail"]
        print(f" {s['code']} {s['name']} ({sd['strategy']} 评分{sd['score']:.0f})")
    if dry_run:
        print("\n[DRY RUN] 未写入")
        return

    # Write the watchlist back.
    wl = load_json(WATCHLIST_PATH)
    if not isinstance(wl, dict):
        try:
            has_content = WATCHLIST_PATH.stat().st_size > 0
        except OSError:
            has_content = False
        if has_content:
            # The file exists but did not load as a JSON object. Overwriting it
            # would silently discard the existing watchlist, so stop here.
            print(f"\nFAIL: {WATCHLIST_PATH} 存在但无法解析, 已中止写入以免覆盖")
            return
        wl = {}
    # Tolerate a watchlist object that lacks a usable "stocks" list.
    if not isinstance(wl.get("stocks"), list):
        wl["stocks"] = []
    wl["stocks"].extend(new_stocks)
    wl["updated_at"] = time_str
    save_json(WATCHLIST_PATH, wl)
    print(f"\n已写入 {WATCHLIST_PATH}")

    # Regenerate strategies; best-effort, a failure only produces a warning.
    print("\n调用 regenerate_all()...")
    try:
        module_dir = str(MOFIN_DATA.parent)
        if module_dir not in sys.path:
            sys.path.insert(0, module_dir)
        from strategy_lifecycle import regenerate_all
        r = regenerate_all(stdout=True)
        if r:
            print(f"完成: {r.get('ok',0)}/{r.get('total',0)} 只策略已生成")
    except Exception as e:
        print(f"WARN: {e}")
def list_strategies():
    """Print one line per strategy returned by the strategies endpoint.

    Each line shows the id, the name and the description cut to 60
    characters. Nothing is printed when the call fails or the response
    carries no strategies.
    """
    response = api("/api/v1/alphasift/strategies")
    if not response:
        return
    entries = response.get("strategies")
    if not entries:
        return
    for entry in entries:
        strategy_id = entry["id"]
        label = entry.get("name", "?")
        summary = entry.get("description", "")[:60]
        print(f" {strategy_id:22s} {label:10s} {summary}")
def main():
    """Parse the command line, then either list the strategies or run the screening."""
    global MIN_SCORE
    parser = argparse.ArgumentParser(description="AlphaSift → MoFin")
    # Declarative table of the supported options, registered in order.
    option_specs = (
        ("--strategy", {"default": DEFAULT_STRATEGIES}),
        ("--market", {"default": DEFAULT_MARKET}),
        ("--max", {"type": int, "default": DEFAULT_MAX}),
        ("--min-score", {"type": int, "default": MIN_SCORE}),
        ("--dry-run", {"action": "store_true"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    options = parser.parse_args()
    # The score threshold is read as a module-level global by run_all().
    MIN_SCORE = options.min_score
    if options.strategy != "list":
        run_all(options.strategy, options.market, options.max, options.dry_run)
    else:
        list_strategies()


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()