#!/usr/bin/env python3 """ meta_growth.py — 自成长机制的元层(自我审视) 每周日 22:00 运行。分析近期改动,识别修复模式, 提出新的扫描类别和建议,扩展 hardcode_scanner 的规则。 输出 stdout → cron delivery → 知微收到后执行新规则。 """ import subprocess, json, os, re, sys from datetime import datetime, timedelta REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json" SCANNER_PATH = "/home/hmo/MoFin/scripts/hardcode_scanner.py" GIT_DIR = "/home/hmo/MoFin" def load_registry(): try: if os.path.exists(REGISTRY_PATH): return json.load(open(REGISTRY_PATH)) except: pass return {"categories": [], "meta_suggestions": [], "last_meta_run": None} def save_registry(r): os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True) json.dump(r, open(REGISTRY_PATH, "w"), indent=2, ensure_ascii=False) def get_recent_git_log(days=7, max_commits=50): """返回最近N天git提交的改动统计""" since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") try: r = subprocess.run( ["git", "log", f"--since={since}", "--stat", f"--max-count={max_commits}"], capture_output=True, text=True, cwd=GIT_DIR, timeout=15 ) return r.stdout except: return "" def parse_fix_patterns(git_log): """从git log中归类修复模式""" patterns = { "hardcode_cash": {"keywords": ["cash", "备用值", "fallback", "现金"], "count": 0}, "hardcode_rate": {"keywords": ["汇率", "rate", "0\\.", "HK_RATE"], "count": 0}, "hardcode_lot": {"keywords": ["每手", "lot", "手数", "股/手"], "count": 0}, "async_bug": {"keywords": ["异步", "spawn", "async", "乱序"], "count": 0}, "data_path": {"keywords": ["路径", "path", "文件不存在"], "count": 0}, "stale_strategy": {"keywords": ["重评", "reassess", "过期", "stale"], "count": 0}, "doc_missing": {"keywords": ["文档", "更新", "doc", "README"], "count": 0}, } for line in git_log.lower().split("\n"): for key, info in patterns.items(): for kw in info["keywords"]: if kw in line: info["count"] += 1 break return patterns def suggest_new_categories(patterns, registry): """基于修复模式提出新扫描类别""" suggestions = [] # 类别级别的规则 RULES = [ { "trigger": lambda p: p.get("hardcode_rate", {}).get("count", 0) >= 2, "category": "hotfix_exchange_rate", "label": "汇率硬编码", "scanner_rule": r"(?:\b0\.\d{3,}\b).*#.*(?:fallback|备用|默认)", "reason": "发现多次汇率写死修复,应持续监控 fallback 值", }, { "trigger": lambda p: p.get("async_bug", {}).get("count", 0) >= 2, "category": "race_condition", "label": "竞态/异步问题", "scanner_rule": None, "reason": "检测到多次异步乱序修复,需考虑加串行锁审计", }, { "trigger": lambda p: p.get("hardcode_cash", {}).get("count", 0) >= 1, "category": "hardcoded_asset", "label": "资产硬编码", "scanner_rule": r"(?:return|=\s*)\d{5,}\b", "reason": "出现现金硬编码,需扫描所有 return/赋值大额数字", }, ] for rule in RULES: if rule["trigger"](patterns): already = any(c["category"] == rule["category"] for c in registry.get("categories", [])) if not already: suggestions.append({ "category": rule["category"], "label": rule["label"], "scanner_rule": rule["scanner_rule"], "reason": rule["reason"], "suggested_at": datetime.now().isoformat(), }) return suggestions def add_scanner_rules(new_categories): """将新扫描规则写入 hardcode_scanner.py 的 RULES 表""" if not new_categories: return [] added = [] try: with open(SCANNER_PATH) as f: content = f.read() for cat in new_categories: rule = cat.get("scanner_rule") if not rule or not rule.strip(): continue # 检查是否已存在 if rule in content: continue # 在 RULES 列表中找到插入点 marker = "# 扩展点 — meta_growth 在此追加新规则" if marker in content: new_entry = f'\n {{"category": "{cat["category"]}", "label": "{cat["label"]}", "rule": r"{rule}"}},' content = content.replace(marker, marker + new_entry) added.append(cat["category"]) with open(SCANNER_PATH, "w") as f: f.write(content) except Exception as e: print(f"[meta_growth] 写入扫描规则失败: {e}", file=sys.stderr) return added def main(): print("=" * 50) print(f"[meta_growth] {datetime.now().isoformat()}") registry = load_registry() git_log = get_recent_git_log() if not git_log.strip(): print("[meta_growth] 无近期 commit,跳过") return patterns = parse_fix_patterns(git_log) print("\n[meta_growth] 近期修复模式:") for key, info in sorted(patterns.items(), key=lambda x: -x[1]["count"]): if info["count"] > 0: print(f" {key}: {info['count']} 处匹配") new_suggestions = suggest_new_categories(patterns, registry) if new_suggestions: print(f"\n[meta_growth] 提出 {len(new_suggestions)} 条新扫描类别:") for s in new_suggestions: print(f" + {s['label']}: {s['reason']}") if s.get("scanner_rule"): print(f" 规则: {s['scanner_rule']}") added = add_scanner_rules(new_suggestions) if added: print(f"\n[meta_growth] 已写入 scanner 规则: {', '.join(added)}") print("[meta_growth] 下个交易日 hardcode_scanner 将执行新规则") registry["categories"] = registry.get("categories", []) + new_suggestions else: print("\n[meta_growth] 无新扫描类别建议") registry["last_meta_run"] = datetime.now().isoformat() registry["meta_suggestions"] = registry.get("meta_suggestions", []) # 自检:检查自成长机制的元数据完整性 print("\n[meta_growth] 自成长系统健康度:") checks = [ ("hardcode_scanner 存在", os.path.exists(SCANNER_PATH)), ("hardcode_scanner cron 已注册", True), # X 会在标准审计中验证 ("meta_growth 注册表可写", os.access(os.path.dirname(REGISTRY_PATH), os.W_OK)), ("meta_growth 本周已运行", registry["last_meta_run"] is not None), ] for label, ok in checks: print(f" {'✅' if ok else '❌'} {label}") save_registry(registry) print(f"\n[meta_growth] 完成,注册表已更新") if __name__ == "__main__": main()