add daily tool scripts: hardcode_scanner, branch_scanner, prune_branches

This commit is contained in:
知微
2026-06-24 00:04:59 +08:00
parent e33a236bc1
commit 6c97870a8d
3 changed files with 198 additions and 0 deletions
+68
View File
@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""每30分钟扫描所有持仓的分支状态,发现可操作的分支就推"""
import sys, json, os
sys.path.insert(0, '/home/hmo/MoFin')
sys.path.insert(0, '/home/hmo/web-dashboard')
from strategy_tree import detect_scenario
DEC_PATH = '/home/hmo/web-dashboard/data/decisions.json'
PF_PATH = '/home/hmo/web-dashboard/data/portfolio.json'
XMPP_BRIDGE = "http://127.0.0.1:5805/"
XMPP_USER = "hmo@yoin.fun"
def push(msg):
try:
from urllib.request import Request, urlopen
payload = json.dumps({"to": XMPP_USER, "body": msg, "type": "chat"}).encode()
req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
urlopen(req, timeout=5)
except Exception:
pass
def main():
scenario = detect_scenario()
if not scenario.get('id'):
return 0 # SILENT
alives = []
dec = json.load(open(DEC_PATH))
pf = json.load(open(PF_PATH))
pf_codes = {h['code'] for h in pf.get('holdings', [])}
for e in dec.get('decisions', []):
code = e.get('code', '')
tree = e.get('strategy_tree', {})
if not tree or not tree.get('branches'):
continue
branches = tree['branches']
price = e.get('price', 0)
shares = e.get('shares', 0)
cost = e.get('cost', 0)
if price <= 0:
continue
# 用 strategy_tree 评估
try:
from strategy_tree import evaluate_branches
results = evaluate_branches(code, scenario['id'], price, shares, cost)
for r in results:
if r.get('applicable') and r.get('action_type') != 'hold':
is_held = code in pf_codes
label = '持仓' if is_held else '自选'
alives.append(f" {label} {r.get('action_type','?')} {code}@{price} | 情景{scenario.get('label','')}{r.get('branch_id','').split('_')[-1]}| {r.get('rationale','')[:30]}")
break
except Exception:
pass
if not alives:
return 0 # SILENT
# 只推单一情景行 + 操作列表
out_lines = [f"【知微】分支扫描 | {scenario.get('label','')}({scenario.get('id','')})"]
out_lines.extend(alives)
msg = '\n'.join(out_lines)
print(msg)
push(msg)
return 1
if __name__ == '__main__':
sys.exit(main())
+109
View File
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
hardcode_scanner.py — 自成长扫描器
检测脚本中可能已过时的硬编码数值,写审计 JSON 供 system_audit 调用。
扫描规则:
1. 财务类硬编码(cash/金额/仓位)— 应来自 data/*.json
2. 汇率类硬编码(0.86xx, 0.87xx, 0.93等)— 应来自 hk_rate 模块
3. 数字 fallbackreturn X, fallback=X)— 应来自实时数据源
4. 每手股数硬编码(500, 1000 等)— 应来自 Tencent API field[60]
输出:/home/hmo/web-dashboard/data/hardcode_audit.json
"""
import re, ast, json, os, sys
SCAN_DIRS = [
"/home/hmo/.hermes/profiles/position-analyst/scripts",
"/home/hmo/MoFin",
"/home/hmo/web-dashboard",
]
SAFE_FALLBACK_PATTERNS = [
# Known valid fallbacks where network data is genuinely optional
"rate = 0.87", # hk_rate module's own fallback
"retry_for_secs=5", # timeouts
"timeout=5",
"timeout=10",
"timeout=30",
"timeout=60",
"port 5805",
"127.0.0.1:5805",
]
SUSPICIOUS_NUMBERS = [
# (pattern, reason)
(r'return\s+\d{4,}\b', '可能的硬编码现金/金额'),
(r'=\s*\d{5,}\b', '可能的硬编码大额数字'),
(r'0\.8[5-9]\d{1,3}', '可能的硬编码汇率值'),
(r'0\.9[0-5]\d{1,3}', '可能的硬编码汇率值'),
(r'1手\s*[:=]\s*\d{3,}', '可能的每手股数硬编码'),
]
def scan_file(filepath):
findings = []
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
except Exception:
return []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
stripped = line.strip()
# Skip comments and empty lines
if not stripped or stripped.startswith('#') or '"""' in stripped:
continue
for pat, reason in SUSPICIOUS_NUMBERS:
if re.search(pat, stripped):
# Check if it's a safe fallback
if any(safe in stripped for safe in SAFE_FALLBACK_PATTERNS):
continue
findings.append({
"file": filepath,
"line": i,
"code": stripped[:120],
"reason": reason,
"suggestion": "考虑从 data/*.json 或 API 实时读取,不使用硬编码值"
})
break # one finding per line
return findings
def main():
all_findings = []
for directory in SCAN_DIRS:
if not os.path.isdir(directory):
continue
for root, _, files in os.walk(directory):
for f in files:
if f.endswith('.py'):
path = os.path.join(root, f)
findings = scan_file(path)
all_findings.extend(findings)
# Only output to stdout for cron
if all_findings:
print(f"[HARDCODE_SCAN] 发现 {len(all_findings)} 处可能硬编码:")
for f in all_findings:
rel = f['file'].replace('/home/hmo/', '')
print(f"{rel}:L{f['line']} {f['reason']}")
print(f" {f['code']}")
print(f"{f['suggestion']}")
else:
print("[HARDCODE_SCAN] 未发现可疑硬编码")
# Write audit log
os.makedirs(os.path.dirname(AUDIT_PATH), exist_ok=True)
json.dump({
"timestamp": __import__('datetime').datetime.now().isoformat(),
"findings": all_findings,
"count": len(all_findings),
}, open(AUDIT_PATH, 'w'), ensure_ascii=False, indent=2)
if __name__ == '__main__':
AUDIT_PATH = "/home/hmo/web-dashboard/data/hardcode_audit.json" if 'AUDIT_PATH' not in dir() else AUDIT_PATH
main()
+21
View File
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""
prune_branches.py — 每周六凌晨6点执行
低效分支剪枝:
- 触发≥5次且成功率<30% → 剪枝归档
- 输出报告(stdout → cron delivery
"""
import sys, json
sys.path.insert(0, '/home/hmo/MoFin')
from strategy_tree import prune_low_performance_branches
result = prune_low_performance_branches(min_triggers=5, min_success_rate=0.3)
if result:
print(f"[分支剪枝] 已剪枝 {len(result)} 条低效分支:")
for item in result:
print(f"{item}")
print("[分支剪枝] 剪枝完成")
else:
print("[分支剪枝] 无低效分支需剪枝")