#!/usr/bin/env python3
"""
meta_growth.py — meta layer of the self-growth mechanism.

Reads the recent git log, recognises fix patterns in the commit subjects and
injects the matching scan rules at the extension point of hardcode_scanner.py,
so that the self-growth mechanism itself grows: newly seen problem types get a
scanner rule automatically.

Schedule: trading days at 12:45 and 00:45 (no_agent mode)
- 12:45: problems found in the morning session are covered by the 17:25 audit.
- 00:45: the day's fixes are rolled up so the next audit runs with new rules.

Output: /home/hmo/web-dashboard/data/growth_registry.json
"""

import subprocess
import re
import json
import os
import sys
import datetime

REPO_DIR = "/home/hmo/MoFin"
SCANNER_PATH = "/home/hmo/MoFin/scripts/hardcode_scanner.py"
PROFILE_SCANNER = "/home/hmo/.hermes/profiles/position-analyst/scripts/hardcode_scanner.py"
REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json"
EXTENSION_MARKER = "# 扩展点 — meta_growth 在此追加新规则"

# Abbreviated/full commit hash that `git log --oneline` puts before the subject.
_HASH_PREFIX = re.compile(r"^[0-9a-f]{7,40}\s+")

# Known problem categories -> scanner rule templates.
# `git_keywords` are regular expressions matched (case-insensitively) against
# commit subjects; `regex`/`reason` form the rule tuple written to the scanner.
PATTERN_TEMPLATES = [
    {
        "name": "cash_hardcode",
        "desc": "现金/金额硬编码",
        "regex": r"return\s+\d{4,}\b",
        "reason": "可能的硬编码现金/金额",
        "git_keywords": ["cash", "现金", "硬编码", "金额", "fallback.*\\d+"],
    },
    {
        "name": "exchange_rate",
        "desc": "汇率硬编码",
        "regex": r"0\.8[5-9]\d{1,3}",
        "reason": "可能的硬编码汇率值",
        # The dot is escaped so only a literal decimal such as 0.88 matches.
        "git_keywords": ["汇率", "rate", "HKD", "CNY", "0\\.8[5-9]"],
    },
    {
        "name": "lot_size_hardcode",
        "desc": "港股每手股数硬编码",
        "regex": r"1手\s*[:=]\s*\d{3,}",
        "reason": "可能的每手股数硬编码",
        "git_keywords": ["lot_size", "每手", "手数", "lot", "board lot", "f\\[60\\]"],
    },
    {
        "name": "percent_threshold",
        "desc": "百分比阈值硬编码",
        "regex": r"[><=]\s*0\.[0-9]+",
        "reason": "可能的百分比阈值硬编码",
        "git_keywords": ["threshold", "阈值", "止损", "stop_loss", "止盈", "百分比"],
    },
    {
        "name": "position_limit",
        "desc": "仓位金额硬编码",
        "regex": r"仓位\s*[:=]\s*\d{3,}",
        "reason": "可能的仓位金额硬编码",
        "git_keywords": ["仓位", "position", "持仓金额"],
    },
    {
        "name": "hardcoded_path",
        "desc": "路径硬编码",
        "regex": r"['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]",
        "reason": "可能的文件路径硬编码(应使用环境变量或配置)",
        "git_keywords": ["路径", "path", "hardcoded path"],
    },
]


def get_recent_git_log(hours=8, repo_dir=None):
    """Return `git log --oneline` output for the last ``hours`` hours.

    Args:
        hours: Size of the look-back window passed to ``--since``.
        repo_dir: Repository to inspect; defaults to ``REPO_DIR``.

    Returns:
        The raw log text (one commit per line), or ``""`` when git cannot be
        run, times out, or exits with a non-zero status.
    """
    cwd = REPO_DIR if repo_dir is None else repo_dir
    try:
        result = subprocess.run(
            ["git", "log", f"--since={hours} hours ago", "--oneline"],
            capture_output=True,
            # Decode explicitly: commit subjects contain Chinese text and the
            # locale of a cron environment is not guaranteed to be UTF-8.
            encoding="utf-8",
            errors="replace",
            cwd=cwd,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[meta_growth] git log 失败: {e}", file=sys.stderr)
        return ""
    if result.returncode != 0:
        # A failing git must not be mistaken for "no recent commits".
        print(f"[meta_growth] git log 失败: {result.stderr.strip()}", file=sys.stderr)
        return ""
    return result.stdout


def analyze_log(log_text):
    """Match commit subjects against ``PATTERN_TEMPLATES``.

    Args:
        log_text: Log text with one commit per line, as produced by
            ``git log --oneline``.

    Returns:
        A list with one dict per template that matched at least one commit,
        holding ``name``, ``desc``, ``regex``, ``reason`` and ``hits`` (the
        number of commits that matched any of the template's keywords).
    """
    # Drop the leading commit hash so hex fragments such as "0f89" cannot be
    # mistaken for keyword hits; only the subject text is searched.
    subjects = [_HASH_PREFIX.sub("", line) for line in log_text.strip().split("\n")]

    found_patterns = []
    for tmpl in PATTERN_TEMPLATES:
        hit_count = sum(
            1
            for subject in subjects
            if any(re.search(kw, subject, re.IGNORECASE) for kw in tmpl["git_keywords"])
        )
        if hit_count > 0:
            found_patterns.append({
                "name": tmpl["name"],
                "desc": tmpl["desc"],
                "regex": tmpl["regex"],
                "reason": tmpl["reason"],
                "hits": hit_count,
            })
    return found_patterns


def _empty_registry():
    """Return a fresh registry containing every key ``main`` relies on."""
    return {
        "known_categories": [],
        "injected_rules": [],
        "last_run": None,
        "last_findings": [],
    }


def load_registry(path=None):
    """Load the problem-category registry.

    Loaded data is merged over the default structure, so a registry written
    with a different schema (missing ``injected_rules`` / ``known_categories``)
    still yields every key used by ``main``.

    Args:
        path: Registry file; defaults to ``REGISTRY_PATH``.

    Returns:
        The registry dict. A missing, unreadable or malformed file results in
        the empty default registry.
    """
    path = REGISTRY_PATH if path is None else path
    registry = _empty_registry()
    try:
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return registry  # first run
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"[meta_growth] 注册表读取失败,使用空注册表: {e}", file=sys.stderr)
        return registry

    if isinstance(loaded, dict):
        registry.update(loaded)
    for key in ("known_categories", "injected_rules"):
        if not isinstance(registry[key], list):
            registry[key] = []
    return registry


def save_registry(registry, path=None):
    """Write the registry as UTF-8 JSON.

    The data is written to a sibling temporary file and moved into place with
    ``os.replace`` so an interrupted run cannot leave a truncated registry.

    Args:
        registry: JSON-serialisable registry dict.
        path: Target file; defaults to ``REGISTRY_PATH``.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    path = REGISTRY_PATH if path is None else path
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(registry, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, path)


def rule_already_exists(registry, regex):
    """Return True if ``regex`` is recorded in ``registry["injected_rules"]``."""
    return any(r.get("regex") == regex for r in registry.get("injected_rules", []))


def inject_rule(scanner_path, regex, reason, marker=EXTENSION_MARKER):
    """Insert a ``(regex, reason)`` rule tuple just before the extension marker.

    Args:
        scanner_path: Path of the hardcode_scanner.py copy to modify.
        regex: Pattern text of the new rule.
        reason: Human-readable explanation stored next to the pattern.
        marker: Comment text that marks the insertion point.

    Returns:
        True if the file was modified; False if the file is missing or
        unreadable, has no marker, already contains the rule, or cannot be
        written.
    """
    try:
        with open(scanner_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        return False

    # The rule may be present verbatim (older r'...' form) or as the repr()
    # literal written below.
    if regex in content or repr(regex) in content:
        return False

    marker_at = content.find(marker)
    if marker_at < 0:
        return False  # no extension point

    # Reuse the marker line's own indentation for the new rule line.
    line_start = content.rfind("\n", 0, marker_at) + 1
    lead = content[line_start:marker_at]
    indent = lead if not lead.strip() else "    "

    # repr() always yields a valid Python literal, even when the pattern holds
    # quotes or backslashes; interpolating into r'...' breaks on a single quote.
    rule_line = f"({regex!r}, {reason!r}),\n{indent}"
    content = content[:marker_at] + rule_line + content[marker_at:]

    try:
        with open(scanner_path, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError:
        return False
    return True


def self_check():
    """Return a list of health warnings about the self-growth setup."""
    issues = []
    if not os.path.exists(SCANNER_PATH):
        issues.append("hardcode_scanner.py 不存在")
    if not os.path.exists(REGISTRY_PATH):
        issues.append("growth_registry.json 不存在(首次运行正常)")
    return issues


def run_period(hour):
    """Map an hour of day (0-23) to the run label stored with each rule.

    The 12:45 run is labelled ``"afternoon"``; the 00:45 run, like any other
    run outside 06:00-14:59, is labelled ``"overnight"``.
    """
    return "afternoon" if 6 <= hour < 15 else "overnight"


def main():
    """Analyse the recent git log and inject newly triggered scanner rules."""
    started = datetime.datetime.now()
    now = started.isoformat()
    period = run_period(started.hour)

    # Self-check: warnings only, the run continues.
    for issue in self_check():
        print(f"[meta_growth] ⚠ {issue}", file=sys.stderr)

    # NOTE(review): with runs at 12:45 and 00:45 an 8-hour window leaves
    # 12:45-16:45 uncovered by either run — confirm whether that is intended.
    hours = 8
    log = get_recent_git_log(hours=hours)
    if not log.strip():
        print("[meta_growth] 无近期提交,跳过")
        return

    print(f"[meta_growth] 分析 {period} 时段日志 ({len(log.strip().splitlines())} 条提交)")

    patterns = analyze_log(log)

    registry = load_registry()
    registry["last_run"] = now
    injected_rules = registry.setdefault("injected_rules", [])
    known_categories = registry.setdefault("known_categories", [])

    if not patterns:
        print("[meta_growth] 未发现新修复模式")
        registry["last_findings"] = []
        save_registry(registry)
        return

    injected_count = 0
    for p in patterns:
        if rule_already_exists(registry, p["regex"]):
            print(f"[meta_growth] 规则已存在: {p['name']} ({p['regex']})")
            continue

        # Inject into both copies of the scanner (MoFin and the profile).
        ok1 = inject_rule(SCANNER_PATH, p["regex"], p["reason"])
        ok2 = inject_rule(PROFILE_SCANNER, p["regex"], p["reason"])

        if ok1 or ok2:
            injected_rules.append({
                "name": p["name"],
                "desc": p["desc"],
                "regex": p["regex"],
                "reason": p["reason"],
                "injected_at": now,
                "period": period,
                "hits_in_log": p["hits"],
            })
            injected_count += 1
            print(f"[meta_growth] 注入新规则: {p['name']} ({p['desc']})")

        if p["name"] not in known_categories:
            known_categories.append(p["name"])

    registry["last_findings"] = patterns
    save_registry(registry)

    print(f"[meta_growth] 本次注入 {injected_count} 条新规则")
    if injected_count > 0:
        print("[meta_growth] 下次 hardcode_scanner 运行时将自动使用新规则")


if __name__ == "__main__":
    main()