Files
MoFin/scripts/meta_growth.py
知微 3c9a758424 meta_growth.py 脚本:每日两次分析git log→自动注入硬编码扫描规则
- 读取过去8小时git log,按修复关键词识别新问题类型
- 匹配 PATTERN_TEMPLATES(目前6个类别,可扩展)
- 去重检查→注入到 hardcode_scanner.py 的扩展点
- 更新 growth_registry.json(问题类别注册表)
- 自检:hardcode_scanner是否存在/注册表是否可写

调度:12:45(午间注入→17:25审计用新规则)
      00:45(全天汇总→次日审计带新规则)
2026-06-24 00:15:33 +08:00

251 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
meta_growth.py — 自成长机制的元层
功能:读取近期 git log,识别修复模式,注入新扫描规则到 hardcode_scanner 的扩展点。
让自成长机制本身也会成长——能自动发现新的问题类型并添加对应的扫描规则。
调度:交易日 12:45 和 00:45no_agent 模式)
- 12:45: 上午盘发现的问题→下午17:25审计就能扫到
- 00:45: 全天修复汇总→次日审计带新规则
输出:/home/hmo/web-dashboard/data/growth_registry.json
"""
import subprocess
import re
import json
import os
import sys
import datetime
SCANNER_PATH = "/home/hmo/MoFin/scripts/hardcode_scanner.py"
PROFILE_SCANNER = "/home/hmo/.hermes/profiles/position-analyst/scripts/hardcode_scanner.py"
REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json"
EXTENSION_MARKER = "# 扩展点 — meta_growth 在此追加新规则"
# 已知问题类别 → 扫描规则模板
# meta_growth 分析 git log 后,把新模式匹配到这里生成规则元组
PATTERN_TEMPLATES = [
{
"name": "cash_hardcode",
"desc": "现金/金额硬编码",
"regex": r"return\s+\d{4,}\b",
"reason": "可能的硬编码现金/金额",
"git_keywords": ["cash", "现金", "硬编码", "金额", "fallback.*\\d+"],
},
{
"name": "exchange_rate",
"desc": "汇率硬编码",
"regex": r"0\.8[5-9]\d{1,3}",
"reason": "可能的硬编码汇率值",
"git_keywords": ["汇率", "rate", "HKD", "CNY", "0.8[5-9]"],
},
{
"name": "lot_size_hardcode",
"desc": "港股每手股数硬编码",
"regex": r"1手\s*[:=]\s*\d{3,}",
"reason": "可能的每手股数硬编码",
"git_keywords": ["lot_size", "每手", "手数", "lot", "board lot", "f\\[60\\]"],
},
{
"name": "percent_threshold",
"desc": "百分比阈值硬编码",
"regex": r"[><=]\s*0\.[0-9]+",
"reason": "可能的百分比阈值硬编码",
"git_keywords": ["threshold", "阈值", "止损", "stop_loss", "止盈", "百分比"],
},
{
"name": "position_limit",
"desc": "仓位金额硬编码",
"regex": r"仓位\s*[:=]\s*\d{3,}",
"reason": "可能的仓位金额硬编码",
"git_keywords": ["仓位", "position", "持仓金额"],
},
{
"name": "hardcoded_path",
"desc": "路径硬编码",
"regex": r"['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]",
"reason": "可能的文件路径硬编码(应使用环境变量或配置)",
"git_keywords": ["路径", "path", "hardcoded path"],
},
]
def get_recent_git_log(hours=8):
"""获取最近 N 小时的 git log"""
try:
result = subprocess.run(
["git", "log", f"--since={hours} hours ago", "--oneline"],
capture_output=True, text=True, cwd="/home/hmo/MoFin", timeout=10
)
return result.stdout
except Exception as e:
print(f"[meta_growth] git log 失败: {e}", file=sys.stderr)
return ""
def analyze_log(log_text):
"""分析 git log,识别修复模式"""
found_patterns = []
lines = log_text.strip().split("\n")
for tmpl in PATTERN_TEMPLATES:
hit_count = 0
for line in lines:
for kw in tmpl["git_keywords"]:
if re.search(kw, line, re.IGNORECASE):
hit_count += 1
break
if hit_count > 0:
found_patterns.append({
"name": tmpl["name"],
"desc": tmpl["desc"],
"regex": tmpl["regex"],
"reason": tmpl["reason"],
"hits": hit_count,
})
return found_patterns
def load_registry():
"""加载问题类别注册表"""
try:
if os.path.exists(REGISTRY_PATH):
with open(REGISTRY_PATH) as f:
return json.load(f)
except Exception:
pass
return {
"known_categories": [],
"injected_rules": [],
"last_run": None,
"last_findings": [],
}
def save_registry(registry):
"""保存注册表"""
os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True)
with open(REGISTRY_PATH, "w") as f:
json.dump(registry, f, indent=2, ensure_ascii=False)
def rule_already_exists(registry, regex):
"""检查规则是否已注入"""
for r in registry.get("injected_rules", []):
if r.get("regex") == regex:
return True
return False
def inject_rule(scanner_path, regex, reason, marker=EXTENSION_MARKER):
"""在 hardcode_scanner.py 的扩展点后插入新规则"""
if not os.path.exists(scanner_path):
return False
try:
with open(scanner_path, "r") as f:
content = f.read()
except Exception:
return False
if regex in content:
return False # 已存在
new_rule = f" (r'{regex}', '{reason}'),\n {marker}"
if marker not in content:
return False # 没有扩展点
content = content.replace(marker, new_rule)
try:
with open(scanner_path, "w") as f:
f.write(content)
return True
except Exception:
return False
def self_check():
"""自检:检查自成长系统本身的健康度"""
issues = []
if not os.path.exists(SCANNER_PATH):
issues.append("hardcode_scanner.py 不存在")
if not os.path.exists(REGISTRY_PATH):
issues.append("growth_registry.json 不存在(首次运行正常)")
return issues
def main():
now = datetime.datetime.now().isoformat()
period = "afternoon" if datetime.datetime.now().hour < 15 else "overnight"
# 自检
issues = self_check()
if issues:
for issue in issues:
print(f"[meta_growth] ⚠ {issue}", file=sys.stderr)
# 读取 git log
hours = 8 # 过去8小时(覆盖一整个交易时段)
log = get_recent_git_log(hours=hours)
if not log:
print(f"[meta_growth] 无近期提交,跳过")
return
print(f"[meta_growth] 分析 {period} 时段日志 ({len(log.strip().split(chr(10)))} 条提交)")
# 分析修复模式
patterns = analyze_log(log)
# 加载注册表
registry = load_registry()
registry["last_run"] = now
if not patterns:
print(f"[meta_growth] 未发现新修复模式")
registry["last_findings"] = []
save_registry(registry)
return
# 去重注入
injected_count = 0
for p in patterns:
if rule_already_exists(registry, p["regex"]):
print(f"[meta_growth] 规则已存在: {p['name']} ({p['regex']})")
continue
# 注入到 MoFin and profile 两个副本
ok1 = inject_rule(SCANNER_PATH, p["regex"], p["reason"])
ok2 = inject_rule(PROFILE_SCANNER, p["regex"], p["reason"])
if ok1 or ok2:
registry["injected_rules"].append({
"name": p["name"],
"desc": p["desc"],
"regex": p["regex"],
"reason": p["reason"],
"injected_at": now,
"period": period,
"hits_in_log": p["hits"],
})
injected_count += 1
print(f"[meta_growth] 注入新规则: {p['name']} ({p['desc']})")
# 记录到已知类别
if p["name"] not in registry["known_categories"]:
registry["known_categories"].append(p["name"])
registry["last_findings"] = patterns
save_registry(registry)
print(f"[meta_growth] 本次注入 {injected_count} 条新规则")
if injected_count > 0:
print(f"[meta_growth] 下次 hardcode_scanner 运行时将自动使用新规则")
if __name__ == "__main__":
main()