Files
MoFin/scripts/per_stock_reassess.py
T

164 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
per_stock_reassess.py — per-stock triggered reassessment.

For every stock code passed on the command line, run reassess_strategy()
and update only the matching record in decisions.json.  portfolio.json is
read as a price fallback but never written.  When no code is given, the
script falls back to a full regenerate_all() run instead.
"""
import sys, json, os, re
sys.path.insert(0, "/home/hmo/web-dashboard")
from strategy_lifecycle import reassess_with_context as reassess_strategy
# JSON file holding the per-stock decision records; main() loads it, updates
# the entries for the requested codes and writes it back.
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
# Price sources consulted by _lookup_price(), in order of preference.
_DB_PATH = '/home/hmo/web-dashboard/data/mofin.db'
_PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
# Directory that provides the strategy_tree module.
_MOFIN_ROOT = '/home/hmo/MoFin'
# Values of the entry "type" field that mark a watchlist (non-held) record.
_WATCHLIST_TYPES = ("自选策略", "watchlist")
# Tried in order; the first query that returns a row decides the DB price.
_PRICE_QUERIES = (
    "SELECT price FROM holdings WHERE code=? AND is_active=1",
    "SELECT price FROM watchlist_stocks WHERE code=? AND is_active=1",
    "SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1",
)


def _num(value, default=0.0):
    """Return *value* as a float, or *default* when it is None or non-numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _lookup_price(entry):
    """Return the current price for the stock described by *entry*.

    Sources, in order: the SQLite database (the original author's note says
    it is refreshed by price_monitor), the matching holding in
    portfolio.json, and finally the price already stored on the entry.
    Any failure is reported on stderr and the stored entry price is used.
    """
    import sqlite3
    from contextlib import closing

    code = entry.get("code", "")
    stored = entry.get("current_price") or entry.get("price") or 0
    try:
        price = 0
        # closing() guarantees the connection is released even if a query
        # raises (e.g. a missing table).
        with closing(sqlite3.connect(_DB_PATH)) as db:
            db.row_factory = sqlite3.Row
            for sql in _PRICE_QUERIES:
                row = db.execute(sql, (code,)).fetchone()
                if row:
                    price = row['price'] or 0
                    break
        if price > 0:
            print(f" 实时价: {price} (来自DB)")
            return price

        # Fallback: look the code up in portfolio.json (read-only).
        with open(_PORTFOLIO_PATH, encoding="utf-8") as pf:
            holdings = json.load(pf).get("holdings", [])
        for holding in holdings:
            if holding.get("code") == code:
                price = float(holding.get("price") or 0)
                break
        return price if price > 0 else stored
    except Exception as e:
        print(f" 价格获取失败: {e}", file=sys.stderr)
        return stored


def _price_barely_moved(entry, price):
    """Return True (and log it) when *price* is within 1% of the last reassessed price."""
    last_price = entry.get("last_reassessed_price", 0)
    last_val = _num(last_price)
    cur_val = _num(price)
    if last_val > 0 and cur_val > 0:
        diff_pct = abs(cur_val - last_val) / last_val * 100
        if diff_pct < 1.0:
            print(f" 价差仅{diff_pct:.2f}% (<1%),跳过重评(上次价={last_price},现价={price}")
            return True
    return False


def _keep_trailing_stop(entry, result):
    """Prevent the stop-loss of a held position from moving down.

    When *entry* is a held position (positive cost and shares, not a
    watchlist type) and the new stop in *result* is lower than the existing
    one, *result* is modified in place: its stop_loss is reset to the old
    value and the number following "止损" in its action text is rewritten.
    """
    is_held = (
        _num(entry.get("cost")) > 0
        and _num(entry.get("shares")) > 0
        and entry.get("type", "") not in _WATCHLIST_TYPES
    )
    old_stop = entry.get("stop_loss", 0)
    new_stop = result.get("stop_loss", 0)
    if not (is_held and 0 < _num(new_stop) < _num(old_stop)):
        return
    print(f" 移动止损保护: {new_stop}→保持{old_stop} (持仓止损不下移)")
    result["stop_loss"] = old_stop
    action = result.get("action", "")
    if action:
        # A callable replacement keeps the text literal (no backslash or
        # group-reference interpretation by re.sub).
        result["action"] = re.sub(r'止损[\d.]+', lambda _m: f'止损{old_stop}', action)


def _merge_result(entry, result, code, price):
    """Return a copy of *entry* updated with the reassessment *result*.

    Fields missing from *result* keep the value already stored on *entry*.
    """
    # Currency tag: 5-character codes starting with 0 or 1 are tagged HKD,
    # everything else CNY.
    is_hk = len(str(code)) == 5 and str(code)[0] in '01'
    updated = entry.copy()
    updated.update({
        "action": result["action"],
        "stop_loss": result.get("stop_loss", entry.get("stop_loss")),
        "entry_low": result.get("entry_low", entry.get("entry_low")),
        "entry_high": result.get("entry_high", entry.get("entry_high")),
        # Same fallback as the sibling fields, so a result without
        # take_profit no longer wipes the stored value.
        "take_profit": result.get("take_profit", entry.get("take_profit")),
        "tech_snapshot": result.get("tech_snapshot", entry.get("tech_snapshot")),
        "timing_signal": result.get("timing_signal", entry.get("timing_signal")),
        "rr_ratio": result.get("rr_ratio", entry.get("rr_ratio", 0)),
        "status": result.get("status", "updated"),
        "price": price,
        "currency": "HKD" if is_hk else "CNY",
        # Remembered so the next run can debounce on price movement.
        "last_reassessed_price": price,
    })
    return updated


def _attach_strategy_tree(updated, code, name, result):
    """Best-effort: store the default strategy branches under updated['strategy_tree'].

    A failure does not abort the reassessment; it is reported on stderr.
    """
    try:
        # Keep the MoFin directory at the front of sys.path without adding
        # a duplicate entry on every call.
        if not sys.path or sys.path[0] != _MOFIN_ROOT:
            sys.path.insert(0, _MOFIN_ROOT)
        from strategy_tree import init_default_branches
        branches = init_default_branches(
            code,
            name,
            result.get('entry_low', 0),
            result.get('entry_high', 0),
            result.get('stop_loss', 0),
            result.get('take_profit', 0),
        )
        updated.setdefault('strategy_tree', {})['branches'] = branches
    except Exception as e:
        print(f" [WARN] {code}: 策略树初始化失败: {e}", file=sys.stderr)


def _write_decisions(raw):
    """Refresh the bookkeeping fields of *raw* and write it to DECISIONS_PATH."""
    from datetime import datetime

    raw["total"] = len(raw["decisions"])
    raw["regenerated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
    # Serialize before opening the file: a non-serializable value then
    # raises without truncating the existing decisions.json.
    payload = json.dumps(raw, ensure_ascii=False, indent=2)
    with open(DECISIONS_PATH, "w", encoding="utf-8") as f:
        f.write(payload)


def main():
    """Reassess the stocks named on the command line and update decisions.json.

    Arguments starting with "-" are ignored.  With no remaining codes a full
    regenerate_all() run is performed instead.  Otherwise each code is
    looked up in decisions.json, priced, debounced (skipped when the price
    moved less than 1% since the last reassessment), reassessed via
    reassess_strategy() and its record replaced in place.  All other
    records, including ones without a code, are written back untouched.
    A summary line with success/skip/failure counts is printed at the end.
    """
    codes = [arg for arg in sys.argv[1:] if not arg.startswith("-")]
    if not codes:
        print("[FULL] 无指定编码,跑全量 regenerate_all()")
        from strategy_lifecycle import regenerate_all
        regenerate_all(stdout=False)
        print("[FULL] 全量重评完成")
        return

    # Load the existing decisions.
    with open(DECISIONS_PATH, encoding="utf-8") as f:
        raw = json.load(f)
    decisions = raw.setdefault("decisions", [])
    # Map each code to its position in the list so a record can be replaced
    # in place; for duplicate codes the last occurrence wins.
    index_by_code = {
        d["code"]: i
        for i, d in enumerate(decisions)
        if isinstance(d, dict) and d.get("code")
    }

    ok = 0
    errors = 0
    skipped = 0
    for code in codes:
        idx = index_by_code.get(code)
        if idx is None:
            print(f"[SKIP] {code}: 不在 decisions.json 中")
            errors += 1
            continue
        entry = decisions[idx]
        try:
            price = _lookup_price(entry)
            if _price_barely_moved(entry, price):
                skipped += 1
                continue

            result = reassess_strategy(
                code=code,
                name=entry.get("name", ""),
                price=price,
                cost=entry.get("cost", 0),
                shares=entry.get("shares", 0),
                current_action=entry.get("action", ""),
                is_watchlist=entry.get("type", "") in _WATCHLIST_TYPES,
            )
            if not (result and result.get("action")):
                print(f"[SYNCED] {code}: 无变更")
                ok += 1
                continue

            _keep_trailing_stop(entry, result)
            updated = _merge_result(entry, result, code, price)
            decisions[idx] = updated
            _attach_strategy_tree(updated, code, entry.get('name', ''), result)
            print(f"[OK] {code} {entry.get('name','')}: {result['action'][:80]}")
            ok += 1
        except Exception as e:
            # One failing code must not abort the remaining ones.
            print(f"[ERROR] {code}: {e}", file=sys.stderr)
            errors += 1

    _write_decisions(raw)
    print(f"[DONE] {ok}成功 {skipped}跳过 {errors}失败")
# Script entry point: run the reassessment only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()