Files
MoFin/mo_alphasift_bridge.py
T

186 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
mo_alphasift_bridge.py — AlphaSift 选股 → MoFin 自选池自动对接
流程:
1. 调用 DSA AlphaSift API → 获取候选股列表
2. 过滤(评分阈值 + 去重自选池/持仓)
3. 写入 MoFin watchlist.json
4. 调用 regenerate_all() 生成策略 → decisions.json
5. price_monitor 自动接管监控
用法:
python3 mo_alphasift_bridge.py # 默认策略 balanced_alpha
python3 mo_alphasift_bridge.py --strategy dual_low # 指定策略
python3 mo_alphasift_bridge.py --dry-run # 只看不写
python3 mo_alphasift_bridge.py --min-score 6 # 最低评分阈值
集成到 cron:
0 10 * * 1-5 cd /home/hmo/MoFin && python3 mo_alphasift_bridge.py
"""
import sys, os, json, argparse, urllib.request, time
from datetime import datetime
from pathlib import Path
# ── 配置 ─────────────────────────────────────────────────────────────
DSA_API = "http://127.0.0.1:8001"
MOFIN_DATA = Path("/home/hmo/web-dashboard/data")
WATCHLIST_PATH = MOFIN_DATA / "watchlist.json"
PORTFOLIO_PATH = MOFIN_DATA / "portfolio.json"
DEFAULT_STRATEGY = "balanced_alpha"
DEFAULT_MARKET = "cn"
DEFAULT_MAX = 15
MIN_SCORE = 5 # 最低评分
MAX_ADD = 5 # 每次最多加几只
def load_json(path):
try:
return json.loads(path.read_text(encoding="utf-8"))
except:
return None
def save_json(path, data):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def call_api(endpoint, method="GET", body=None):
url = f"{DSA_API}{endpoint}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(url, data=data, method=method,
headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=120) as r:
return json.loads(r.read())
except Exception as e:
print(f"API 错误: {e}")
return None
# ── 核心逻辑 ─────────────────────────────────────────────────────────
def run_screen(strategy, market, max_results, dry_run=False):
"""运行 AlphaSift 选股并写入 MoFin 自选池"""
# 1. AlphaSift 选股
print(f"🔍 AlphaSift 选股: 策略={strategy} 市场={market}", flush=True)
result = call_api("/api/v1/alphasift/screen", "POST", {
"strategy": strategy, "market": market, "max_results": max_results
})
if not result or not result.get("candidates"):
print(" ❌ 无候选股或 AlphaSift 返回空")
return
candidates = result["candidates"]
print(f" AlphaSift 返回 {len(candidates)} 只候选股")
if result.get("llm_market_view"):
print(f" 市场观点: {result['llm_market_view'][:80]}...")
# 2. 加载现有自选池和持仓,去重
wl = load_json(WATCHLIST_PATH) or {"stocks": []}
pf = load_json(PORTFOLIO_PATH) or {"holdings": []}
existing_codes = set()
for s in wl.get("stocks", []):
existing_codes.add(str(s.get("code", "")).strip())
for h in pf.get("holdings", []):
existing_codes.add(str(h.get("code", "")).strip())
# 3. 过滤:评分达标 + 不在现有自选/持仓中
new_stocks = []
for c in candidates:
code = str(c.get("code", "")).strip()
score = c.get("score", 0) or c.get("llm_score", 0) or 0
name = c.get("name", "") or c.get("llm_thesis", "")[:20] or code
if score < MIN_SCORE:
continue
if code in existing_codes:
print(f"{code} {name} (已在自选/持仓中)")
continue
entry = {
"code": code,
"name": name,
"price": c.get("price", 0),
"tag": f"alpha_sift_{strategy}",
"source": "alpha_sift",
"added_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
"alpha_score": score,
"alpha_reason": (c.get("reason", "") or c.get("llm_thesis", ""))[:200],
"analysis": {},
}
new_stocks.append(entry)
existing_codes.add(code)
if not new_stocks:
print(" ️ 没有符合条件的新标的")
return
# 限制数量
new_stocks = new_stocks[:MAX_ADD]
print(f"\n ✅ 新增 {len(new_stocks)} 只到自选池:")
for s in new_stocks:
print(f" {s['code']} {s['name']} (评分{s['alpha_score']})")
if dry_run:
print("\n [DRY RUN] 未实际写入")
return
# 4. 写入 watchlist.json
wl["stocks"].extend(new_stocks)
wl["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
save_json(WATCHLIST_PATH, wl)
print(f" 已写入 {WATCHLIST_PATH}")
# 5. 触发策略生成
print("\n🔄 调用 regenerate_all() 生成策略...")
try:
sys.path.insert(0, str(MOFIN_DATA.parent))
from strategy_lifecycle import regenerate_all
result = regenerate_all(stdout=True)
if result:
print(f" 策略生成完成: total={result.get('total',0)} ok={result.get('ok',0)}")
except Exception as e:
print(f" ⚠️ regenerate_all() 失败: {e}")
print(" 新股票已加入自选池,下次 cron 会自动生成策略")
# ── CLI ──────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="AlphaSift → MoFin 自动对接")
parser.add_argument("--strategy", default=DEFAULT_STRATEGY,
help=f"选股策略 (默认: {DEFAULT_STRATEGY})")
parser.add_argument("--market", default=DEFAULT_MARKET,
help=f"市场 (默认: {DEFAULT_MARKET})")
parser.add_argument("--max", type=int, default=DEFAULT_MAX,
help=f"AlphaSift 最多返回 (默认: {DEFAULT_MAX})")
parser.add_argument("--min-score", type=int, default=MIN_SCORE,
help=f"最低评分 (默认: {MIN_SCORE})")
parser.add_argument("--dry-run", action="store_true",
help="只看不写")
args = parser.parse_args()
# 列出可用策略
if args.strategy == "list":
r = call_api("/api/v1/alphasift/strategies")
if r and r.get("strategies"):
print("可用策略:")
for s in r["strategies"]:
print(f" {s['id']}: {s.get('name','?')} - {s.get('description','')[:60]}")
return
run_screen(args.strategy, args.market, args.max, args.dry_run)
if __name__ == "__main__":
main()