feat: mo_alphasift_bridge v2 — async mode + source tracking (date/strategy/notes)

This commit is contained in:
hmo
2026-06-30 01:57:01 +08:00
parent 897bc54bab
commit 790b0f9acc
+138 -81
View File
@@ -3,20 +3,19 @@
mo_alphasift_bridge.py — AlphaSift 选股 → MoFin 自选池自动对接
流程:
1. 调用 DSA AlphaSift API → 获取候选股列表
2. 过滤(评分阈值 + 去重自选池/持仓
3. 写入 MoFin watchlist.json
4. 调用 regenerate_all() 生成策略 → decisions.json
5. price_monitor 自动接管监控
1. 提交 AlphaSift 异步选股任务 → 轮询结果
2. 过滤(评分阈值 + 去重)
3. 写入 MoFin watchlist.json(含来源/日期/策略备注)
4. 调用 regenerate_all() 生成策略
5. price_monitor 自动接管
用法:
python3 mo_alphasift_bridge.py # 默认策略 balanced_alpha
python3 mo_alphasift_bridge.py --strategy dual_low # 指定策略
python3 mo_alphasift_bridge.py # 默认策略
python3 mo_alphasift_bridge.py --dry-run # 只看不写
python3 mo_alphasift_bridge.py --min-score 6 # 最低评分阈值
python3 mo_alphasift_bridge.py --min-score 6 # 最低评分
python3 mo_alphasift_bridge.py --strategy list # 列出所有策略
集成到 cron:
0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py
cron: 0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py
"""
import sys, os, json, argparse, urllib.request, time
@@ -33,8 +32,10 @@ PORTFOLIO_PATH = MOFIN_DATA / "portfolio.json"
DEFAULT_STRATEGY = "balanced_alpha"
DEFAULT_MARKET = "cn"
DEFAULT_MAX = 15
MIN_SCORE = 5 # 最低评分
MAX_ADD = 5 # 每次最多加几只
MIN_SCORE = 5
MAX_ADD = 5
POLL_INTERVAL = 5
POLL_TIMEOUT = 300
def load_json(path):
@@ -49,135 +50,191 @@ def save_json(path, data):
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def call_api(endpoint, method="GET", body=None):
def api(endpoint, method="GET", body=None):
url = f"{DSA_API}{endpoint}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(url, data=data, method=method,
headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=120) as r:
with urllib.request.urlopen(req, timeout=30) as r:
return json.loads(r.read())
except Exception as e:
print(f" API 错误: {e}")
return None
# ── 核心逻辑 ─────────────────────────────────────────────────────────
def get_existing_codes():
codes = set()
for path in [WATCHLIST_PATH, PORTFOLIO_PATH]:
data = load_json(path)
if not data:
continue
key = "stocks" if "watchlist" in str(path) else "holdings"
for item in data.get(key, []):
c = str(item.get("code", "")).strip()
if c:
codes.add(c)
return codes
def run_screen(strategy, market, max_results, dry_run=False):
"""运行 AlphaSift 选股并写入 MoFin 自选池"""
now = datetime.now()
date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%Y-%m-%d %H:%M")
# 1. AlphaSift 选股
print(f"🔍 AlphaSift 选股: 策略={strategy} 市场={market}", flush=True)
result = call_api("/api/v1/alphasift/screen", "POST", {
# 1. 提交异步任务
print(f"AlphaSift 选股: {strategy} (异步)", flush=True)
task = api("/api/v1/alphasift/screen/tasks", "POST", {
"strategy": strategy, "market": market, "max_results": max_results
})
if not result or not result.get("candidates"):
print(" ❌ 无候选股或 AlphaSift 返回空")
if not task or not task.get("task_id"):
print(" FAIL: 提交任务失败")
return
candidates = result["candidates"]
print(f" AlphaSift 返回 {len(candidates)} 只候选股")
if result.get("llm_market_view"):
print(f" 市场观点: {result['llm_market_view'][:80]}...")
task_id = task["task_id"]
print(f" 任务ID: {task_id[:12]}...", flush=True)
# 2. 加载现有自选池和持仓,去重
wl = load_json(WATCHLIST_PATH) or {"stocks": []}
pf = load_json(PORTFOLIO_PATH) or {"holdings": []}
# 2. 轮询
waited = 0
result = None
while waited < POLL_TIMEOUT:
time.sleep(POLL_INTERVAL)
waited += POLL_INTERVAL
status = api(f"/api/v1/alphasift/screen/tasks/{task_id}")
if not status:
continue
s = status.get("status", "")
if s == "completed":
print(f" 完成 ({waited}s)")
result = status.get("result", {})
break
elif s == "failed":
print(f" FAIL: {status.get('error', '')}")
return
else:
pct = status.get("progress", 0)
print(f" ...{s} ({pct}%)", flush=True)
else:
print(f" FAIL: 超时 ({POLL_TIMEOUT}s)")
return
existing_codes = set()
for s in wl.get("stocks", []):
existing_codes.add(str(s.get("code", "")).strip())
for h in pf.get("holdings", []):
existing_codes.add(str(h.get("code", "")).strip())
candidates = result.get("candidates", [])
if not candidates:
print(" 无候选股")
return
# 3. 过滤:评分达标 + 不在现有自选/持仓中
market_view = result.get("llm_market_view", "")
print(f" 返回 {len(candidates)} 只候选股")
if market_view:
print(f" 市场观点: {market_view[:100]}...")
# 3. 过滤 + 去重
existing = get_existing_codes()
new_stocks = []
skipped = 0
for c in candidates:
code = str(c.get("code", "")).strip()
score = c.get("score", 0) or c.get("llm_score", 0) or 0
name = c.get("name", "") or c.get("llm_thesis", "")[:20] or code
name = c.get("name", "") or c.get("title", "") or code
reason = c.get("reason", "") or c.get("llm_thesis", "")
catalysts = c.get("llm_catalysts", "")
risks = c.get("llm_risks", "")
if score < MIN_SCORE:
skipped += 1
continue
if code in existing_codes:
print(f"{code} {name} (已在自选/持仓中)")
if code in existing:
continue
entry = {
notes_parts = [f"AlphaSift/{strategy} 评分{score}"]
if reason:
notes_parts.append(reason[:150])
if catalysts:
notes_parts.append(f"催化剂:{catalysts[:100]}")
new_stocks.append({
"code": code,
"name": name,
"price": c.get("price", 0),
"tag": f"alpha_sift_{strategy}",
"source": "alpha_sift",
"added_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
"alpha_score": score,
"alpha_reason": (c.get("reason", "") or c.get("llm_thesis", ""))[:200],
"source_detail": {
"strategy": strategy,
"score": score,
"date": date_str,
"reason": reason[:300],
"catalysts": catalysts[:200] if catalysts else "",
"risks": risks[:200] if risks else "",
"market_view": market_view[:300] if market_view else "",
},
"notes": " | ".join(notes_parts),
"added_at": time_str,
"added_by": "AlphaSift",
"analysis": {},
}
new_stocks.append(entry)
existing_codes.add(code)
})
existing.add(code)
print(f" 过滤: {len(new_stocks)} 新标的 (跳过{skipped}只评分不足)")
if not new_stocks:
print(" ️ 没有符合条件的新标的")
print(" 符合条件的新标的")
return
# 限制数量
new_stocks = new_stocks[:MAX_ADD]
print(f"\n ✅ 新增 {len(new_stocks)} 只到自选池:")
print(f"\n 新增 {len(new_stocks)} 只到自选池:")
for s in new_stocks:
print(f" {s['code']} {s['name']} (评分{s['alpha_score']})")
print(f" {s['code']} {s['name']} (评分{s['source_detail']['score']})")
if s['source_detail'].get('reason'):
print(f" {s['source_detail']['reason'][:120]}")
if dry_run:
print("\n [DRY RUN] 未实际写入")
print("\n [DRY RUN] 未写入")
return
# 4. 写入 watchlist.json
# 4. 写入
wl = load_json(WATCHLIST_PATH) or {"stocks": []}
wl["stocks"].extend(new_stocks)
wl["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
wl["updated_at"] = time_str
save_json(WATCHLIST_PATH, wl)
print(f" 已写入 {WATCHLIST_PATH}")
print(f"\n 已写入 {WATCHLIST_PATH}")
# 5. 触发策略生成
print("\n🔄 调用 regenerate_all() 生成策略...")
# 5. 策略生成
print("\n 调用 regenerate_all() 生成策略...")
try:
sys.path.insert(0, str(MOFIN_DATA.parent))
from strategy_lifecycle import regenerate_all
result = regenerate_all(stdout=True)
if result:
print(f" 策略生成完成: total={result.get('total',0)} ok={result.get('ok',0)}")
r = regenerate_all(stdout=True)
if r:
print(f" 完成: {r.get('ok',0)}/{r.get('total',0)} 只策略已生成")
print(f" price_monitor 将在 2 分钟内自动开始监控")
except Exception as e:
print(f" ⚠️ regenerate_all() 失败: {e}")
print(" 股票已加入自选池,下次 cron 会自动生成策略")
print(f" WARN: regenerate_all 失败: {e}")
print(f" 股票已加入自选池,下次 cron 会自动处理")
# ── CLI ──────────────────────────────────────────────────────────────
def list_strategies():
r = api("/api/v1/alphasift/strategies")
if r and r.get("strategies"):
print("AlphaSift 策略:")
for s in r["strategies"]:
print(f" {s['id']:20s} {s.get('name','?'):15s} {s.get('description','')[:60]}")
def main():
parser = argparse.ArgumentParser(description="AlphaSift → MoFin 自动对接")
parser.add_argument("--strategy", default=DEFAULT_STRATEGY,
help=f"选股策略 (默认: {DEFAULT_STRATEGY})")
parser.add_argument("--market", default=DEFAULT_MARKET,
help=f"市场 (默认: {DEFAULT_MARKET})")
parser.add_argument("--max", type=int, default=DEFAULT_MAX,
help=f"AlphaSift 最多返回 (默认: {DEFAULT_MAX})")
parser.add_argument("--min-score", type=int, default=MIN_SCORE,
help=f"最低评分 (默认: {MIN_SCORE})")
parser.add_argument("--dry-run", action="store_true",
help="只看不写")
global MIN_SCORE
parser = argparse.ArgumentParser(description="AlphaSift → MoFin")
parser.add_argument("--strategy", default=DEFAULT_STRATEGY)
parser.add_argument("--market", default=DEFAULT_MARKET)
parser.add_argument("--max", type=int, default=DEFAULT_MAX)
parser.add_argument("--min-score", type=int, default=MIN_SCORE)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
MIN_SCORE = args.min_score
# 列出可用策略
if args.strategy == "list":
r = call_api("/api/v1/alphasift/strategies")
if r and r.get("strategies"):
print("可用策略:")
for s in r["strategies"]:
print(f" {s['id']}: {s.get('name','?')} - {s.get('description','')[:60]}")
list_strategies()
return
run_screen(args.strategy, args.market, args.max, args.dry_run)