400e4ee34d
新增 review_needed_watchdog.py: - 每30分钟扫描 review_needed 策略 - 自动调 per_stock_reassess 重评 - 重试3次仍失败 → 推Dad XMPP人工介入 - cron: 交易日 9:30~15:30 news-flow-analysis skill 文档同步更新: - 完整 review_needed 流程链 - 自动修复→重检→跟进→上报 闭环
106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
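review_needed_watchdog.py — automatic follow-up for review_needed strategies

Every 30 minutes, scan the decisions DB (decisions.json) for strategies with status=review_needed:
1. Run per_stock_reassess on each such strategy to re-evaluate it.
2. If the status becomes active after the reassessment → passed; recorded in the changelog.
3. If it is still review_needed → retry_count += 1.
4. Once retry_count >= 3 → notify Dad for manual intervention.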
|
|
"""
|
|
|
|
import sys, json, os, datetime
|
|
|
|
_DASHBOARD_DIR = "/home/hmo/web-dashboard"
_MOFIN_DIR = "/home/hmo/MoFin"

# Put the dashboard checkout first on the module search path, then make the
# MoFin checkout the working directory so relative paths resolve against it.
# Both run at import time.
sys.path.insert(0, _DASHBOARD_DIR)
os.chdir(_MOFIN_DIR)
|
|
|
|
# JSON files kept in the dashboard's data directory.
_DATA_DIR = "/home/hmo/web-dashboard/data"
DEC_PATH = f"{_DATA_DIR}/decisions.json"  # strategy decisions scanned by main()
RETRY_FILE = f"{_DATA_DIR}/review_needed_retry.json"  # per-strategy retry bookkeeping

# XMPP notification: HTTP bridge endpoint and the recipient address.
XMPP_BRIDGE = "http://192.168.1.246:5805/xmpp/send"
XMPP_USER = "hmo@yoin.fun"
|
|
|
|
def load_retry(path=None):
    """Load the retry bookkeeping mapping from a JSON file.

    Args:
        path: File to read. Defaults to the module-level ``RETRY_FILE``.

    Returns:
        The decoded JSON object as a dict, or ``{}`` when the file is
        missing, unreadable, not valid JSON, or its top-level value is not
        an object.
    """
    if path is None:
        path = RETRY_FILE
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit
        # propagate.
        return {}
    # Callers use dict methods on the result, so reject lists/scalars here.
    return data if isinstance(data, dict) else {}
|
|
|
|
def save_retry(data, path=None):
    """Persist the retry bookkeeping mapping as indented JSON.

    The payload is written to a sibling ``<path>.tmp`` file and then moved
    over the target, so an interrupted write cannot leave a truncated file
    behind.

    Args:
        data: JSON-serialisable object to store.
        path: Destination file. Defaults to the module-level ``RETRY_FILE``.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError: If ``data`` is not JSON-serialisable.
    """
    if path is None:
        path = RETRY_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
    # Replace only after the temp file is fully written and closed.
    os.replace(tmp_path, path)
|
|
|
|
def push_xmpp(text):
    """POST *text* to the XMPP HTTP bridge, addressed to ``XMPP_USER``.

    Delivery is best-effort: any failure is printed and swallowed so that a
    bridge outage never aborts the caller.

    Args:
        text: Message body; surrounding whitespace is stripped before sending.

    Returns:
        True if the request completed without raising, False otherwise.
    """
    from urllib.request import Request, urlopen

    try:
        payload = json.dumps({"to": XMPP_USER, "body": text.strip()}).encode()
        req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
        # The context manager closes the response (and its socket) right away
        # instead of leaving it to garbage collection.
        with urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f" [XMPP推送失败] {e}")
        return False
    print(" [XMPP] 已推送")
    return True
|
|
|
|
_MAX_RETRIES = 3  # failed automatic reassessments allowed before escalating
_REASSESS_SCRIPT = "/home/hmo/MoFin/scripts/per_stock_reassess.py"
_REASSESS_TIMEOUT = 30  # seconds granted to a single reassessment run


def _read_decisions():
    """Return the list stored under "decisions" in the DEC_PATH JSON file."""
    with open(DEC_PATH, encoding="utf-8") as fh:
        return json.load(fh).get("decisions", [])


def _run_reassess(code):
    """Run the reassessment script for *code* and return its stdout + stderr.

    A timeout or a failure to start the process is reported in the returned
    text instead of being raised, so one stuck strategy cannot abort the run.
    """
    import subprocess

    try:
        proc = subprocess.run(
            [sys.executable, _REASSESS_SCRIPT, str(code)],
            capture_output=True, text=True, errors="replace",
            timeout=_REASSESS_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return f"[重评超时] 超过{_REASSESS_TIMEOUT}秒未完成"
    except OSError as exc:
        return f"[重评启动失败] {exc}"
    return (proc.stdout or "") + (proc.stderr or "")


def _failure_count(retry_data, code):
    """Return the failure count recorded for *code* (0 if absent or malformed)."""
    entry = retry_data.get(code)
    count = entry.get("count", 0) if isinstance(entry, dict) else 0
    return count if isinstance(count, int) else 0


def main():
    """Reassess every review_needed strategy and escalate persistent failures.

    For each decision in DEC_PATH whose status is "review_needed":

    * If it already has ``_MAX_RETRIES`` recorded failures, skip it and list
      it in the escalation message.
    * Otherwise run the reassessment script, then re-read DEC_PATH. A status
      of "active" resets the counter; anything else records one more failure.
      The failure that reaches ``_MAX_RETRIES`` escalates in the same run.

    The retry bookkeeping is saved via ``save_retry`` and, when anything needs
    escalation, one message is sent through ``push_xmpp``.
    """
    pending = [
        d for d in _read_decisions()
        if d.get("status") == "review_needed" and d.get("code")
    ]
    retry_data = load_retry()
    if not isinstance(retry_data, dict):
        retry_data = {}
    today = datetime.date.today().isoformat()

    if not pending:
        print("[SILENT] 无待处理策略")
        return

    print(f"发现 {len(pending)} 只 review_needed 策略")
    changes = False
    # Only strategies that are review_needed right now can be escalated, so
    # stale entries in the retry file are never reported.
    escalate = []
    for d in pending:
        code = d["code"]
        name = d.get("name", code)
        label = f"{name}({code})"
        failures = _failure_count(retry_data, code)

        # The limit is checked against failures already recorded, before this
        # attempt is counted, so all _MAX_RETRIES attempts actually run.
        if failures >= _MAX_RETRIES:
            print(f" ⛔ {label} 已重试{failures}次,跳过")
            escalate.append(label)
            continue

        attempt = failures + 1
        print(f" 🔄 {label} 第{attempt}次重试...")
        print(f" {_run_reassess(code)[:200]}")

        # Re-read the decisions file: the reassessment script is expected to
        # have updated the strategy's status there.
        fresh = next((x for x in _read_decisions() if x.get("code") == code), None)
        status = fresh.get("status") if fresh else None

        if status == "active":
            print(f" ✅ {label} 重评通过!")
            retry_data[code] = {"count": 0, "last_attempt": today}
            changes = True
            continue

        if status == "review_needed":
            issues = (fresh.get("quality_issues") or {}).get("critical", [])
            print(f" ❌ {label} 仍 review_needed ({issues})")
        retry_data[code] = {"count": attempt, "last_attempt": today}
        if attempt >= _MAX_RETRIES:
            escalate.append(label)

    # Save before notifying so the bookkeeping is on disk whatever the push does.
    save_retry(retry_data)

    if escalate:
        # NOTE(review): as before, exhausted strategies are re-reported on
        # every run until they leave review_needed — confirm this is wanted.
        msg = f"【知微】策略质量审核 {today}\n以下策略{_MAX_RETRIES}次自动重评均失败,需人工介入:\n"
        msg += "".join(f" - {item}\n" for item in escalate)
        msg += "\n原因可能是:缺少技术面数据 / 行业信息不完整 / 利润保护目标无法确定。"
        push_xmpp(msg)

    if not changes:
        print("[SILENT] 状态无变化")
|
|
|
|
# Script entry point: run one watchdog pass when executed directly.
if __name__ == "__main__":
    sys.exit(main())
|