#!/usr/bin/env python3 """ meta_growth.py — 自成长机制的元层 功能:读取近期 git log,识别修复模式,注入新扫描规则到 hardcode_scanner 的扩展点。 让自成长机制本身也会成长——能自动发现新的问题类型并添加对应的扫描规则。 调度:交易日 12:45 和 00:45(no_agent 模式) - 12:45: 上午盘发现的问题→下午17:25审计就能扫到 - 00:45: 全天修复汇总→次日审计带新规则 输出:/home/hmo/web-dashboard/data/growth_registry.json """ import subprocess import re import json import os import sys import datetime SCANNER_PATH = "/home/hmo/MoFin/scripts/hardcode_scanner.py" PROFILE_SCANNER = "/home/hmo/.hermes/profiles/position-analyst/scripts/hardcode_scanner.py" REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json" EXTENSION_MARKER = "# 扩展点 — meta_growth 在此追加新规则" # 已知问题类别 → 扫描规则模板 # meta_growth 分析 git log 后,把新模式匹配到这里生成规则元组 PATTERN_TEMPLATES = [ { "name": "cash_hardcode", "desc": "现金/金额硬编码", "regex": r"return\s+\d{4,}\b", "reason": "可能的硬编码现金/金额", "git_keywords": ["cash", "现金", "硬编码", "金额", "fallback.*\\d+"], }, { "name": "exchange_rate", "desc": "汇率硬编码", "regex": r"0\.8[5-9]\d{1,3}", "reason": "可能的硬编码汇率值", "git_keywords": ["汇率", "rate", "HKD", "CNY", "0.8[5-9]"], }, { "name": "lot_size_hardcode", "desc": "港股每手股数硬编码", "regex": r"1手\s*[:=]\s*\d{3,}", "reason": "可能的每手股数硬编码", "git_keywords": ["lot_size", "每手", "手数", "lot", "board lot", "f\\[60\\]"], }, { "name": "percent_threshold", "desc": "百分比阈值硬编码", "regex": r"[><=]\s*0\.[0-9]+", "reason": "可能的百分比阈值硬编码", "git_keywords": ["threshold", "阈值", "止损", "stop_loss", "止盈", "百分比"], }, { "name": "position_limit", "desc": "仓位金额硬编码", "regex": r"仓位\s*[:=]\s*\d{3,}", "reason": "可能的仓位金额硬编码", "git_keywords": ["仓位", "position", "持仓金额"], }, { "name": "hardcoded_path", "desc": "路径硬编码", "regex": r"['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]", "reason": "可能的文件路径硬编码(应使用环境变量或配置)", "git_keywords": ["路径", "path", "hardcoded path"], }, ] def get_recent_git_log(hours=8): """获取最近 N 小时的 git log""" try: result = subprocess.run( ["git", "log", f"--since={hours} hours ago", "--oneline"], capture_output=True, text=True, cwd="/home/hmo/MoFin", timeout=10 ) return result.stdout except Exception as e: print(f"[meta_growth] git log 失败: {e}", file=sys.stderr) return "" def analyze_log(log_text): """分析 git log,识别修复模式""" found_patterns = [] lines = log_text.strip().split("\n") for tmpl in PATTERN_TEMPLATES: hit_count = 0 for line in lines: for kw in tmpl["git_keywords"]: if re.search(kw, line, re.IGNORECASE): hit_count += 1 break if hit_count > 0: found_patterns.append({ "name": tmpl["name"], "desc": tmpl["desc"], "regex": tmpl["regex"], "reason": tmpl["reason"], "hits": hit_count, }) return found_patterns def load_registry(): """加载问题类别注册表""" try: if os.path.exists(REGISTRY_PATH): with open(REGISTRY_PATH) as f: return json.load(f) except Exception: pass return { "known_categories": [], "injected_rules": [], "last_run": None, "last_findings": [], } def save_registry(registry): """保存注册表""" os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True) with open(REGISTRY_PATH, "w") as f: json.dump(registry, f, indent=2, ensure_ascii=False) def rule_already_exists(registry, regex): """检查规则是否已注入""" for r in registry.get("injected_rules", []): if r.get("regex") == regex: return True return False def inject_rule(scanner_path, regex, reason, marker=EXTENSION_MARKER): """在 hardcode_scanner.py 的扩展点后插入新规则""" if not os.path.exists(scanner_path): return False try: with open(scanner_path, "r") as f: content = f.read() except Exception: return False if regex in content: return False # 已存在 new_rule = f" (r'{regex}', '{reason}'),\n {marker}" if marker not in content: return False # 没有扩展点 content = content.replace(marker, new_rule) try: with open(scanner_path, "w") as f: f.write(content) return True except Exception: return False def self_check(): """自检:检查自成长系统本身的健康度""" issues = [] if not os.path.exists(SCANNER_PATH): issues.append("hardcode_scanner.py 不存在") if not os.path.exists(REGISTRY_PATH): issues.append("growth_registry.json 不存在(首次运行正常)") return issues def main(): now = datetime.datetime.now().isoformat() period = "afternoon" if datetime.datetime.now().hour < 15 else "overnight" # 自检 issues = self_check() if issues: for issue in issues: print(f"[meta_growth] ⚠ {issue}", file=sys.stderr) # 读取 git log hours = 8 # 过去8小时(覆盖一整个交易时段) log = get_recent_git_log(hours=hours) if not log: print(f"[meta_growth] 无近期提交,跳过") return print(f"[meta_growth] 分析 {period} 时段日志 ({len(log.strip().split(chr(10)))} 条提交)") # 分析修复模式 patterns = analyze_log(log) # 加载注册表 registry = load_registry() registry["last_run"] = now if not patterns: print(f"[meta_growth] 未发现新修复模式") registry["last_findings"] = [] save_registry(registry) return # 去重注入 injected_count = 0 for p in patterns: if rule_already_exists(registry, p["regex"]): print(f"[meta_growth] 规则已存在: {p['name']} ({p['regex']})") continue # 注入到 MoFin and profile 两个副本 ok1 = inject_rule(SCANNER_PATH, p["regex"], p["reason"]) ok2 = inject_rule(PROFILE_SCANNER, p["regex"], p["reason"]) if ok1 or ok2: registry["injected_rules"].append({ "name": p["name"], "desc": p["desc"], "regex": p["regex"], "reason": p["reason"], "injected_at": now, "period": period, "hits_in_log": p["hits"], }) injected_count += 1 print(f"[meta_growth] 注入新规则: {p['name']} ({p['desc']})") # 记录到已知类别 if p["name"] not in registry["known_categories"]: registry["known_categories"].append(p["name"]) registry["last_findings"] = patterns save_registry(registry) print(f"[meta_growth] 本次注入 {injected_count} 条新规则") if injected_count > 0: print(f"[meta_growth] 下次 hardcode_scanner 运行时将自动使用新规则") if __name__ == "__main__": main()