"""MoFin 全面测试 —— 按 TEST_PLAN.md 执行""" import sys, os, subprocess sys.path.insert(0, '/home/hmo/MoFin') passed = 0 failed = 0 results = [] def test(name, cond, detail=""): global passed, failed if cond: passed += 1 results.append(f" ✅ {name}") else: failed += 1 results.append(f" ❌ {name}: {detail}") print("=" * 50) print("MoFin 全面测试") print("=" * 50) # ── 1. 导入测试 ── print("\n--- 1. 导入 ---") try: from mo_data import read_portfolio, read_decisions, read_watchlist from mo_models import is_hk_stock, get_hk_rate, to_cny, calc_total_assets, calc_total_mv from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_holding_strategy test("mo_data 导入", True) test("mo_models 导入", True) test("mofin_db 导入", True) except Exception as e: test("核心模块导入", False, str(e)) # ── 2. 数据读取 ── print("\n--- 2. 数据读取 ---") pf = read_portfolio() dec = read_decisions() wl = read_watchlist() test("read_portfolio 有数据", len(pf.get('holdings', [])) > 0, f"got {len(pf.get('holdings',[]))}") test("read_decisions 有数据", len(dec.get('decisions', [])) > 0, f"got {len(dec.get('decisions',[]))}") test("read_watchlist 有数据", len(wl.get('stocks', [])) > 0, f"got {len(wl.get('stocks',[]))}") # ── 3. 币种存储 ── print("\n--- 3. 币种存储 ---") hk_ok = 0; hk_fail = 0; a_ok = 0; a_fail = 0 for h in pf.get('holdings', []): code = str(h.get('code', '')) curr = h.get('currency', h.get('_currency', '')) if is_hk_stock(code): if curr == 'HKD': hk_ok += 1 else: hk_fail += 1 else: if curr == 'CNY': a_ok += 1 else: a_fail += 1 test(f"港股 currency=HKD", hk_fail == 0, f"{hk_ok} OK, {hk_fail} wrong") test(f"A股 currency=CNY", a_fail == 0, f"{a_ok} OK, {a_fail} wrong") # 决策币种 dec_hk_ok = 0; dec_hk_fail = 0 for d in dec.get('decisions', []): code = str(d.get('code', '')) curr = d.get('currency', '') if is_hk_stock(code) and curr != 'HKD': dec_hk_fail += 1 elif is_hk_stock(code): dec_hk_ok += 1 test(f"决策 港股 currency=HKD", dec_hk_fail == 0, f"{dec_hk_ok} OK, {dec_hk_fail} wrong") # ── 4. 币种转换 ── print("\n--- 4. 币种转换 ---") rate = get_hk_rate() test("is_hk_stock('01888')", is_hk_stock('01888') == True) test("is_hk_stock('000657')", is_hk_stock('000657') == False) test("is_hk_stock('AAPL')", is_hk_stock('AAPL') == False) test("get_hk_rate 有效", 0.85 < rate < 0.95, f"rate={rate}") test("to_cny HK convert", abs(to_cny(100, '00700') - 100 * rate) < 0.01) test("to_cny A股 no convert", to_cny(100, '000657') == 100) # ── 5. 总资产 ── print("\n--- 5. 总资产 ---") stored_ta = pf.get('total_assets', 0) stored_mv = pf.get('total_mv', 0) calc_ta = calc_total_assets(pf) calc_mv = calc_total_mv(pf.get('holdings', [])) test("total_assets stored ≈ calculated", abs(stored_ta - calc_ta) < 500, f"stored={stored_ta:.2f} calc={calc_ta:.2f} diff={abs(stored_ta-calc_ta):.1f}") test("total_mv stored ≈ calculated", abs(stored_mv - calc_mv) < 500, f"stored={stored_mv:.2f} calc={calc_mv:.2f}") test("total_assets > 0", stored_ta > 0) test("total_mv > 0", stored_mv > 0) test("frozen_cash 已清零", pf.get('frozen_cash', 0) == 0) # ── 6. P&L ── print("\n--- 6. P&L ---") import sqlite3 db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') rows = db.execute("SELECT code, name, cost, price, shares, currency FROM holdings WHERE is_active=1 AND shares>0").fetchall() pnl_issues = [] for r in rows: code, name, cost, price, shares, curr = r if not cost or cost <= 0: continue if not price or price <= 0: continue pnl_pct = (price - cost) / cost * 100 # 港股 P&L 应在 HKD 范围内合理(-99% ~ +1000%) if is_hk_stock(str(code)): if pnl_pct < -95 or pnl_pct > 500: pnl_issues.append(f"{code} {name}: P&L={pnl_pct:.1f}% (HKD)") test("港股P&L 合理性", len(pnl_issues) == 0, "; ".join(pnl_issues)) db.close() # ── 7. DB 完整性 ── print("\n--- 7. DB 完整性 ---") db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') n_holds = db.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0] n_strat = db.execute("SELECT COUNT(*) FROM holding_strategies WHERE status IN ('active','updated')").fetchone()[0] n_wl = db.execute("SELECT COUNT(*) FROM watchlist_stocks WHERE is_active=1").fetchone()[0] test("holdings 记录数", n_holds > 0, str(n_holds)) test("holding_strategies 记录数", n_strat > 0, str(n_strat)) # 检查 cost=0 且 shares>0 的 bug zero_cost = db.execute("SELECT code, name FROM holdings WHERE is_active=1 AND shares>0 AND (cost IS NULL OR cost=0)").fetchall() if zero_cost: names = [f"{r[0]} {r[1]}" for r in zero_cost] results.append(f" ⚠️ cost=0 持仓: {'; '.join(names)} (需从 holding.xls 重新导入)") else: test("无 cost=0 持仓", True) db.close() # ── 8. JSON 残留 ── print("\n--- 8. JSON 残留 ---") import glob, re json_refs = [] exclude_patterns = ['mo_config.py', '__pycache__', 'test_', 'inspect_', 'check_', 'verify_', 'diagnose_', 'audit_', 'deep_', 'rollback_', 'close_', 'run_all', 'migrate_all'] for pattern in ['*.py', 'scripts/*.py']: for f in glob.glob(f'/home/hmo/MoFin/{pattern}'): if any(x in f for x in exclude_patterns): continue try: with open(f) as fh: content = fh.read() # Only catch actual I/O: json.load(open( + json filename for kw in ['portfolio.json', 'decisions.json', 'watchlist.json']: # Pattern: json.load and json.dump with open if re.search(rf'json\.(load|dump).*{kw}', content): for i, line in enumerate(content.split('\n')): if kw in line and ('json.load' in line or 'json.dump' in line) and not line.strip().startswith('#'): json_refs.append(f"{f}:{i+1}: {line.strip()[:60]}") except: pass test("无活跃 JSON I/O", len(json_refs) == 0, f"{len(json_refs)} refs" + (f" e.g. {json_refs[0]}" if json_refs else "")) # ── 9. LLM Prompt ── print("\n--- 9. LLM Prompt ---") prompt_json_issues = [] try: for fpath in glob.glob('/home/hmo/MoFin/prompt_manager/*.py'): with open(fpath) as fh: content = fh.read() if '.json' not in content: continue for i, line in enumerate(content.split('\n')): s = line.strip() if s.startswith('#') or not s: continue if '.json' in s and any(x in s for x in ['evaluation.json', 'accuracy_stats.json', 'decisions.json', 'portfolio.json']): prompt_json_issues.append(f"{fpath}:{i+1}: {s[:60]}") test("prompt_manager 无 JSON 引用", len(prompt_json_issues) == 0, f"{len(prompt_json_issues)} issues: {prompt_json_issues[0] if prompt_json_issues else ''}") except Exception as e: test("LLM prompt 检查", False, str(e)[:80]) # ── 10. API ── print("\n--- 10. API ---") import urllib.request, json try: r = urllib.request.urlopen("http://localhost:8899/api/portfolio", timeout=5) data = json.loads(r.read()) test("GET /api/portfolio", True, f"total_assets={data.get('total_assets')}") except Exception as e: test("GET /api/portfolio", False, str(e)[:60]) try: r = urllib.request.urlopen("http://localhost:8899/api/decisions", timeout=5) data = json.loads(r.read()) test("GET /api/decisions", True, f"total={data.get('total')}") except Exception as e: test("GET /api/decisions", False, str(e)[:60]) # ── 11. Cron 脚本可导入 ── print("\n--- 11. Cron 可导入 ---") cron_scripts = ['price_monitor', 'market_watch', 'market_screener', 'system_audit', 'data_freshness'] for s in cron_scripts: try: __import__(s) test(f"{s} 可导入", True) except Exception as e: test(f"{s} 可导入", False, str(e)[:50]) # ── 12. price_monitor dry run ── print("\n--- 12. price_monitor 函数测试 ---") try: from price_monitor import refresh_data_prices, is_hk_stock as pm_is_hk test("price_monitor.is_hk_stock 一致", pm_is_hk('01888') == is_hk_stock('01888')) except Exception as e: test("price_monitor 函数", False, str(e)[:80]) # ── 结果 ── print(f"\n{'='*50}") print(f"结果: {passed} passed, {failed} failed ({passed+failed} total)") for r in results: print(r)