#!/usr/bin/env python3 """ review_needed_watchdog.py — review_needed 策略自动跟进 每30分钟扫描DB中 status=review_needed 的策略: 1. 对每只策略调用 per_stock_reassess 重评 2. 重评后 status 变 active → 通过,写入 changelog 3. 还是 review_needed → retry_count+=1 4. retry_count>=3 → 推 Dad 人工介入 """ import sys, json, os, datetime sys.path.insert(0, "/home/hmo/web-dashboard") os.chdir("/home/hmo/MoFin") DEC_PATH = "/home/hmo/web-dashboard/data/decisions.json" RETRY_FILE = "/home/hmo/web-dashboard/data/review_needed_retry.json" XMPP_USER = "hmo@yoin.fun" XMPP_BRIDGE = "http://192.168.1.246:5805/xmpp/send" def load_retry(): try: return json.load(open(RETRY_FILE)) except: return {} def save_retry(data): json.dump(data, open(RETRY_FILE, "w"), indent=2) def push_xmpp(text): try: from urllib.request import Request, urlopen payload = json.dumps({"to": XMPP_USER, "body": text.strip()}).encode() req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"}) urlopen(req, timeout=5) print(f" [XMPP] 已推送") except Exception as e: print(f" [XMPP推送失败] {e}") def main(): dec = json.load(open(DEC_PATH)) review_list = [d for d in dec.get("decisions", []) if d.get("status") == "review_needed"] retry_data = load_retry() today = datetime.date.today().isoformat() if not review_list: print("[SILENT] 无待处理策略") return print(f"发现 {len(review_list)} 只 review_needed 策略") changes = False for d in review_list: code = d["code"] name = d.get("name", code) retries = retry_data.get(code, {}).get("count", 0) + 1 retry_data[code] = {"count": retries, "last_attempt": today} if retries >= 3: print(f" ⛔ {name}({code}) 已重试{retries}次,跳过") continue print(f" 🔄 {name}({code}) 第{retries}次重试...") import subprocess r = subprocess.run( [sys.executable, "/home/hmo/MoFin/scripts/per_stock_reassess.py", code], capture_output=True, text=True, timeout=30 ) out = (r.stdout or "") + (r.stderr or "") print(f" {out[:200]}") # 重读决策 dec2 = json.load(open(DEC_PATH)) for d2 in dec2.get("decisions", []): if d2["code"] == code: if d2.get("status") == "active": print(f" ✅ {name}({code}) 重评通过!") retry_data[code] = {"count": 0, "last_attempt": today} changes = True elif d2.get("status") == "review_needed": issues = d2.get("quality_issues", {}).get("critical", []) print(f" ❌ {name}({code}) 仍 review_needed ({issues})") break # 3次以上失败 → 推 Dad dead = [code for code, v in retry_data.items() if v.get("count", 0) >= 3] if dead: names = [] for code in dead: for d in dec.get("decisions", []): if d["code"] == code: names.append(f"{d.get('name', code)}({code})") break msg = f"【知微】策略质量审核 {today}\n以下策略3次自动重评均失败,需人工介入:\n" for n in names: msg += f" - {n}\n" msg += "\n原因可能是:缺少技术面数据 / 行业信息不完整 / 利润保护目标无法确定。" push_xmpp(msg) save_retry(retry_data) if not changes: print("[SILENT] 状态无变化") if __name__ == "__main__": main()