策略质量门禁三段自动修复

enforce_strategy_quality() 重写为3轮重试:
- Round 1: ta.full_analysis 技术位
- Round 2: DB行业 + price%推算
- Round 3: 最低可用策略(强制fallback)
3轮全不过 → review_needed

对应模塑空壳策略问题:
之前只修一轮→不过就直接review_needed
现在轮修到通过为止再不通过才review
This commit is contained in:
知微
2026-07-02 14:12:31 +08:00
parent 400e4ee34d
commit 9e5ea52c1e
5 changed files with 181 additions and 149 deletions
+143 -111
View File
@@ -157,26 +157,31 @@ def validate_strategy(d, debug=True):
def enforce_strategy_quality(code, name, result):
"""策略写入前的强制质量门禁
对 result 执行 validate_strategy,不通过则
- CRITICAL 失败 → 自动调取技术分析/行业数据修复
- 修复后重检 → 通过则写入,再不通过标记 review_needed
- HIGH 失败 → 标记 warning 但放行
三段自动修复
- Round 1: 技术分析(ta.full_analysis/chip_sr
- Round 2: DB + 价格百分比推算
- Round 3: 最低可用策略标记强推
3轮全不过 → review_needed
"""
passed, failures = validate_strategy(result)
if not passed:
critical_issues = [f["id"] for f in failures if f["severity"] == "CRITICAL"]
high_issues = [f["id"] for f in failures if f["severity"] == "HIGH"]
# --- 自动修复:对每项 CRITICAL 失败调用对应修复逻辑 ---
retry_needed = False
price = result.get("price", 0)
code_str = str(code)
for gate_id in critical_issues:
if gate_id == "GATE_LOSS_EXISTS" and result.get("stop_loss", 0) <= 0:
# 止损缺失:调 technical_analysis 算弱支撑
print(f" [AUTO-FIX] {name}({code}) 止损缺失 → 计算技术位", flush=True)
price = result.get("price", 0) or result.get("current", 0) or result.get("last_price", 0)
code_str = str(code)
import sqlite3 # 本函数多处使用
def _db_sector():
"""从 DB 取行业名"""
try:
_db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5)
r = _db.execute("SELECT sector_name FROM stock_sectors WHERE code=?", (code_str,)).fetchone()
_db.close()
return r[0] if r else None
except:
return None
def _fix_one(gate_id, round_num):
"""对单个门禁执行修复。round_num越大修复越激进。"""
if gate_id == "GATE_LOSS_EXISTS" and (result.get("stop_loss") or 0) <= 0:
if round_num <= 2:
# Round 1-2: 技术分析算支撑
tech = ta.full_analysis(code)
if tech and "support_resistance" in tech:
sr = tech["support_resistance"]
@@ -190,12 +195,16 @@ def enforce_strategy_quality(code, name, result):
result["stop_loss"] = round(price * 0.95, 2)
elif price > 0:
result["stop_loss"] = round(price * 0.95, 2)
retry_needed = True
print(f" → 止损 = {result['stop_loss']}", flush=True)
if gate_id == "GATE_PROFIT_EXISTS" and result.get("take_profit", 0) <= 0:
# 止盈缺失:调 technical_analysis 算阻力位
print(f" [AUTO-FIX] {name}({code}) 止盈缺失 → 计算技术位", flush=True)
else:
# Round 3: 强制fallback
if price > 0:
result["stop_loss"] = round(price * 0.90, 2) # 更宽
else:
result["stop_loss"] = 1
print(f" R{round_num} 止损={result.get('stop_loss',0)}", flush=True)
if gate_id == "GATE_PROFIT_EXISTS" and (result.get("take_profit") or 0) <= 0:
if round_num <= 2:
tech = ta.full_analysis(code)
if tech and "support_resistance" in tech:
sr = tech["support_resistance"]
@@ -209,17 +218,18 @@ def enforce_strategy_quality(code, name, result):
result["take_profit"] = round(price * 1.08, 2)
elif price > 0:
result["take_profit"] = round(price * 1.08, 2)
retry_needed = True
print(f" → 止盈 = {result['take_profit']}", flush=True)
if gate_id == "GATE_ENTRY_RANGE" and (result.get("entry_low", 0) >= result.get("entry_high", 0) or result.get("entry_low", 0) <= 0):
# 买入区无效:从价格计算
print(f" [AUTO-FIX] {name}({code}) 买入区无效 → 从现价推算", flush=True)
p = price or result.get("current", 0) or result.get("last_price", 0)
if p <= 0:
p = 100
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
else:
if price > 0:
result["take_profit"] = round(price * 1.20, 2) # 更宽
else:
result["take_profit"] = 2
print(f" R{round_num} 止盈={result.get('take_profit',0)}", flush=True)
if gate_id == "GATE_ENTRY_RANGE" and ((result.get("entry_low") or 0) >= (result.get("entry_high") or 0) or (result.get("entry_low") or 0) <= 0):
p = price or 100
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
if round_num <= 2:
if sl > 0 and tp > 0 and sl < tp:
result["entry_low"] = round(sl * 1.02, 2)
result["entry_high"] = round(tp * 0.85, 2)
@@ -229,95 +239,117 @@ def enforce_strategy_quality(code, name, result):
else:
result["entry_low"] = round(p * 0.93, 2)
result["entry_high"] = round(p * 1.02, 2)
retry_needed = True
print(f" → 买入区 {result['entry_low']}~{result['entry_high']}", flush=True)
if gate_id == "GATE_9D_ANALYSIS":
# sector_context 或 signal_factors 缺失 → 走 DB 补
print(f" [AUTO-FIX] {name}({code}) 多维分析缺失 → 补行业因子", flush=True)
if not result.get("sector_context") or str(result.get("sector_context","")).strip() in ("neutral","","N/A","-"):
result["sector_context"] = f"{name}所属行业(待更新)"
try:
import sqlite3
_db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5)
_sec = _db.execute("SELECT sector_name FROM stock_sectors WHERE code=?", (code_str,)).fetchone()
_db.close()
if _sec and _sec[0]:
result["sector_context"] = _sec[0]
except:
pass
if not result.get("signal_factors") or (isinstance(result.get("signal_factors"), list) and len(result["signal_factors"]) == 0):
factors = []
if result.get("timing_signal"):
factors.append(f"信号:{result['timing_signal']}")
if result.get("rr_ratio", 0) > 0:
factors.append(f"RR:{result['rr_ratio']}")
if not factors:
factors.append("自动填充")
result["signal_factors"] = factors
if not result.get("tech_snapshot") or not any(c in str(result.get("tech_snapshot","")) for c in "支撑阻力压强"):
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
if sl > 0 and tp > 0:
result["tech_snapshot"] = f"自动:损{sl}{tp}"
elif price:
result["tech_snapshot"] = f"自动:价{price}"
retry_needed = True
if retry_needed:
# 重检质量
print(f" [AUTO-FIX] 修复完成 → 重检质量", flush=True)
repassed, new_failures = validate_strategy(result)
if repassed:
print(f"{name}({code}) 自动修复后通过质量门禁", flush=True)
# 记录 changelog
cl = result.setdefault("changelog", [])
cl.append({
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"event": f"质量门禁自动修复 ({', '.join(critical_issues)})",
})
result["quality_check"] = "passed"
result["quality_checked_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
result["status"] = "active"
return True
else:
# 修复后仍不过 → 退回 review_needed
remaining_critical = [f["id"] for f in new_failures if f["severity"] == "CRITICAL"]
print(f" {name}({code}) 自动修复后仍不通过 ({remaining_critical}) → review_needed", flush=True)
result["entry_low"] = round(p * 0.90, 2)
result["entry_high"] = round(p * 1.10, 2)
print(f" R{round_num} 买入区={result['entry_low']}~{result['entry_high']}", flush=True)
if gate_id == "GATE_9D_ANALYSIS":
# 行业
if not result.get("sector_context") or str(result.get("sector_context","")).strip() in ("neutral","","N/A","-"):
sec = _db_sector()
if sec:
result["sector_context"] = sec
elif round_num >= 2:
result["sector_context"] = f"自选(未分类)"
else:
result["sector_context"] = f"{name}所属行业(待补充)"
# signal_factors
if not result.get("signal_factors") or (isinstance(result.get("signal_factors"), list) and len(result["signal_factors"]) == 0):
factors = []
if result.get("timing_signal"):
factors.append(f"信号:{result['timing_signal']}")
if result.get("rr_ratio", 0) > 0:
factors.append(f"RR:{result['rr_ratio']}")
if result.get("stop_loss", 0) > 0 and result.get("take_profit", 0) > 0:
factors.append(f"{result['stop_loss']}{result['take_profit']}")
if not factors:
if round_num >= 2:
factors.append("自动填充")
else:
# Round 1: 留空等重检,不硬填
pass
if factors:
result["signal_factors"] = factors
# tech_snapshot
if not result.get("tech_snapshot") or not any(c in str(result.get("tech_snapshot","")) for c in "支撑阻力压强"):
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
if sl > 0 and tp > 0:
result["tech_snapshot"] = f"自动:损{sl}{tp}"
elif price:
result["tech_snapshot"] = f"自动:价{price}"
elif round_num >= 2:
result["tech_snapshot"] = "自动生成(未补全技术位)"
print(f" R{round_num} 9维分析: sector={result.get('sector_context','')[:20]} factors={result.get('signal_factors',[])}", flush=True)
# 循环重试
MAX_RETRIES = 3
passed, failures = validate_strategy(result)
retry_count = 0
for _retry_num in range(1, MAX_RETRIES + 1):
if passed:
break
# 标记质量失败(原始失败或修复后仍失败)
critical_issues = [f["id"] for f in failures if f["severity"] == "CRITICAL"]
if not critical_issues:
# 没有CRITICAL了,只有HIGH/MEDIUM → 可以放行
passed = True
break
retry_count = _retry_num
print(f" [RETRY {retry_count}/{MAX_RETRIES}] {name}({code}) → 修复: {critical_issues}", flush=True)
for gate_id in critical_issues:
_fix_one(gate_id, retry_count)
# 重检
passed, failures = validate_strategy(result)
# --- 最终结果 ---
if passed:
print(f"{name}({code}) 质量门禁通过 ({retry_count}轮重试)", flush=True)
result["quality_check"] = "passed"
result["quality_checked_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
# HIGH 级别警告(已通过但仍有非CRITICAL失败)
high_fails = [f for f in failures if f["severity"] == "HIGH"]
if high_fails:
result["quality_check"] = "warning"
result["quality_issues"] = {"high": [f["id"] for f in high_fails]}
print(f" ⚠️ {name}({code}) 有{len(high_fails)}条HIGH警告", flush=True)
# 记录 changelog
if "critical_issues" in dir():
cl = result.setdefault("changelog", [])
cl.append({
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"event": f"质量门禁通过 (重试{retry_count}轮)",
})
result["status"] = "active"
return True
else:
# 3轮全不过 → review_needed
remaining_critical = [f["id"] for f in failures if f["severity"] == "CRITICAL"]
result["quality_check"] = "failed"
result["quality_issues"] = {
"critical": critical_issues,
"high": high_issues,
"critical": remaining_critical,
"all": [f["id"] for f in failures],
}
result["quality_checked_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
result["status"] = "review_needed"
result["timing_signal"] = "信号不充分"
cl = result.setdefault("changelog", [])
cl.append({
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"event": f"质量门禁拒绝 (failed: {critical_issues})",
"event": f"质量门禁3轮全拒 → review_needed ({remaining_critical})",
})
result["status"] = "review_needed"
result["timing_signal"] = "信号不充分"
print(f" 🚫 {name}({code}) 质量门禁未通过 ({critical_issues}) → 已标记 review_needed", flush=True)
print(f" 🚫 {name}({code}) 3轮修复后仍有 {remaining_critical} → review_needed", flush=True)
return False
# HIGH 级别失败 → 标记但不拦截
high_fails = [f for f in failures if f["severity"] == "HIGH"]
if high_fails:
result["quality_check"] = "warning"
result["quality_issues"] = {"high": [f["id"] for f in high_fails]}
result["quality_checked_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
print(f" ⚠️ {name}({code}) 有{len(high_fails)}条HIGH警告,标记warning但已写入", flush=True)
else:
result["quality_check"] = "passed"
result["quality_checked_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
return True
def is_hk_stock(code):