diff --git a/scripts/cron_to_xmpp.py b/scripts/cron_to_xmpp.py deleted file mode 100644 index 3a5cfca..0000000 --- a/scripts/cron_to_xmpp.py +++ /dev/null @@ -1,359 +0,0 @@ -#!/usr/bin/env python3 -"""cron_to_xmpp.py — 智能cron报告推送 - -只推送LLM驱动的分析报告(有实质内容),不推送纯脚本输出。 -关键规则: -1. 跳过 no_agent 脚本的输出(价格监控、数据同步等机器数据) -2. 跳过自己的输出目录(30908cdc44a8),避免循环推送 -3. 正文太短(<20字)或只有 [SILENT] 的不推 -4. 超时自动跳过,不影响后续 -""" -import json -import subprocess -import re -import sys -from datetime import datetime -from pathlib import Path - -# 使用绝对路径,不受 profile 环境变量影响 -REAL_HOME = Path("/home/hmo") - -# 扫描目录 -CRON_DIRS = [ - REAL_HOME / ".hermes" / "cron" / "output", - REAL_HOME / ".hermes" / "profiles" / "position-analyst" / "cron" / "output", -] -JOURNAL = REAL_HOME / ".hermes" / "cron" / ".relay_journal.json" -SILENT_STATS = REAL_HOME / ".hermes" / "cron" / ".silent_daily_count.json" -MAX_AGE_HOURS = 6 # 只推送6小时内的报告,防止清journal后爆历史 - - -def load_no_agent_job_ids(): - """从两个profile的jobs.json中读取所有no_agent=true的job ID""" - ids = set() - for jobs_path in [ - REAL_HOME / ".hermes" / "cron" / "jobs.json", - REAL_HOME / ".hermes" / "profiles" / "position-analyst" / "cron" / "jobs.json", - ]: - try: - with open(jobs_path) as f: - data = json.load(f) - for j in data.get("jobs", []): - if j.get("no_agent"): - ids.add(j["id"]) - except: - pass - return ids - - -# 硬编码保底(如果 jobs.json 读不到) -SKIP_DIRS = { - "30908cdc44a8", # cron-推XMPP中继自身输出 - "health", # 健康检查输出 -} - -FROM = "zhiwei@yoin.fun" -TO = "hmo@yoin.fun" - - -def load_journal(): - try: - return set(json.loads(JOURNAL.read_text())) - except: - return set() - - -def save_journal(entries): - JOURNAL.write_text(json.dumps(sorted(entries))) - - -def is_pure_script_output(content): - """判断文件是否是纯脚本的机器输出(不是LLM报告)""" - # LLM报告的特征:有 ## Response 节(包含agent的回复) - if "## Response" in content: - return False - # 以 # Cron Job: 开头但没有 ## Response 的可能是脚本输出 - if content.startswith("# Cron Job:"): - return True - # 价格监控的触发输出 - if content.startswith("🔔") and "⏱" in content: - return True - # 健康检查报告 - if "MoFin 系统健康检查" in content: - return True - # 结构化数据标签(价格监控的机器数据) - if "" in content: - return True - # no_agent 脚本的输出特征(Hermes自动添加的header) - if "**Mode:** no_agent (script)" in content: - return True - return False - - -def validate_report_body(body): - """质量检查 — 不拦截,返回改进建议""" - issues = [] - text = body.strip() - - if "重点推荐操作" not in text: - issues.append("缺少【重点推荐操作】区域(如无需操作可写「无」)") - - if "风险关注" not in text: - issues.append("缺少【风险关注】区域(如无风险可写「无」)") - - if len(text) > 600: - issues.append(f"报告偏长({len(text)}字),建议压缩到600字以内") - - fuzzy = re.findall(r"可关注|可考虑|建议观察|试试|谨慎关注|择机|根据情况", text) - if fuzzy: - issues.append(f"含模糊词: {', '.join(set(fuzzy))},建议替换为明确操作指令") - - if re.search(r"如果.*就.*如果.*就|若.*则.*若.*则", text): - issues.append("含选择题句式,建议只给一个确定建议") - - return issues - - -def send_feedback(issues, job_name): - """发送质量反馈给知微自己""" - from xml.sax.saxutils import escape - feedback = f"[自我反馈] 报告质量检查发现以下问题,下次注意:\n" + "\n".join(f"• {i}" for i in issues) - safe = escape(feedback) - stanza = ( - f"" - f"{safe}" - ) - try: - subprocess.run( - ["docker", "exec", "ejabberd", "ejabberdctl", - "send_stanza", FROM, FROM, stanza], - capture_output=True, timeout=10, text=True, - ) - except: - pass - - -def extract_body(path): - content = path.read_text(encoding="utf-8", errors="replace") - - if is_pure_script_output(content): - return None - - parts = content.split("## Response") - body = parts[1].strip() if len(parts) > 1 else content.strip() - body = re.sub(r'^#.*?\n', '', body, flags=re.MULTILINE).strip() - body = re.sub(r'\n?\s*.*?\s*', '', body, flags=re.DOTALL).strip() - body = re.sub(r'\*\*(.*?)\*\*', r'\1', body) - - # 去掉agent的思考过程("Now let me...", "Let me...", "Now I have..."等开头) - body = re.sub(r'^(Now let me|Let me|I need|I will|First let me|First,? I|Now I have|Here.i|I.ll|I.m ).*?\n\n', '', body, flags=re.DOTALL).strip() - # 去掉末尾的思考尾巴 - body = re.sub(r'\n\s*(Now I|This |I have |I used |The report|The data).*?$', '', body, flags=re.DOTALL).strip() - # 如果只剩"好的"、"收到"等短回应,丢弃 - if re.match(r'^[\u4e00-\u9fff,。]{1,10}$', body): - return None - - if not body: - return None - - # [SILENT] → 不推送(计数的逻辑在 scan() 中处理) - if "[SILENT]" in body: - return None - - if len(body) < 20: - return None - - return body - - -def send(body): - from xml.sax.saxutils import escape - safe = escape(f"【知微】{body}") - stanza = ( - f"" - f"{safe}" - ) - # 重试3次 - for attempt in range(3): - try: - r = subprocess.run( - ["docker", "exec", "ejabberd", "ejabberdctl", - "send_stanza", FROM, TO, stanza], - capture_output=True, timeout=10, text=True, - ) - if r.stderr and "error" in r.stderr.lower(): - print(f"send error (attempt {attempt+1}): {r.stderr.strip()[:100]}", file=sys.stderr) - if attempt < 2: - continue - return False - return r.returncode == 0 - except subprocess.TimeoutExpired: - print(f"send timeout (attempt {attempt+1})", file=sys.stderr) - if attempt < 2: - continue - return False - except Exception as e: - print(f"send err (attempt {attempt+1}): {e}", file=sys.stderr) - if attempt < 2: - continue - return False - return False - - -def validate_format(body): - """格式检查 — 只记录不拦截,标记改进点""" - text = body.strip() - issues = [] - - # 必含区域检查 - has_key = "重点推荐操作" in text - has_risk = "风险关注" in text - has_rest = "其余持仓" in text or "今日关注" in text - if not has_key: - issues.append("缺【重点推荐操作】区域") - if not has_risk: - issues.append("缺【风险关注】区域") - - # 超长提醒 - if len(text) > 600: - issues.append(f"报告偏长({len(text)}字),建议压缩到600字内") - - # 模糊词提醒 - fuzzy = re.findall(r"可关注|可考虑|建议观察|试试|谨慎关注|择机|根据情况", text) - if fuzzy: - issues.append(f"含模糊词({', '.join(list(set(fuzzy))[:3])}),应给唯一结论") - - # 选择题句式提醒 - if re.search(r"如果.*就|若.*则|可以.*也可以", text): - issues.append("含选择题句式,应给唯一建议") - - return text, issues # 始终通过,issues 为空就是干净 - - -def load_silent_stats(): - """加载当日静默统计""" - try: - return json.loads(SILENT_STATS.read_text()) - except: - return {"date": "", "silent": 0, "short": 0, "script": 0} - - -def save_silent_stats(stats): - SILENT_STATS.write_text(json.dumps(stats)) - - -def send_silent_summary(stats): - """发送当日静默报告汇总""" - parts = [] - if stats.get("silent", 0) > 0: - parts.append(f"静默[SILENT] {stats['silent']}次") - if stats.get("short", 0) > 0: - parts.append(f"过短(<20字) {stats['short']}次") - if stats.get("script", 0) > 0: - parts.append(f"脚本输出 {stats['script']}次") - - if not parts: - body = "【每日汇总】今日所有cron报告已正常送达,无被拦截的报告。" - else: - body = "【每日汇总】今日以下cron报告未送达(已拦截):\n" + "\n".join(f"• {p}" for p in parts) + "\n\n无操作信号的报告正常静默,有操作信号的都已送达。" - - send(body) - - -def scan(): - processed = load_journal() - new = set() - n_pushed = 0 - n_silent = 0 - n_short = 0 - n_script = 0 - no_agent_ids = load_no_agent_job_ids() - skip_all = SKIP_DIRS | no_agent_ids - - for cron_dir in CRON_DIRS: - if not cron_dir.exists(): - continue - - for d in sorted(cron_dir.iterdir()): - if not d.is_dir(): - continue - if d.name in skip_all: - continue - - for f in sorted(d.iterdir()): - if f.suffix != ".md": - continue - key = str(f.resolve()) - if key in processed or key in new: - continue - new.add(key) - - # 跳过超过MAX_AGE_HOURS小时的旧文件 - age_hours = (datetime.now() - datetime.fromtimestamp(f.stat().st_mtime)).total_seconds() / 3600 - if age_hours > MAX_AGE_HOURS: - continue - - content = f.read_text(encoding="utf-8", errors="replace") - - # 提前判断脚本输出 - if is_pure_script_output(content): - n_script += 1 - continue - - parts = content.split("## Response") - body = parts[1].strip() if len(parts) > 1 else content.strip() - body = re.sub(r'^#.*?\n', '', body, flags=re.MULTILINE).strip() - body = re.sub(r'\n?\s*.*?\s*', '', body, flags=re.DOTALL).strip() - body = re.sub(r'\*\*(.*?)\*\*', r'\1', body) - - if not body: - n_short += 1 - continue - - # SILENT → 拦截,记数(在长度检查之前,因为 [SILENT] 只有8字符) - if "[SILENT]" in body: - n_silent += 1 - continue - - if len(body) < 20: - n_short += 1 - continue - - # 格式校验 — 记录改进点,不拦截 - ok_body, issues = validate_format(body) - - n_pushed += 1 - ok_sent = send(body) - if not ok_sent: - print(f" {d.name}: send failed", file=sys.stderr) - if issues: - print(f" {d.name}/{f.name}: 改进建议: {'; '.join(issues)}", file=sys.stderr) - - if new: - save_journal(processed | new) - - # 保存当日汇总到文件(供16:30汇总用) - today = datetime.now().strftime("%Y-%m-%d") - stats = load_silent_stats() - if stats.get("date") != today: - stats = {"date": today, "silent": 0, "short": 0, "script": 0} - stats["silent"] += n_silent - stats["short"] += n_short - stats["script"] += n_script - save_silent_stats(stats) - - # 16:30~16:35 发送当日汇总(收盘后) - now = datetime.now() - hhmm = now.hour * 60 + now.minute - if 990 <= hhmm <= 995: # 16:30~16:35 - send_silent_summary(stats) - - log = f"推送{n_pushed}份,静默拦截{n_silent}份,过短{n_short}份,跳过脚本{n_script}份" - print(log, file=sys.stderr) - return n_pushed - - -if __name__ == "__main__": - scan() diff --git a/scripts/regenerate_strategies.py b/scripts/regenerate_strategies.py deleted file mode 100644 index 16c2856..0000000 --- a/scripts/regenerate_strategies.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python3 -"""批量再生所有持仓+自选策略,结合技术面支撑/压力位""" - -import json -import sys -sys.path.insert(0, '/home/hmo/web-dashboard') - -from technical_analysis import full_analysis -from strategy_lifecycle import reassess_strategy - -PF = '/home/hmo/web-dashboard/data/portfolio.json' -WL = '/home/hmo/web-dashboard/data/watchlist.json' - -def main(): - # 持仓 - pf = json.load(open(PF)) - for s in pf['holdings']: - code = s['code'] - name = s['name'] - price = s.get('price', 0) - cost = s.get('cost', 0) - shares = s.get('shares', 0) - - if not price: - continue - - print(f" {name}({code}) 现价{price} 成本{cost}...", end=' ') - - try: - tech = full_analysis(code) - except: - tech = None - - result = reassess_strategy( - code, name, price, cost, shares, - current_action=s.get('analysis', {}).get('action', '') - ) - - if 'analysis' not in s: - s['analysis'] = {} - s['analysis']['stop_loss'] = result['stop_loss'] - s['analysis']['take_profit'] = result['take_profit'] - s['analysis']['entry_low'] = result['entry_low'] - s['analysis']['entry_high'] = result['entry_high'] - s['analysis']['action'] = result['action'] - s['analysis']['status'] = result['status'] - s['analysis']['reassessed_at'] = result['reassessed_at'] - - print(f"损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']}") - - json.dump(pf, open(PF, 'w'), ensure_ascii=False, indent=2) - print(f"\n持仓策略已更新: {len(pf['holdings'])} 条") - - # 自选股 - 简单重新计算买入区 - wl = json.load(open(WL)) - updated = 0 - for s in wl['stocks']: - code = s['code'] - price = s.get('price', 0) - if not price: - continue - tech = None - try: - tech = full_analysis(code) - except: - pass - - # 买入区 = 弱支撑~弱压力 - if tech: - sr = tech.get('support_resistance', {}) - ws = sr.get('weak_support') or price * 0.95 - wr = sr.get('weak_resist') or price * 1.05 - else: - ws = price * 0.92 - wr = price * 1.08 - - if 'analysis' not in s: - s['analysis'] = {} - s['analysis']['buy_low'] = round(ws, 2) - s['analysis']['buy_high'] = round(wr, 2) - if tech: - s['analysis']['tech_levels'] = { - 'strong_support': sr.get('strong_support'), - 'weak_support': sr.get('weak_support'), - 'weak_resist': sr.get('weak_resist'), - 'strong_resist': sr.get('strong_resist'), - } - updated += 1 - print(f" {s['name']}({code}) 买入区={ws:.2f}~{wr:.2f}") - - json.dump(wl, open(WL, 'w'), ensure_ascii=False, indent=2) - print(f"\n自选策略已更新: {updated} 条") - print("\n✅ 全部策略再生完成") - -if __name__ == '__main__': - main() diff --git a/scripts/session_to_cron_bridge.py b/scripts/session_to_cron_bridge.py deleted file mode 100644 index 76d7621..0000000 --- a/scripts/session_to_cron_bridge.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python3 -"""session_to_cron_bridge.py — 将Hermes session DB中的cron报告写到cron/output目录 - -Hermes cron jobs(如快速盯盘)将LLM输出存在 session DB (state.db) 中。 -cron_to_xmpp.py 扫描 ~/.hermes/cron/output/ 目录的 .md 文件推送到XMPP。 -这个脚本弥补这个缺口:从state.db读取最新的cron输出,生成.md文件。 - -工作方式: -1. 查询 state.db 中最近的 cron 会话(source='cron') -2. 提取 assistant 的最后一条非空消息 -3. 与 relay journal 对比去重 -4. 新消息写入 cron/output// 目录 -5. cron_to_xmpp.py 自然捡起并推送 -""" - -import json -import sqlite3 -import subprocess -import re -import sys -from datetime import datetime -from pathlib import Path - -REAL_HOME = Path("/home/hmo") -PROFILE = "position-analyst" - -# 要中继的 cron job ID 列表(需要推送到 XMPP 的) -RELAY_JOBS = { - "62a2ba59f7ff": "快速盯盘-15分钟", - "e27e2e92ed80": "知识萃取-盘后", - "9d1236d8a07f": "策略评估-每日", - "5dde4e1a42ce": "分析师-持仓复查", -} - -# 输出目录(与 cron_to_xmpp.py 一致) -# 注意:~/.hermes 是 symlink 到 /home/hmo/.hermes/profiles/position-analyst/home/.hermes -# cron_to_xmpp.py 使用绝对路径 REAL_HOME / ".hermes" / "cron" / "output" -# 所以这里必须用绝对路径,不要相信 ~/.hermes 的解析 -OUTPUT_DIRS = [ - REAL_HOME / ".hermes" / "cron" / "output", - REAL_HOME / ".hermes" / "profiles" / PROFILE / "cron" / "output", -] - -JOURNAL = REAL_HOME / ".hermes" / "cron" / ".relay_journal.json" -STATE_DB = REAL_HOME / ".hermes" / "profiles" / PROFILE / "state.db" - -MAX_AGE_MINUTES = 70 # 只处理最近70分钟内的报告 -TRACK_FILE = REAL_HOME / ".hermes" / "cron" / ".bridge_track.json" # 追踪已桥接的session - - -def load_track(): - try: - return set(json.loads(TRACK_FILE.read_text())) - except: - return set() - - -def save_track(entries): - TRACK_FILE.write_text(json.dumps(sorted(entries))) - - -def load_journal(): - try: - return set(json.loads(JOURNAL.read_text())) - except: - return set() - - -def save_journal(entries): - JOURNAL.write_text(json.dumps(sorted(entries))) - - -def ensure_output_dirs(): - for d in OUTPUT_DIRS: - d.mkdir(parents=True, exist_ok=True) - for job_id in RELAY_JOBS: - (d / job_id).mkdir(exist_ok=True) - - -def extract_report_content(content): - """从assistant消息中提取报告正文""" - if not content or content.strip() in ("", " ", "\n", "\n\n"): - return None - - text = content.strip() - - # 跳过太短的消息 - if len(text) < 20: - return None - - # 跳过 [SILENT] - if "[SILENT]" in text: - return None - - # 跳过思考过程(只留下实际报告内容) - # 如果消息以"Now let me"/"Let me"/"I need"等开头,尝试找后面的报告正文 - lines = text.split('\n') - report_lines = [] - in_report = False - for line in lines: - if not in_report: - # 报告特征:以【开头 或 包含📊 或 包含【知微】 - if any(x in line for x in ["【", "📊", "【知微", "【⚡", "## "]): - in_report = True - report_lines.append(line) - else: - report_lines.append(line) - - if report_lines: - text = '\n'.join(report_lines) - - if len(text) < 20: - return None - - return text - - -def scan(): - processed = load_journal() - tracked = load_track() - new = set() - n_written = 0 - - if not STATE_DB.exists(): - print(f"state.db not found: {STATE_DB}", file=sys.stderr) - return - - conn = sqlite3.connect(str(STATE_DB)) - conn.row_factory = sqlite3.Row - cur = conn.cursor() - - now = datetime.now() - - for job_id, job_name in RELAY_JOBS.items(): - # Find recent sessions for this job - cur.execute(''' - SELECT id, started_at, message_count, source - FROM sessions - WHERE id LIKE ? - ORDER BY started_at DESC - LIMIT 10 - ''', (f'cron_{job_id}_%',)) - - sessions = cur.fetchall() - - for s in sessions: - session_id = s['id'] - - # Skip already bridged sessions - if session_id in tracked: - continue - - started_at = datetime.fromtimestamp(s['started_at']) if s['started_at'] else now - - # Skip too old sessions - age_minutes = (now - started_at).total_seconds() / 60 - if age_minutes > MAX_AGE_MINUTES: - continue - - # Find the last assistant message - cur.execute(''' - SELECT content, timestamp - FROM messages - WHERE session_id = ? AND role = 'assistant' - AND content NOT IN ('', ' ', '\n', '\n\n', '\n\n\n') - ORDER BY timestamp DESC - LIMIT 1 - ''', (session_id,)) - - msg = cur.fetchone() - if not msg: - tracked.add(session_id) - continue - - content = msg['content'].strip() - report = extract_report_content(content) - if not report: - tracked.add(session_id) - continue - - # Mark as tracked even before writing - tracked.add(session_id) - - # Generate a unique key for this report - ts = datetime.fromtimestamp(msg['timestamp']).strftime('%Y%m%d_%H%M%S') if msg['timestamp'] else started_at.strftime('%Y%m%d_%H%M%S') - filename = f"{job_name}_{ts}.md" - - for out_dir in OUTPUT_DIRS: - out_path = out_dir / job_id / filename - key = str(out_path.resolve()) - - if key in processed or key in new: - continue - - # Write the report as an .md file (matching cron_to_xmpp.py format) - md_content = f"# Cron Job: {job_name} ({session_id})\n\n## Response\n\n{report}\n" - out_path.write_text(md_content, encoding='utf-8') - new.add(key) - n_written += 1 - print(f" Written: {out_path.relative_to(REAL_HOME)}", file=sys.stderr) - - conn.close() - - if tracked: - save_track(tracked) - - print(f"桥接完成:写入{n_written}份新报告", file=sys.stderr) - # 桥接脚本只负责写入 .md 文件,不做去重追踪 - # 这样可以避免重复推送的复杂问题 - # 可能每次运行会写重复的文件,但cron_to_xmpp.py会用journal去重 - - print(f"桥接完成:写入{n_written}份新报告", file=sys.stderr) - - -if __name__ == "__main__": - scan() diff --git a/scripts/system_health_check.py b/scripts/system_health_check.py deleted file mode 100644 index 60244f7..0000000 --- a/scripts/system_health_check.py +++ /dev/null @@ -1,260 +0,0 @@ -#!/usr/bin/env python3 -"""system_health_check.py — MoFin 系统健康检查 - -每日运行,检查所有组件是否正常工作。 -输出报告,有问题才推送。 -""" -import json, os, sys, subprocess -from datetime import datetime, timedelta -from pathlib import Path - -DATA_DIR = Path("/home/hmo/web-dashboard/data") -DECISIONS_PATH = DATA_DIR / "decisions.json" -PORTFOLIO_PATH = DATA_DIR / "portfolio.json" -EVENTS_PATH = DATA_DIR / "price_events.json" -EVALUATION_PATH = DATA_DIR / "evaluation.json" -ACCURACY_PATH = DATA_DIR / "accuracy_stats.json" -CRON_JOBS = "/home/hmo/.hermes/cron/jobs.json" -POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json" - -def check(ok, msg): - icon = "✅" if ok else "⚠️" - return f" {icon} {msg}" - -def load_json(path, default=None): - try: - with open(path) as f: - return json.load(f) - except: - return {} if default is None else default - -def check_cron_jobs(path, label): - issues = [] - try: - d = load_json(path, {"jobs": []}) - for j in d.get("jobs", []): - name = j.get("name", "?") - enabled = j.get("enabled", True) - last = j.get("last_run_at", "") - status = j.get("last_status", "") - if not enabled: - issues.append(f"{name} 已禁用") - elif not last: - issues.append(f"{name} 从未运行") - elif status != "ok": - issues.append(f"{name} 上次状态={status}") - return len(d.get("jobs", [])), issues - except: - return 0, ["无法读取"] - -def run(): - now = datetime.now() - issues = [] - ok_count = 0 - warn_count = 0 - - lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"] - lines.append("") - - # 1. 进程检查 - lines.append("【进程】") - procs = { - "mofin-dashboard": "mofin-dashboard", - "xmpp-zhiwei": "xmpp_zhiwei_bot", - "ejabberd": "ejabberd", - } - for name, pattern in procs.items(): - # 先查 systemd,再查 pgrep - r = subprocess.run(["systemctl", "is-active", f"{pattern}.service"], capture_output=True, text=True, timeout=5) - alive = r.stdout.strip() == "active" - if not alive: - r2 = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5) - alive = r2.returncode == 0 - lines.append(check(alive, f"{name} {'运行中' if alive else '已停止'}")) - if not alive: issues.append(f"{name} 进程不存在"); warn_count += 1 - else: ok_count += 1 - - # 2. 端口检查 - lines.append("") - lines.append("【端口】") - ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"} - for port, name in ports.items(): - r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) - listening = f":{port}" in r.stdout - lines.append(check(listening, f"{name} :{port} {'监听中' if listening else '未监听'}")) - if not listening: issues.append(f"{name} 端口{port}未监听"); warn_count += 1 - else: ok_count += 1 - - # 3. 数据文件检查 - lines.append("") - lines.append("【数据文件】") - files = { - "portfolio.json": PORTFOLIO_PATH, - "watchlist.json": DATA_DIR / "watchlist.json", - "decisions.json": DECISIONS_PATH, - "market.json": DATA_DIR / "market.json", - "price_events.json": EVENTS_PATH, - "evaluation.json": EVALUATION_PATH, - "accuracy_stats.json": ACCURACY_PATH, - } - for name, path in files.items(): - exists = path.exists() - size = path.stat().st_size if exists else 0 - lines.append(check(exists and size > 10, f"{name} {'存在' if exists else '缺失'} ({size}B)")) - if not exists or size < 10: - issues.append(f"{name} 缺失或为空") - warn_count += 1 - else: - ok_count += 1 - - # 4. 价格事件统计 - lines.append("") - lines.append("【价格事件】") - events = load_json(EVENTS_PATH, {"events": []}) - ev_list = events.get("events", []) - today_events = [e for e in ev_list if e.get("date") == now.strftime("%Y-%m-%d")] - lines.append(check(len(ev_list) > 0, f"历史事件: {len(ev_list)}条")) - lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}条")) - if len(ev_list) == 0: - issues.append("price_events.json 无事件记录,price_monitor可能未触发过") - warn_count += 1 - else: - ok_count += 1 - - # 5. 策略评估统计 - lines.append("") - lines.append("【策略评估】") - evals = load_json(EVALUATION_PATH, {"strategies": []}) - s_list = evals.get("strategies", []) - lines.append(check(len(s_list) > 0, f"已评估策略: {len(s_list)}条")) - if len(s_list) > 0: - avg = sum(s.get("score", 0) for s in s_list) / len(s_list) - lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10")) - ok_count += 1 - else: - issues.append("evaluation.json 无评估数据") - warn_count += 1 - - # 6. 建议记录统计 - lines.append("") - lines.append("【建议记录】") - decisions = load_json(DECISIONS_PATH, {"decisions": []}) - total_advice = sum(len(d.get("advice_timeline", [])) for d in decisions.get("decisions", [])) - lines.append(check(total_advice > 0, f"建议记录: {total_advice}条")) - if total_advice == 0: - issues.append("所有策略建议记录为空") - warn_count += 1 - else: - ok_count += 1 - - # 7. Cron jobs - lines.append("") - lines.append("【Cron Jobs】") - cnt, cron_issues = check_cron_jobs(CRON_JOBS, "default") - lines.append(check(cnt > 0, f"default profile: {cnt}个job")) - for ci in cron_issues: - lines.append(f" ⚠️ {ci}") - warn_count += 1 - if cnt == 0: warn_count += 1 - cnt2, cron_issues2 = check_cron_jobs(POSITION_CRON, "position-analyst") - lines.append(check(cnt2 > 0, f"position-analyst: {cnt2}个job")) - for ci in cron_issues2: - lines.append(f" ⚠️ {ci}") - warn_count += 1 - if cnt2 == 0: warn_count += 1 - - # 8. 数据新鲜度 - lines.append("") - lines.append("【数据新鲜度】") - # 各数据文件的合理最大陈旧时间(小时) - freshness_thresholds = { - "portfolio.json": 24, # 每日有数据即可 - "decisions.json": 48, # 策略参数更新频率较低 - "multi_tf_cache.json": 24, # K线缓存每日更新 - "macro_context.json": 24, # 宏观数据每日2次 - "market.json": 48, # 行业数据每日更新 - "strategy_staleness_report.json": 24, # 时效性报告每日生成 - } - data_files = { - "portfolio.json": PORTFOLIO_PATH, - "decisions.json": DECISIONS_PATH, - "multi_tf_cache.json": DATA_DIR / "multi_tf_cache.json", - "macro_context.json": DATA_DIR / "macro_context.json", - "market.json": DATA_DIR / "market.json", - "strategy_staleness_report.json": DATA_DIR / "strategy_staleness_report.json", - } - for name, path in data_files.items(): - if not path.exists(): - lines.append(check(False, f"{name} 缺失")) - issues.append(f"{name} 文件缺失") - warn_count += 1 - continue - mtime = datetime.fromtimestamp(path.stat().st_mtime) - hours_ago = (now - mtime).total_seconds() / 3600 - threshold = freshness_thresholds.get(name, 24) - fresh = hours_ago < threshold - time_str = f"{hours_ago:.0f}h前" if hours_ago >= 1 else f"{hours_ago*60:.0f}分钟前" - lines.append(check(fresh, f"{name} 更新于 {time_str} (阈值{threshold}h)")) - if not fresh: - issues.append(f"{name} 超过{threshold}h未更新(最近更新:{time_str})") - warn_count += 1 - else: - ok_count += 1 - - # 数据管道组件检查 - lines.append("") - lines.append("【数据管道】") - pipe_checks = [ - ("再生器(regenerate_all)", r"strategy_lifecycle\.py"), - ("市场采集(market_watch)", r"market_watch\.py"), - ("宏观采集(macro)", r"macro_context_collector\.py"), - ] - for pname, ppattern in pipe_checks: - r = subprocess.run(["pgrep", "-f", ppattern], capture_output=True, timeout=5) - if r.returncode == 0: - lines.append(check(True, f"{pname} 进程存在")) - ok_count += 1 - else: - # no_agent脚本不常驻,不报warn - lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname)) - - # 价格数据更新时间检查(盘中应有当日数据) - is_trading_day = now.weekday() < 5 # 周一到周五 - if is_trading_day and now.hour >= 9 and now.hour < 16: - if PORTFOLIO_PATH.exists(): - mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime) - hours_ago = (now - mtime).total_seconds() / 3600 - has_intraday_data = mtime.date() == now.date() - lines.append(check(has_intraday_data, f"盘中有当日价格数据 {'是' if has_intraday_data else '否'}(最近{mtime.strftime('%H:%M')})")) - if not has_intraday_data: - issues.append(f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})") - warn_count += 1 - else: - ok_count += 1 - - # 汇总 - total = ok_count + warn_count - lines.append("") - lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注") - if issues: - lines.append("") - lines.append("需关注项:") - for i, issue in enumerate(issues[:10], 1): - lines.append(f" {i}. {issue}") - - report = "\n".join(lines) - print(report) - - # 如果有问题,写入报告文件供推送 - if warn_count > 0: - report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health") - report_path.mkdir(parents=True, exist_ok=True) - report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md" - report_file.write_text(f"# MoFin 系统健康检查\n\n{report}") - print(f"\n报告已写入 {report_file}") - else: - print("\n[SILENT] 一切正常") - - -if __name__ == "__main__": - run() diff --git a/watchlist.json b/watchlist.json deleted file mode 100644 index 19fe5bf..0000000 --- a/watchlist.json +++ /dev/null @@ -1,293 +0,0 @@ -{ - "stocks": [ - { - "code": "02388", - "name": "中银香港", - "price": 47.7, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "", - "reason": "回调-0.95%至47.80,止损46安全,继续持有观察", - "updated_at": "2026-06-02T10:46:11.973827" - } - }, - { - "code": "01888", - "name": "建滔集团", - "price": 58.6, - "analysis": { - "buy_low": "55", - "buy_high": "60", - "position_recommend": "0%", - "reason": "+9.55%暴涨至54.25远超原买入区45~47。今日高55.30低49.52,52周高58.85。建议上调买入区至50~53,等回调再考虑。", - "updated_at": "2026-06-03T17:34:00" - } - }, - { - "code": "01088", - "name": "中国神华", - "price": 46.08, - "analysis": { - "buy_low": "45", - "buy_high": "46", - "position_recommend": "3%", - "reason": "-2.58%走弱至46.02接近买入区间,触及45可建仓。", - "updated_at": "2026-06-02T14:42:56.675227" - } - }, - { - "code": "01211", - "name": "比亚迪股份", - "price": 88.95, - "analysis": { - "buy_low": null, - "buy_high": null, - "position_recommend": null, - "reason": "大涨+5.34%", - "updated_at": "2026-06-02T11:01:41.539019" - } - }, - { - "code": "002594", - "name": "比亚迪", - "price": 90.75, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "已有H股持仓01211不重复建仓。+3.37%至96.81。", - "updated_at": "2026-06-02T14:42:56.675259" - } - }, - { - "code": "09868", - "name": "小鹏汽车-W", - "price": 61.7, - "analysis": { - "buy_low": "60", - "buy_high": "64", - "position_recommend": "3%", - "reason": "今日跌5.3%至68.2,破原买入区70~72。港股科技共振回调,预计仍有下行空间。买入区下调至65~68(原62~65→上调70~72→现再调至65~68)。3%以下轻仓试探。", - "updated_at": "2026-06-03 15:30" - } - }, - { - "code": "688795", - "name": "摩尔线程-U", - "price": 614.5, - "analysis": { - "buy_low": "620", - "buy_high": "650", - "position_recommend": "0%", - "reason": "+8.35%暴涨至667.85远超原买入区580~600。今日高688.69低610.00。建议上调买入区至620~650,等回调再考虑。", - "updated_at": "2026-06-03T17:34:00" - } - }, - { - "code": "688802", - "name": "沐曦股份-U", - "price": 708.9, - "analysis": { - "buy_low": "700", - "buy_high": "740", - "position_recommend": "0%", - "reason": "+14.01%暴涨至756.40远超原买入区620~650。今日高762.70低656.66。建议上调买入区至700~740,等回调再考虑。", - "updated_at": "2026-06-03T17:34:00" - } - }, - { - "code": "06160", - "name": "百济神州", - "price": 161.4, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "", - "reason": "百济神州168.30跌-1.87%止损已击穿,建议清仓", - "updated_at": "2026-06-02T10:46:11.973846" - } - }, - { - "code": "02359", - "name": "药明康德", - "price": 119.2, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "-0.08%持平暂观。", - "updated_at": "2026-06-02T14:42:56.675243" - } - }, - { - "code": "02628", - "name": "中国人寿", - "price": 27.16, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "-1.66%走弱暂观。", - "updated_at": "2026-06-02T14:42:56.675246" - } - }, - { - "code": "00968", - "name": "信义光能", - "price": 2.44, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "-1.55%走弱暂观。", - "updated_at": "2026-06-02T14:42:56.675248" - } - }, - { - "code": "06869", - "name": "长飞光纤", - "price": 248.0, - "analysis": { - "buy_low": "240", - "buy_high": "255", - "position_recommend": "0%", - "reason": "+10.65%暴涨至257.6远超原买入区210~220。今日高273.00低241.20,52周高283。连续大涨后存在回调需求,建议上调买入区至240~255。", - "updated_at": "2026-06-03T17:34:00" - } - }, - { - "code": "02318", - "name": "中国平安", - "price": 56.85, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "+0.39%暂观。", - "updated_at": "2026-06-02T14:42:56.675250" - } - }, - { - "code": "688639", - "name": "华恒生物", - "price": 20.57, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "", - "reason": "华恒生物22.88跌-2.93%止损已破,等待反弹24+减仓", - "updated_at": "2026-06-02T10:46:11.973857" - } - }, - { - "code": "300548", - "name": "博创科技", - "price": 231.8, - "analysis": { - "buy_low": "230", - "buy_high": "235", - "position_recommend": "2%", - "reason": "+7.26%站稳230确认可追,回调至230~235建仓。", - "updated_at": "2026-06-02T14:42:56.675237" - } - }, - { - "code": "300124", - "name": "汇川技术", - "price": 74.09, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "+2.51%温和走强暂观。", - "updated_at": "2026-06-02T14:42:56.675252" - } - }, - { - "code": "01070", - "name": "TCL电子", - "price": 13.71, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "-3.01%走弱至13.86暂观。", - "updated_at": "2026-06-02T14:42:56.675241" - } - }, - { - "code": "09988", - "name": "阿里巴巴-W", - "price": 117.6, - "analysis": { - "buy_low": null, - "buy_high": null, - "position_recommend": null, - "reason": "大涨+5.86%可关注", - "updated_at": "2026-06-02T11:01:41.539017" - } - }, - { - "code": "00700", - "name": "腾讯控股", - "price": 443.8, - "analysis": { - "buy_low": null, - "buy_high": null, - "position_recommend": null, - "reason": "暴涨+7.75%逼近止盈470", - "updated_at": "2026-06-02T11:01:41.539014" - } - }, - { - "code": "02202", - "name": "万科企业", - "price": 2.57, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "", - "reason": "万科企业2.83微涨+1.8%,深套持有", - "updated_at": "2026-06-02T10:46:11.973869" - } - }, - { - "code": "688981", - "name": "中芯国际", - "price": 122.2, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "+0.46%暂观。", - "updated_at": "2026-06-02T14:42:56.675254" - } - }, - { - "code": "01478", - "name": "丘钛科技", - "price": 8.72, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "", - "reason": "丘钛科技10.07跌-1.66%,深套-26.48%暂持", - "updated_at": "2026-06-02T10:46:11.973872" - } - }, - { - "code": "001309", - "name": "德明利", - "price": 609.33, - "analysis": { - "buy_low": "", - "buy_high": "", - "position_recommend": "0%", - "reason": "+1.40%暂观。", - "updated_at": "2026-06-02T14:42:56.675257" - } - } - ], - "updated_at": "2026-06-09T10:37:14.666515" -} \ No newline at end of file diff --git a/web/server.py b/web/server.py deleted file mode 100644 index 35fe79a..0000000 --- a/web/server.py +++ /dev/null @@ -1,910 +0,0 @@ -#!/usr/bin/env python3 -"""MoFin Dashboard - 莫荷持仓情报可视化系统""" - -import base64 -import json -import os -import re -import uuid -import urllib.request -from datetime import datetime -from pathlib import Path - -from flask import Flask, jsonify, send_from_directory, request - -# 提示词管理模块 -from prompt_manager.dashboard_views import register_routes - -app = Flask(__name__, static_folder="static", static_url_path="") - -DATA_DIR = Path(__file__).parent / "data" -UPLOAD_DIR = Path(__file__).parent / "uploads" - -# Hermes Gateway -GATEWAY = "http://localhost:8642/v1/chat/completions" -API_KEY = "hermes123" - - -def _load_json(path, default=None): - try: - with open(path, encoding="utf-8") as f: - return json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - return {} if default is None else default - - -def _save_json(path, data): - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -# ── API 路由 ────────────────────────────────────────── - -@app.route("/") -def index(): - return send_from_directory(app.static_folder, "index.html") - - -@app.route("/api/portfolio") -def api_portfolio(): - """持仓列表""" - data = _load_json(DATA_DIR / "portfolio.json") - return jsonify(data) - - -@app.route("/api/watchlist") -def api_watchlist(): - """自选列表""" - data = _load_json(DATA_DIR / "watchlist.json") - return jsonify(data) - - -@app.route("/api/overview") -def api_overview(): - """概览数据""" - portfolio = _load_json(DATA_DIR / "portfolio.json", []) - market = _load_json(DATA_DIR / "market.json", {}) - alerts = _load_json(DATA_DIR / "alerts.json", []) - - total_assets = portfolio.get("total_assets", 0) - stock_value = portfolio.get("stock_value", 0) - cash = portfolio.get("cash", 0) - position_pct = portfolio.get("position_pct", 0) - total_pnl = portfolio.get("total_pnl", 0) - holdings = portfolio.get("holdings", []) - - top_movers = sorted( - [h for h in holdings if abs(h.get("change_pct", 0)) >= 3], - key=lambda x: abs(x.get("change_pct", 0)), - reverse=True, - )[:5] - - return jsonify({ - "total_assets": total_assets, - "stock_value": stock_value, - "cash": cash, - "position_pct": position_pct, - "total_pnl": total_pnl, - "top_movers": top_movers, - "market": market, - "alerts": alerts[:10], - "updated_at": portfolio.get("updated_at", ""), - }) - - -@app.route("/api/reports") -def api_reports(): - """历史报告列表""" - reports_dir = DATA_DIR / "reports" - reports = [] - if reports_dir.exists(): - for f in sorted(reports_dir.iterdir(), reverse=True)[:100]: - if f.suffix == ".json": - data = _load_json(f) - reports.append({ - "id": f.stem, - "title": data.get("title", f.stem), - "type": data.get("type", "未知"), - "created_at": data.get("created_at", ""), - "summary": data.get("summary", ""), - }) - return jsonify(reports) - - -@app.route("/api/report/") -def api_report(report_id): - """单个报告详情""" - # Try exact file first - path = DATA_DIR / "reports" / f"{report_id}.json" - if path.exists(): - return jsonify(_load_json(path)) - # Try prefix match - reports_dir = DATA_DIR / "reports" - if reports_dir.exists(): - for f in reports_dir.iterdir(): - if f.stem.startswith(report_id) and f.suffix == ".json": - return jsonify(_load_json(f)) - return jsonify({"error": "report not found"}), 404 - - -@app.route("/api/stock/") -def api_stock(code): - """个股详情 + 操作建议历史""" - stock_data = _load_json(DATA_DIR / "stocks" / f"{code}.json", {}) - return jsonify(stock_data) - - -@app.route("/api/market") -def api_market(): - """市场观察""" - data = _load_json(DATA_DIR / "market.json", {}) - return jsonify(data) - - -# ── 数据写入API(供 cron/update_data.py 调用) ────────── - -@app.route("/api/update/portfolio", methods=["POST"]) -def update_portfolio(): - data = request.get_json(force=True) - _save_json(DATA_DIR / "portfolio.json", data) - return jsonify({"status": "ok"}) - - -@app.route("/api/update/watchlist", methods=["POST"]) -def update_watchlist(): - data = request.get_json(force=True) - _save_json(DATA_DIR / "watchlist.json", data) - return jsonify({"status": "ok"}) - - -@app.route("/api/update/report", methods=["POST"]) -def update_report(): - data = request.get_json(force=True) - report_id = data.pop("_id", datetime.now().strftime("%Y%m%d_%H%M%S")) - data["created_at"] = data.get("created_at", datetime.now().isoformat()) - _save_json(DATA_DIR / "reports" / f"{report_id}.json", data) - return jsonify({"status": "ok", "id": report_id}) - - -@app.route("/api/update/stock/", methods=["POST"]) -def update_stock(code): - data = request.get_json(force=True) - existing = _load_json(DATA_DIR / "stocks" / f"{code}.json", {}) - history = existing.get("history", []) - if data.get("entry"): - history.append({ - "time": datetime.now().isoformat(), - "price": data.get("price"), - "recommendation": data.get("recommendation"), - "stop_loss": data.get("stop_loss"), - "take_profit": data.get("take_profit"), - "reason": data.get("reason"), - }) - existing.update(data) - existing["history"] = history[-50:] - _save_json(DATA_DIR / "stocks" / f"{code}.json", existing) - return jsonify({"status": "ok"}) - - -@app.route("/api/update/market", methods=["POST"]) -def update_market(): - data = request.get_json(force=True) or {} - _save_json(DATA_DIR / "market.json", data) - return jsonify({"status": "ok"}) - - -# ── 知微分析结果写入API ── -@app.route("/api/analysis/batch", methods=["POST"]) -def analysis_batch(): - """接收知微cron的分析结果,写回持仓/自选JSON的analysis字段""" - data = request.get_json(force=True) or {} - - # 更新持仓 - if "holdings" in data: - pf = _load_json(DATA_DIR / "portfolio.json", {}) - idx = {h["code"]: i for i, h in enumerate(pf.get("holdings", []))} - for item in data["holdings"]: - code = item.get("code", "") - if code not in idx: - continue - h = pf["holdings"][idx[code]] - h["analysis"] = { - "suggestion": item.get("suggestion"), - "stop_loss": item.get("stop_loss"), - "take_profit": item.get("take_profit"), - "buy_zone_low": item.get("buy_zone_low"), - "buy_zone_high": item.get("buy_zone_high"), - "position_suggested": item.get("position_suggested"), - "reason": item.get("reason"), - "updated_at": datetime.now().isoformat(), - } - _save_json(DATA_DIR / "portfolio.json", pf) - - # 更新自选 - if "watchlist" in data: - wl = _load_json(DATA_DIR / "watchlist.json", {}) - idx = {s["code"]: i for i, s in enumerate(wl.get("stocks", []))} - for item in data["watchlist"]: - code = item.get("code", "") - if code not in idx: - continue - s = wl["stocks"][idx[code]] - s["analysis"] = { - "buy_low": item.get("buy_low"), - "buy_high": item.get("buy_high"), - "position_recommend": item.get("position_recommend"), - "reason": item.get("reason"), - "updated_at": datetime.now().isoformat(), - } - _save_json(DATA_DIR / "watchlist.json", wl) - - return jsonify({"status": "ok", "updated_at": datetime.now().isoformat()}) - - -# ── 操作决策库API ── -@app.route("/api/decisions", methods=["GET"]) -def get_decisions(): - """返回决策库数据,统一新旧格式""" - raw = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - decisions = raw.get("decisions", []) - if not decisions and isinstance(raw, list): - decisions = raw - - # portfolio 用来判断是持仓还是自选 - portfolio = _load_json(DATA_DIR / "portfolio.json", {"holdings": []}) - watchlist = _load_json(DATA_DIR / "watchlist.json", {"stocks": []}) - holding_codes = {h.get("code","") for h in portfolio.get("holdings",[])} - watch_codes = {s.get("code","") for s in watchlist.get("stocks",[])} - - normalized = [] - for d in decisions: - if not isinstance(d, dict): - continue - - # 检测新旧格式:新格式有 stop_loss 顶层字段,旧格式有 trigger 对象 - is_new = "stop_loss" in d and "trigger" not in d - - if is_new: - code = d.get("code", "") - name = d.get("name", "") - price = d.get("price", 0) - sl = d.get("stop_loss") - tp = d.get("take_profit") - el = d.get("entry_low") - eh = d.get("entry_high") - ts = d.get("tech_snapshot", "") - - # type: 持仓还是自选 - if code in holding_codes: - dtype = "持仓策略" - elif code in watch_codes: - dtype = "自选策略" - else: - dtype = "—" - - # 判断 active - status_raw = d.get("status", "") - status = "active" if status_raw in ("active", "updated", "") else "superseded" - - # trigger 对象 - entry_zone_str = "" - if el and eh: - entry_zone_str = f"¥{el}~¥{eh}" - elif el: - entry_zone_str = f"≥¥{el}" - - trigger = {} - if sl: - trigger["stop_loss"] = f"¥{sl}" if isinstance(sl, (int,float)) else str(sl) - if tp: - trigger["take_profit"] = f"¥{tp}" if isinstance(tp, (int,float)) else str(tp) - if entry_zone_str: - trigger["entry_zone"] = entry_zone_str - - # current - current = "" - if price: - current = f"现价¥{price}" if code and not code.startswith(("0","1")) else f"¥{price}" - - # zone_breach - zone_breach = d.get("zone_breach", "") - - # updated_reason - note = d.get("note", "") - timing = d.get("timing_signal", "") - reason_parts = [] - if note: - reason_parts.append(note) - if timing and timing != "neutral": - reason_parts.append(f"时机:{timing}") - if d.get("rr_ratio"): - reason_parts.append(f"盈亏比:{d['rr_ratio']}") - - # advice_timeline - 从新格式重建 - timeline = [] - - entry = { - "code": code, - "name": name, - "type": dtype, - "status": status, - "tag": d.get("tag", ""), - "action": d.get("action", ""), - "trigger": trigger, - "current": current, - "zone_breach": zone_breach, - "updated_reason": " | ".join(reason_parts) if reason_parts else "", - "advice_timeline": timeline, - "changelog": d.get("changelog", []), - "execution": d.get("execution", {}), - "analysis": d.get("analysis", {}), - "tech_snapshot": ts, - "timestamp": d.get("timestamp", ""), - "updated_by": "知微", - } - # 保留原始数据供前端扩展 - entry["_raw_action"] = d.get("action", "") - normalized.append(entry) - else: - # 旧格式:已有 trigger 等字段,直接保留 - entry = dict(d) - # 确保 status 正确 - if entry.get("status") not in ("active", "superseded"): - entry["status"] = "active" - if not entry.get("type"): - code = entry.get("code", "") - if code in holding_codes: - entry["type"] = "持仓策略" - elif code in watch_codes: - entry["type"] = "自选策略" - else: - entry["type"] = "—" - normalized.append(entry) - - # 添加 execution 和 analysis 信息,按执行状态排序 - for n in normalized: - code = n.get("code", "") - # 从原始数据中找到 execution 和 analysis - raw_entry = next((d for d in decisions if isinstance(d, dict) and d.get("code") == code), {}) - n["execution"] = raw_entry.get("execution", {"status": "none"}) - n["analysis"] = raw_entry.get("analysis", {}) - - # 排序规则:推荐>执行中>观察>无标签 - def sort_key(x): - tag = x.get("tag", "") - exec_status = x.get("execution", {}).get("status", "none") - # 标签优先级(current_recommend才靠前,active_manual只是记录不升序) - tag_order = {"current_recommend": 0} - tag_priority = tag_order.get(tag, 50) - # 执行状态优先级 - exec_order = {"partial_exit": 0, "executing": 1, "observing": 2, "none": 99} - exec_priority = exec_order.get(exec_status, 99) - # 组合:先按标签排,再按执行状态排 - return (tag_priority, exec_priority, x.get("code", "")) - - normalized.sort(key=sort_key) - - return jsonify({ - "decisions": normalized, - "total": len(normalized), - "regenerated_at": raw.get("regenerated_at", ""), - }) - - -@app.route("/api/decisions/add", methods=["POST"]) -def add_decision(): - """新增/更新一条决策(新格式)""" - data = request.get_json(force=True) or {} - code = data.get("code", "") - if not code: - return jsonify({"status": "error", "message": "code required"}), 400 - - d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - - # 同一股票旧决策标记为superseded - for e in d["decisions"]: - if e["code"] == code and e.get("status") in ("active", "updated"): - e["status"] = "superseded" - - entry = { - "code": code, - "name": data.get("name", ""), - "price": data.get("price", 0), - "action": data.get("action", ""), - "stop_loss": data.get("stop_loss"), - "take_profit": data.get("take_profit"), - "entry_low": data.get("entry_low"), - "entry_high": data.get("entry_high"), - "tech_snapshot": data.get("tech_snapshot", ""), - "timing_signal": data.get("timing_signal", ""), - "rr_ratio": data.get("rr_ratio"), - "tag": data.get("tag", ""), - "note": data.get("note", ""), - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), - "updated_reason": data.get("updated_reason", ""), - "status": "updated", - "changelog": data.get("changelog", []), - "execution": data.get("execution", {"status": "none"}), - "analysis": data.get("analysis", {}), - } - d["decisions"].append(entry) - _save_json(DATA_DIR / "decisions.json", d) - return jsonify({"status": "ok", "entry": entry}) - - -@app.route("/api/decisions/tag", methods=["POST"]) -def set_decision_tag(): - """设置/清除某只股票的推荐标签""" - data = request.get_json(force=True) or {} - code = data.get("code", "") - tag = data.get("tag", "") # 'current_recommend', 'active_manual', or '' to clear - if not code: - return jsonify({"status": "error", "message": "code required"}), 400 - - d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - found = False - for e in d.get("decisions", []): - if e.get("code") == code: - e["tag"] = tag - e["tag_updated"] = datetime.now().isoformat() - found = True - break - - if not found: - return jsonify({"status": "error", "message": f"stock {code} not found"}), 404 - - _save_json(DATA_DIR / "decisions.json", d) - return jsonify({"status": "ok", "code": code, "tag": tag}) - - -@app.route("/api/decisions/pending") -def get_pending_decisions(): - """返回所有有未确认建议的条目""" - d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - pending = [] - for entry in d["decisions"]: - timeline = entry.get("advice_timeline", []) - unconfirmed = [a for a in timeline if a.get("status") in (None, "pending")] - if unconfirmed: - pending.append({ - "code": entry["code"], - "name": entry["name"], - "current": entry.get("current", ""), - "pending_advice": unconfirmed, - }) - return jsonify(pending) - - -@app.route("/api/advice/record", methods=["POST"]) -def record_advice(): - """记录一条分析建议到 decisions.json 的 advice_timeline""" - data = request.get_json(force=True) or {} - code = data.get("code", "") - if not code: - return jsonify({"status": "error", "message": "code required"}), 400 - - d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - - entry = None - for e in d["decisions"]: - if e["code"] == code and e["status"] == "active": - entry = e - break - - if not entry: - return jsonify({"status": "error", "message": f"no active decision for {code}"}), 404 - - timeline = entry.setdefault("advice_timeline", []) - advice = { - "date": datetime.now().strftime("%Y-%m-%d %H:%M"), - "direction": data.get("direction", "持有"), - "price": data.get("price", ""), - "summary": data.get("summary", ""), - "status": "pending", - } - timeline.append(advice) - _save_json(DATA_DIR / "decisions.json", d) - return jsonify({"status": "ok", "advice": advice}) - - -@app.route("/api/advice/confirm", methods=["POST"]) -def confirm_advice(): - """确认/忽略一条建议""" - data = request.get_json(force=True) or {} - code = data.get("code", "") - idx = data.get("index", -1) - action = data.get("action", "confirmed") # confirmed | ignored - - d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) - for e in d["decisions"]: - if e["code"] == code and e["status"] == "active": - timeline = e.get("advice_timeline", []) - if 0 <= idx < len(timeline): - timeline[idx]["status"] = action - _save_json(DATA_DIR / "decisions.json", d) - return jsonify({"status": "ok"}) - return jsonify({"status": "error", "message": "not found"}), 404 - - -# ── 准确率统计API ── -@app.route("/api/stats/accuracy") -def get_accuracy_stats(): - data = _load_json(DATA_DIR / "accuracy_stats.json", {}) - return jsonify(data) - - -# ── 策略评估API ── -@app.route("/api/evaluation") -def get_evaluation(): - """返回所有策略的双维度评估结果""" - # 主数据源:evaluation.json - eval_data = _load_json(DATA_DIR / "evaluation.json", {}) - strategies = eval_data.get("strategies", []) - if strategies: - return jsonify(strategies) - - # 备选:从 decisions.json 的 evaluation 字段读取(尚未反写时的兼容) - decisions = _load_json(DECISIONS_PATH if 'DECISIONS_PATH' in dir() else DATA_DIR / "decisions.json", {"decisions": []}) - evals = [] - for d in decisions.get("decisions", []): - e = d.get("evaluation", []) - if e: - evals.append({ - "code": d["code"], - "name": d["name"], - "type": d.get("type", ""), - "current": d.get("current", ""), - "evaluations": e, - }) - return jsonify(evals) - - -@app.route("/api/evaluation/trigger", methods=["POST"]) -def trigger_evaluation(): - """手动触发策略评估""" - import subprocess - try: - r = subprocess.run( - ["python3", str(DATA_DIR.parent / "strategy_evaluator.py")], - capture_output=True, timeout=60, text=True, - ) - return jsonify({"status": "ok", "output": r.stdout, "error": r.stderr}) - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - -# ── 策略反馈API ── -@app.route("/api/feedback") -def get_feedback(): - data = _load_json(DATA_DIR / "strategy_feedback.json", {}) - return jsonify(data) - - -# ── 持仓截图上传与解析 ──────────────────────────────── - - -@app.route("/upload") -def upload_page(): - return send_from_directory(app.static_folder, "upload.html") - - -def _ocr_image(image_path): - """用Tesseract OCR提取图片中的文字(预处理优化中文表格识别)""" - from PIL import Image, ImageEnhance, ImageFilter - import pytesseract - - img = Image.open(image_path) - - # 预处理:放大 + 锐化 + 二值化,提升小字识别率 - w, h = img.size - if w < 2000 or h < 2000: - scale = max(2, 2000 // min(w, h)) - img = img.resize((w * scale, h * scale), Image.LANCZOS) - - # 转灰度 - img = img.convert("L") - - # 增强对比度 - enhancer = ImageEnhance.Contrast(img) - img = enhancer.enhance(2.0) - - # 锐化 - img = img.filter(ImageFilter.SHARPEN) - - # 二值化(自适应阈值) - threshold = 128 - img = img.point(lambda x: 255 if x > threshold else 0) - - # OCR:chip_sim+eng,PSM 6(统一文本块) - text = pytesseract.image_to_string( - img, - lang="chi_sim+eng", - config="--psm 6 --oem 3", - ) - return text.strip() - - -ANALYZE_PROMPT = """你是股票持仓数据分析助手。以下是用户上传的持仓/自选截图经过OCR提取的文字,请从中提取所有股票信息。 - -判断这是「持仓截图」还是「自选截图」: -- 持仓截图:每支股票有"证券数量"(持股数)、成本价、盈亏 -- 自选截图:只有股票列表和价格,没有持股数/成本 - -股票代码格式: -- A股:6位数字(如 600519, 000858, 300750) -- 港股:纯数字代码(如 0700, 3690, 1211),不带HK前缀 - -⚠️ 重要:截图顶部通常有汇总数据,如总资产、股票市值、可用资金、当日盈亏等。 -如果OCR文字中有这些汇总数字,请一并提取到JSON的summary字段中。 -不要自己计算汇总值,直接从OCR原文中提取。 - -请严格按照以下JSON格式回复,只输出JSON: - -```json -{ - "type": "portfolio" 或 "watchlist", - "summary": { - "total_assets": "总资产数字(可选,从截图中提取)", - "stock_value": "股票市值/持仓市值数字(可选,从截图中提取)", - "cash": "可用资金/现金数字(可选,从截图中提取)", - "day_pnl": "当日盈亏金额(可选,从截图中提取)" - }, - "stocks": [ - { - "code": "股票代码", - "name": "股票名称(中文)", - "price": "现价(数字)", - "shares": "持股数量(数字,持仓截图才有)", - "cost": "成本价(数字,持仓截图才有)", - "pnl": "盈亏百分比如+15.1%(持仓截图才有)", - "position_pct": "仓位占比数字如12.5(可选)" - } - ] -} -``` - -OCR原文: -""" - - -@app.route("/api/upload/analyze", methods=["POST"]) -def upload_analyze(): - """接收图片,OCR提取文字 → LLM解析结构化数据""" - if "image" not in request.files: - return jsonify({"error": "请上传图片"}), 400 - - f = request.files["image"] - if not f.filename: - return jsonify({"error": "空文件"}), 400 - - # 保存到临时目录 - UPLOAD_DIR.mkdir(parents=True, exist_ok=True) - ext = Path(f.filename).suffix or ".png" - save_path = UPLOAD_DIR / f"{uuid.uuid4().hex}{ext}" - f.save(str(save_path)) - - try: - # 第一步:OCR提取文字 - raw_text = _ocr_image(str(save_path)) - if not raw_text: - return jsonify({"error": "OCR未识别到文字,请确认图片清晰"}), 400 - except Exception as e: - os.unlink(str(save_path)) - return jsonify({"error": f"OCR失败: {e}"}), 500 - - # 第二步:LLM解析结构化数据(走文本API,不走视觉) - llm_text = _llm_parse(raw_text, ANALYZE_PROMPT) - - os.unlink(str(save_path)) - - # 从LLM回复中提取JSON - json_match = re.search(r"```(?:json)?\s*({.*?})\s*```", llm_text, re.DOTALL) - if json_match: - try: - parsed = json.loads(json_match.group(1)) - except json.JSONDecodeError: - return jsonify({"error": f"LLM解析JSON失败: {llm_text[:500]}"}), 500 - else: - # 尝试直接找JSON(没被代码块包裹) - try: - parsed = json.loads(llm_text) - except json.JSONDecodeError: - return jsonify({"error": f"未提取到结构化数据: {raw_text[:300]}...\n\nLLM回复: {llm_text[:500]}"}), 500 - - return jsonify(parsed) - - -def _llm_parse(text, prompt_template): - """发送OCR文本到Hermes LLM解析,返回JSON字符串""" - payload = json.dumps({ - "model": "hermes-agent", - "messages": [ - {"role": "system", "content": "你是一个数据提取助手。从OCR文字中提取结构化JSON数据。"}, - {"role": "user", "content": prompt_template + "\n" + text}, - ], - "max_tokens": 4096, - }).encode() - - req = urllib.request.Request(GATEWAY, data=payload, method="POST") - req.add_header("Content-Type", "application/json") - req.add_header("Authorization", f"Bearer {API_KEY}") - req.add_header("X-Hermes-Session-Id", "upload-ocr-parse") - - try: - resp = urllib.request.urlopen(req, timeout=120) - data = json.loads(resp.read()) - return data.get("choices", [{}])[0].get("message", {}).get("content", "") - except Exception as e: - return f"ERROR: {e}" - - -@app.route("/api/upload/confirm", methods=["POST"]) -def upload_confirm(): - """确认解析结果,更新数据文件""" - data = request.get_json(force=True) - stocks = data.get("stocks", []) - doc_type = data.get("type", "portfolio") - - # 尝试获取实时行情补充数据 - try: - codes = [s["code"] for s in stocks if s.get("code")] - if codes: - qs = " ".join( - f"hk{c}" if len(c) == 5 # 港股5位代码 - else f"sz{c}" if c.startswith("0") or c.startswith("3") - else f"sh{c}" if c.startswith("6") - else f"hk{c}" - for c in codes - ) - url = f"https://qt.gtimg.cn/q={qs}" - req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) - resp = urllib.request.urlopen(req, timeout=10) - qt_text = resp.read().decode("gbk", errors="replace") - # map realtime prices - for stock in stocks: - code = stock.get("code", "") - prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk" - # 腾讯 API 格式: prefix+code="市场~名称~代码~当前价~昨收~今开~成交量~..." - m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text) - if m: - fields = m.group(1).split('~') - name = fields[1] - price = fields[3] # 当前价 - if not stock.get("price"): - stock["price"] = price - if not stock.get("name"): - stock["name"] = name - except: - pass # 行情获取失败不影响主流程 - - # 更新对应数据文件 - if doc_type == "portfolio": - existing = _load_json(DATA_DIR / "portfolio.json", {}) - old_holdings = {h["code"]: h for h in existing.get("holdings", []) if h.get("code")} - new_holdings = [] - for s in stocks: - code = s.get("code", "") - old = old_holdings.get(code, {}) - new_shares = int(s["shares"]) if str(s.get("shares", "")).lstrip('-').isdigit() else old.get("shares", 0) - old_shares = old.get("shares", 0) - # 股数突变检测:旧200→新0是合理卖出,但旧0→新200可能是OCR错读 - if old_shares > 0 and new_shares == 0 and old_shares != new_shares: - print(f"[仓位变动] {code} {s.get('name','')}: {old_shares}→{new_shares} (卖出清仓)") - elif abs(new_shares - old_shares) > max(old_shares * 0.5, 100) and old_shares > 0: - print(f"[仓位变动] {code} {s.get('name','')}: {old_shares}→{new_shares} (变动较大)") - new_holdings.append({ - "code": code, - "name": s.get("name") or old.get("name", ""), - "shares": new_shares, - "price": float(s.get("price", 0)) or old.get("price", 0), - "cost": float(s.get("cost", 0)) if s.get("cost") else old.get("cost", 0), - "pnl": s.get("pnl") or old.get("pnl", ""), - "position_pct": float(s.get("position_pct", 0)) if s.get("position_pct") else old.get("position_pct", 0), - "change_pct": old.get("change_pct", 0), - }) - existing["holdings"] = new_holdings - - # 使用截图中的汇总数据(优先),没有则用旧数据 - summary = data.get("summary", {}) - if summary.get("stock_value"): - existing["stock_value"] = float(summary["stock_value"]) - else: - existing["stock_value"] = round( - sum(h["shares"] * h["price"] for h in existing["holdings"]), 2 - ) - if summary.get("cash"): - existing["cash"] = float(summary["cash"]) - if summary.get("total_assets"): - existing["total_assets"] = float(summary["total_assets"]) - else: - existing["total_assets"] = existing["stock_value"] + existing.get("cash", 0) - if summary.get("day_pnl"): - existing["day_pnl"] = float(summary["day_pnl"]) - existing["updated_at"] = datetime.now().isoformat() - # 计算仓位% - if existing["total_assets"] > 0: - existing["position_pct"] = round(existing["stock_value"] / existing["total_assets"] * 100, 2) - _save_json(DATA_DIR / "portfolio.json", existing) - msg = f"更新了 {len(stocks)} 只持仓股" - - elif doc_type == "watchlist": - existing = _load_json(DATA_DIR / "watchlist.json", {}) - existing["stocks"] = [ - { - "code": s.get("code", ""), - "name": s.get("name", ""), - "price": float(s.get("price", 0)) if s.get("price") else 0, - } - for s in stocks - ] - existing["updated_at"] = datetime.now().isoformat() - _save_json(DATA_DIR / "watchlist.json", existing) - msg = f"更新了 {len(stocks)} 只自选股" - - else: - return jsonify({"error": f"未知类型: {doc_type}"}), 400 - - return jsonify({"status": "ok", "message": msg}) - - -# ── TDX中继实时行情接收API ── -@app.route("/api/update/realtime", methods=["POST"]) -def update_realtime(): - """接收小小莫中继的实时行情数据""" - data = request.get_json(force=True) or {} - stocks = data.get("stocks", []) - source = data.get("source", "unknown") - - if not stocks: - return jsonify({"status": "error", "message": "没有股票数据"}), 400 - - # 更新 portfolio.json 中的实时价格(change_pct字段) - pf = _load_json(DATA_DIR / "portfolio.json", {"holdings": []}) - pf_holdings = {h["code"]: h for h in pf.get("holdings", [])} - - updated = 0 - for s in stocks: - code = s.get("code", "") - if code in pf_holdings: - pf_holdings[code]["price"] = float(s.get("price", pf_holdings[code].get("price", 0))) - pf_holdings[code]["change_pct"] = float(s.get("change_pct", 0)) - pf_holdings[code]["high"] = float(s.get("high", 0)) - pf_holdings[code]["low"] = float(s.get("low", 0)) - pf_holdings[code]["open"] = float(s.get("open", 0)) - pf_holdings[code]["volume"] = int(s.get("volume", 0)) - pf_holdings[code]["data_source"] = source - pf_holdings[code]["updated_at"] = datetime.now().isoformat() - updated += 1 - - # 也更新 watchlist.json - wl = _load_json(DATA_DIR / "watchlist.json", {"stocks": []}) - wl_stocks = {s["code"]: s for s in wl.get("stocks", [])} - - for s in stocks: - code = s.get("code", "") - if code in wl_stocks: - wl_stocks[code]["price"] = float(s.get("price", wl_stocks[code].get("price", 0))) - wl_stocks[code]["change_pct"] = float(s.get("change_pct", 0)) - - pf["updated_at"] = datetime.now().isoformat() - wl["updated_at"] = datetime.now().isoformat() - _save_json(DATA_DIR / "portfolio.json", pf) - _save_json(DATA_DIR / "watchlist.json", wl) - - return jsonify({ - "status": "ok", - "updated": updated, - "source": source, - "timestamp": datetime.now().isoformat(), - }) - - -# 注册提示词管理路由 -register_routes(app) - - -if __name__ == "__main__": - port = int(os.environ.get("PORT", 8899)) - print(f"🚀 MoFin Dashboard → http://0.0.0.0:{port}") - app.run(host="0.0.0.0", port=port, debug=False) \ No newline at end of file