meta_growth.py 脚本:每日两次分析git log→自动注入硬编码扫描规则
- 读取过去8小时git log,按修复关键词识别新问题类型
- 匹配 PATTERN_TEMPLATES(目前6个类别,可扩展)
- 去重检查→注入到 hardcode_scanner.py 的扩展点
- 更新 growth_registry.json(问题类别注册表)
- 自检:hardcode_scanner是否存在/注册表是否可写
调度:12:45(午间注入→17:25审计用新规则)
00:45(全天汇总→次日审计带新规则)
This commit is contained in:
+213
-151
@@ -1,188 +1,250 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
meta_growth.py — 自成长机制的元层(自我审视)
|
||||
meta_growth.py — 自成长机制的元层
|
||||
|
||||
每周日 22:00 运行。分析近期改动,识别修复模式,
|
||||
提出新的扫描类别和建议,扩展 hardcode_scanner 的规则。
|
||||
功能:读取近期 git log,识别修复模式,注入新扫描规则到 hardcode_scanner 的扩展点。
|
||||
让自成长机制本身也会成长——能自动发现新的问题类型并添加对应的扫描规则。
|
||||
|
||||
输出 stdout → cron delivery → 知微收到后执行新规则。
|
||||
调度:交易日 12:45 和 00:45(no_agent 模式)
|
||||
- 12:45: 上午盘发现的问题→下午17:25审计就能扫到
|
||||
- 00:45: 全天修复汇总→次日审计带新规则
|
||||
|
||||
输出:/home/hmo/web-dashboard/data/growth_registry.json
|
||||
"""
|
||||
import subprocess, json, os, re, sys
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json"
|
||||
import subprocess
|
||||
import re
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
SCANNER_PATH = "/home/hmo/MoFin/scripts/hardcode_scanner.py"
|
||||
GIT_DIR = "/home/hmo/MoFin"
|
||||
PROFILE_SCANNER = "/home/hmo/.hermes/profiles/position-analyst/scripts/hardcode_scanner.py"
|
||||
REGISTRY_PATH = "/home/hmo/web-dashboard/data/growth_registry.json"
|
||||
EXTENSION_MARKER = "# 扩展点 — meta_growth 在此追加新规则"
|
||||
|
||||
def load_registry():
|
||||
# Known problem categories -> scan-rule templates.
# After analysing the git log, meta_growth matches commit lines against these
# templates and turns every template that hits into a scanner rule.
#
# Fields of each template:
#   name         - category identifier
#   desc         - short human-readable description of the category
#   regex        - scan pattern to inject into hardcode_scanner
#   reason       - message attached to the injected rule
#   git_keywords - regex fragments searched in git log lines; they are
#                  patterns, not plain text, so literal metacharacters
#                  (".", "[", ...) must be escaped
PATTERN_TEMPLATES = [
    {
        "name": "cash_hardcode",
        "desc": "现金/金额硬编码",
        "regex": r"return\s+\d{4,}\b",
        "reason": "可能的硬编码现金/金额",
        "git_keywords": ["cash", "现金", "硬编码", "金额", "fallback.*\\d+"],
    },
    {
        "name": "exchange_rate",
        "desc": "汇率硬编码",
        "regex": r"0\.8[5-9]\d{1,3}",
        "reason": "可能的硬编码汇率值",
        # The dot is escaped on purpose: an unescaped "0.8[5-9]" also matches
        # hex fragments such as "0a85" inside abbreviated commit hashes.
        "git_keywords": ["汇率", "rate", "HKD", "CNY", "0\\.8[5-9]"],
    },
    {
        "name": "lot_size_hardcode",
        "desc": "港股每手股数硬编码",
        "regex": r"1手\s*[:=]\s*\d{3,}",
        "reason": "可能的每手股数硬编码",
        "git_keywords": ["lot_size", "每手", "手数", "lot", "board lot", "f\\[60\\]"],
    },
    {
        "name": "percent_threshold",
        "desc": "百分比阈值硬编码",
        "regex": r"[><=]\s*0\.[0-9]+",
        "reason": "可能的百分比阈值硬编码",
        "git_keywords": ["threshold", "阈值", "止损", "stop_loss", "止盈", "百分比"],
    },
    {
        "name": "position_limit",
        "desc": "仓位金额硬编码",
        "regex": r"仓位\s*[:=]\s*\d{3,}",
        "reason": "可能的仓位金额硬编码",
        "git_keywords": ["仓位", "position", "持仓金额"],
    },
    {
        "name": "hardcoded_path",
        "desc": "路径硬编码",
        "regex": r"['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]",
        "reason": "可能的文件路径硬编码(应使用环境变量或配置)",
        "git_keywords": ["路径", "path", "hardcoded path"],
    },
]
|
||||
|
||||
|
||||
def get_recent_git_log(hours=8):
|
||||
"""获取最近 N 小时的 git log"""
|
||||
try:
|
||||
if os.path.exists(REGISTRY_PATH):
|
||||
return json.load(open(REGISTRY_PATH))
|
||||
except: pass
|
||||
return {"categories": [], "meta_suggestions": [], "last_meta_run": None}
|
||||
|
||||
def save_registry(r):
    """Write the registry ``r`` to REGISTRY_PATH as indented JSON.

    The parent directory is created when missing. The file is opened in a
    ``with`` block so the handle is closed (and the data flushed) before
    returning, and it is written as UTF-8 because ``ensure_ascii=False``
    emits non-ASCII characters verbatim.

    Args:
        r: JSON-serialisable registry object.
    """
    os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True)
    with open(REGISTRY_PATH, "w", encoding="utf-8") as fh:
        json.dump(r, fh, indent=2, ensure_ascii=False)
|
||||
|
||||
def get_recent_git_log(days=7, max_commits=50):
|
||||
"""返回最近N天git提交的改动统计"""
|
||||
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
|
||||
try:
|
||||
r = subprocess.run(
|
||||
["git", "log", f"--since={since}", "--stat", f"--max-count={max_commits}"],
|
||||
capture_output=True, text=True, cwd=GIT_DIR, timeout=15
|
||||
result = subprocess.run(
|
||||
["git", "log", f"--since={hours} hours ago", "--oneline"],
|
||||
capture_output=True, text=True, cwd="/home/hmo/MoFin", timeout=10
|
||||
)
|
||||
return r.stdout
|
||||
except: return ""
|
||||
return result.stdout
|
||||
except Exception as e:
|
||||
print(f"[meta_growth] git log 失败: {e}", file=sys.stderr)
|
||||
return ""
|
||||
|
||||
def parse_fix_patterns(git_log):
|
||||
"""从git log中归类修复模式"""
|
||||
patterns = {
|
||||
"hardcode_cash": {"keywords": ["cash", "备用值", "fallback", "现金"], "count": 0},
|
||||
"hardcode_rate": {"keywords": ["汇率", "rate", "0\\.", "HK_RATE"], "count": 0},
|
||||
"hardcode_lot": {"keywords": ["每手", "lot", "手数", "股/手"], "count": 0},
|
||||
"async_bug": {"keywords": ["异步", "spawn", "async", "乱序"], "count": 0},
|
||||
"data_path": {"keywords": ["路径", "path", "文件不存在"], "count": 0},
|
||||
"stale_strategy": {"keywords": ["重评", "reassess", "过期", "stale"], "count": 0},
|
||||
"doc_missing": {"keywords": ["文档", "更新", "doc", "README"], "count": 0},
|
||||
}
|
||||
for line in git_log.lower().split("\n"):
|
||||
for key, info in patterns.items():
|
||||
for kw in info["keywords"]:
|
||||
if kw in line:
|
||||
info["count"] += 1
|
||||
|
||||
def analyze_log(log_text):
|
||||
"""分析 git log,识别修复模式"""
|
||||
found_patterns = []
|
||||
lines = log_text.strip().split("\n")
|
||||
|
||||
for tmpl in PATTERN_TEMPLATES:
|
||||
hit_count = 0
|
||||
for line in lines:
|
||||
for kw in tmpl["git_keywords"]:
|
||||
if re.search(kw, line, re.IGNORECASE):
|
||||
hit_count += 1
|
||||
break
|
||||
return patterns
|
||||
|
||||
def suggest_new_categories(patterns, registry):
|
||||
"""基于修复模式提出新扫描类别"""
|
||||
suggestions = []
|
||||
|
||||
# 类别级别的规则
|
||||
RULES = [
|
||||
{
|
||||
"trigger": lambda p: p.get("hardcode_rate", {}).get("count", 0) >= 2,
|
||||
"category": "hotfix_exchange_rate",
|
||||
"label": "汇率硬编码",
|
||||
"scanner_rule": r"(?:\b0\.\d{3,}\b).*#.*(?:fallback|备用|默认)",
|
||||
"reason": "发现多次汇率写死修复,应持续监控 fallback 值",
|
||||
},
|
||||
{
|
||||
"trigger": lambda p: p.get("async_bug", {}).get("count", 0) >= 2,
|
||||
"category": "race_condition",
|
||||
"label": "竞态/异步问题",
|
||||
"scanner_rule": None,
|
||||
"reason": "检测到多次异步乱序修复,需考虑加串行锁审计",
|
||||
},
|
||||
{
|
||||
"trigger": lambda p: p.get("hardcode_cash", {}).get("count", 0) >= 1,
|
||||
"category": "hardcoded_asset",
|
||||
"label": "资产硬编码",
|
||||
"scanner_rule": r"(?:return|=\s*)\d{5,}\b",
|
||||
"reason": "出现现金硬编码,需扫描所有 return/赋值大额数字",
|
||||
},
|
||||
]
|
||||
|
||||
for rule in RULES:
|
||||
if rule["trigger"](patterns):
|
||||
already = any(c["category"] == rule["category"] for c in registry.get("categories", []))
|
||||
if not already:
|
||||
suggestions.append({
|
||||
"category": rule["category"],
|
||||
"label": rule["label"],
|
||||
"scanner_rule": rule["scanner_rule"],
|
||||
"reason": rule["reason"],
|
||||
"suggested_at": datetime.now().isoformat(),
|
||||
if hit_count > 0:
|
||||
found_patterns.append({
|
||||
"name": tmpl["name"],
|
||||
"desc": tmpl["desc"],
|
||||
"regex": tmpl["regex"],
|
||||
"reason": tmpl["reason"],
|
||||
"hits": hit_count,
|
||||
})
|
||||
|
||||
return suggestions
|
||||
return found_patterns
|
||||
|
||||
def add_scanner_rules(new_categories):
|
||||
"""将新扫描规则写入 hardcode_scanner.py 的 RULES 表"""
|
||||
if not new_categories:
|
||||
return []
|
||||
|
||||
added = []
|
||||
def load_registry():
|
||||
"""加载问题类别注册表"""
|
||||
try:
|
||||
with open(SCANNER_PATH) as f:
|
||||
if os.path.exists(REGISTRY_PATH):
|
||||
with open(REGISTRY_PATH) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return {
|
||||
"known_categories": [],
|
||||
"injected_rules": [],
|
||||
"last_run": None,
|
||||
"last_findings": [],
|
||||
}
|
||||
|
||||
|
||||
def save_registry(registry):
    """Persist the category registry to REGISTRY_PATH as indented UTF-8 JSON.

    The JSON is first written to ``REGISTRY_PATH + ".tmp"`` and then moved
    over the real file with ``os.replace``. If serialisation fails part-way,
    the previous registry is therefore left untouched instead of being
    truncated.

    Args:
        registry: JSON-serialisable registry object.

    Raises:
        OSError: if the directory or file cannot be created or written.
        TypeError: if ``registry`` contains a value ``json`` cannot serialise.
    """
    os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True)
    tmp_path = REGISTRY_PATH + ".tmp"
    # Explicit UTF-8: ensure_ascii=False writes non-ASCII text verbatim, so
    # the result must not depend on the locale's default encoding.
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(registry, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, REGISTRY_PATH)
|
||||
|
||||
|
||||
def rule_already_exists(registry, regex):
    """Return True if ``regex`` is already recorded as an injected rule.

    Args:
        registry: registry dict; its ``"injected_rules"`` entry is expected
            to be a list of dicts that carry a ``"regex"`` key.
        regex: the scan pattern to look for (compared by string equality).

    Returns:
        True when any recorded rule has exactly this ``regex``, else False.
        A missing or null ``"injected_rules"`` entry counts as "no rules",
        and entries that are not dicts are ignored.
    """
    recorded = registry.get("injected_rules") or []
    return any(
        isinstance(entry, dict) and entry.get("regex") == regex
        for entry in recorded
    )
|
||||
|
||||
|
||||
def inject_rule(scanner_path, regex, reason, marker=EXTENSION_MARKER):
|
||||
"""在 hardcode_scanner.py 的扩展点后插入新规则"""
|
||||
if not os.path.exists(scanner_path):
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(scanner_path, "r") as f:
|
||||
content = f.read()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
for cat in new_categories:
|
||||
rule = cat.get("scanner_rule")
|
||||
if not rule or not rule.strip():
|
||||
continue
|
||||
if regex in content:
|
||||
return False # 已存在
|
||||
|
||||
# 检查是否已存在
|
||||
if rule in content:
|
||||
continue
|
||||
new_rule = f" (r'{regex}', '{reason}'),\n {marker}"
|
||||
if marker not in content:
|
||||
return False # 没有扩展点
|
||||
|
||||
# 在 RULES 列表中找到插入点
|
||||
marker = "# 扩展点 — meta_growth 在此追加新规则"
|
||||
if marker in content:
|
||||
new_entry = f'\n {{"category": "{cat["category"]}", "label": "{cat["label"]}", "rule": r"{rule}"}},'
|
||||
content = content.replace(marker, marker + new_entry)
|
||||
added.append(cat["category"])
|
||||
content = content.replace(marker, new_rule)
|
||||
|
||||
with open(SCANNER_PATH, "w") as f:
|
||||
try:
|
||||
with open(scanner_path, "w") as f:
|
||||
f.write(content)
|
||||
except Exception as e:
|
||||
print(f"[meta_growth] 写入扫描规则失败: {e}", file=sys.stderr)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def self_check():
    """Check the health of the self-growth system's own files.

    Checks performed:
      * the scanner script at SCANNER_PATH exists;
      * the registry at REGISTRY_PATH exists (a missing registry is expected
        on the first run and is reported as such);
      * an existing registry is writable by the current process.

    Returns:
        A list of human-readable issue strings; empty when nothing is wrong.
    """
    issues = []
    if not os.path.exists(SCANNER_PATH):
        issues.append("hardcode_scanner.py 不存在")
    if not os.path.exists(REGISTRY_PATH):
        issues.append("growth_registry.json 不存在(首次运行正常)")
    elif not os.access(REGISTRY_PATH, os.W_OK):
        # An unwritable registry would make the final save fail.
        issues.append("growth_registry.json 不可写")
    return issues
|
||||
|
||||
return added
|
||||
|
||||
def main():
|
||||
print("=" * 50)
|
||||
print(f"[meta_growth] {datetime.now().isoformat()}")
|
||||
now = datetime.datetime.now().isoformat()
|
||||
period = "afternoon" if datetime.datetime.now().hour < 15 else "overnight"
|
||||
|
||||
registry = load_registry()
|
||||
git_log = get_recent_git_log()
|
||||
# 自检
|
||||
issues = self_check()
|
||||
if issues:
|
||||
for issue in issues:
|
||||
print(f"[meta_growth] ⚠ {issue}", file=sys.stderr)
|
||||
|
||||
if not git_log.strip():
|
||||
print("[meta_growth] 无近期 commit,跳过")
|
||||
# 读取 git log
|
||||
hours = 8 # 过去8小时(覆盖一整个交易时段)
|
||||
log = get_recent_git_log(hours=hours)
|
||||
if not log:
|
||||
print(f"[meta_growth] 无近期提交,跳过")
|
||||
return
|
||||
|
||||
patterns = parse_fix_patterns(git_log)
|
||||
print(f"[meta_growth] 分析 {period} 时段日志 ({len(log.strip().split(chr(10)))} 条提交)")
|
||||
|
||||
print("\n[meta_growth] 近期修复模式:")
|
||||
for key, info in sorted(patterns.items(), key=lambda x: -x[1]["count"]):
|
||||
if info["count"] > 0:
|
||||
print(f" {key}: {info['count']} 处匹配")
|
||||
# 分析修复模式
|
||||
patterns = analyze_log(log)
|
||||
|
||||
new_suggestions = suggest_new_categories(patterns, registry)
|
||||
|
||||
if new_suggestions:
|
||||
print(f"\n[meta_growth] 提出 {len(new_suggestions)} 条新扫描类别:")
|
||||
for s in new_suggestions:
|
||||
print(f" + {s['label']}: {s['reason']}")
|
||||
if s.get("scanner_rule"):
|
||||
print(f" 规则: {s['scanner_rule']}")
|
||||
|
||||
added = add_scanner_rules(new_suggestions)
|
||||
if added:
|
||||
print(f"\n[meta_growth] 已写入 scanner 规则: {', '.join(added)}")
|
||||
print("[meta_growth] 下个交易日 hardcode_scanner 将执行新规则")
|
||||
|
||||
registry["categories"] = registry.get("categories", []) + new_suggestions
|
||||
else:
|
||||
print("\n[meta_growth] 无新扫描类别建议")
|
||||
|
||||
registry["last_meta_run"] = datetime.now().isoformat()
|
||||
registry["meta_suggestions"] = registry.get("meta_suggestions", [])
|
||||
|
||||
# 自检:检查自成长机制的元数据完整性
|
||||
print("\n[meta_growth] 自成长系统健康度:")
|
||||
checks = [
|
||||
("hardcode_scanner 存在", os.path.exists(SCANNER_PATH)),
|
||||
("hardcode_scanner cron 已注册", True), # X 会在标准审计中验证
|
||||
("meta_growth 注册表可写", os.access(os.path.dirname(REGISTRY_PATH), os.W_OK)),
|
||||
("meta_growth 本周已运行", registry["last_meta_run"] is not None),
|
||||
]
|
||||
for label, ok in checks:
|
||||
print(f" {'✅' if ok else '❌'} {label}")
|
||||
# 加载注册表
|
||||
registry = load_registry()
|
||||
registry["last_run"] = now
|
||||
|
||||
if not patterns:
|
||||
print(f"[meta_growth] 未发现新修复模式")
|
||||
registry["last_findings"] = []
|
||||
save_registry(registry)
|
||||
print(f"\n[meta_growth] 完成,注册表已更新")
|
||||
return
|
||||
|
||||
# 去重注入
|
||||
injected_count = 0
|
||||
for p in patterns:
|
||||
if rule_already_exists(registry, p["regex"]):
|
||||
print(f"[meta_growth] 规则已存在: {p['name']} ({p['regex']})")
|
||||
continue
|
||||
|
||||
# 注入到 MoFin and profile 两个副本
|
||||
ok1 = inject_rule(SCANNER_PATH, p["regex"], p["reason"])
|
||||
ok2 = inject_rule(PROFILE_SCANNER, p["regex"], p["reason"])
|
||||
|
||||
if ok1 or ok2:
|
||||
registry["injected_rules"].append({
|
||||
"name": p["name"],
|
||||
"desc": p["desc"],
|
||||
"regex": p["regex"],
|
||||
"reason": p["reason"],
|
||||
"injected_at": now,
|
||||
"period": period,
|
||||
"hits_in_log": p["hits"],
|
||||
})
|
||||
injected_count += 1
|
||||
print(f"[meta_growth] 注入新规则: {p['name']} ({p['desc']})")
|
||||
|
||||
# 记录到已知类别
|
||||
if p["name"] not in registry["known_categories"]:
|
||||
registry["known_categories"].append(p["name"])
|
||||
|
||||
registry["last_findings"] = patterns
|
||||
save_registry(registry)
|
||||
|
||||
print(f"[meta_growth] 本次注入 {injected_count} 条新规则")
|
||||
if injected_count > 0:
|
||||
print(f"[meta_growth] 下次 hardcode_scanner 运行时将自动使用新规则")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user