#!/usr/bin/env python3
"""update_data.py — parse cron output and refresh the dashboard data layer."""

import json
import os
import re
from datetime import datetime
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"

# Keywords that mark a line as carrying per-stock advice (regex alternation).
_ADVICE_KEYWORDS = "建议|止盈|止损|补仓|持有|减仓|加仓|卖出|买入"

# (direction, trigger words) pairs, checked in order: the first hit wins.
_DIRECTION_RULES = (
    ("买入", ("买入", "加仓", "入场", "🟢", "可加", "可入")),
    ("卖出", ("卖出", "止盈", "减仓", "止损", "清仓", "🔴", "锁定利润")),
    ("持有", ("持有", "观望", "👀", "🤝", "暂持", "继续持有")),
)

# Maximum stored length of a timeline summary; also used for the de-dup key.
_SUMMARY_LIMIT = 120


def _save(name, data):
    """Write *data* as indented UTF-8 JSON to ``DATA_DIR / name``.

    Parent directories are created on demand.
    """
    path = DATA_DIR / name
    os.makedirs(path.parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def _load_json(path):
    """Return the decoded JSON content of *path*, or None if unreadable."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None


def _extract_structured(text):
    """Return the first JSON object embedded in *text*, or None.

    Every ``{`` is tried as a potential start and decoded with
    ``raw_decode``, so nested objects/arrays are handled correctly (a
    non-greedy ``\\{.*?\\}`` regex would stop at the first ``}``).
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except ValueError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return None


def _classify_direction(content):
    """Map an advice line to "买入" / "卖出" / "持有", or "其他" if unclear."""
    for direction, words in _DIRECTION_RULES:
        if any(word in content for word in words):
            return direction
    return "其他"


def parse_report(markdown_text, source_file=None):
    """Parse a markdown report produced by cron into a structured dict.

    Args:
        markdown_text: Full text of the report.
        source_file: Path of the originating file. Accepted for interface
            compatibility; it is not used by the parser.

    Returns:
        A dict with the keys ``title``, ``type``, ``created_at``,
        ``summary``, ``content``, ``stocks_mentioned`` and ``structured``.
    """
    report = {
        "title": "",
        "type": "未知",
        "created_at": datetime.now().isoformat(),
        "summary": "",
        "content": markdown_text,
        "stocks_mentioned": [],
        "structured": None,  # structured data takes priority when present
    }

    lines = markdown_text.split("\n")

    # Title: first line that is a level-1 heading or starts with the 📊 marker.
    for line in lines:
        m = re.match(r"^#\s+(.+)", line) or re.match(r"^📊\s+(.+)", line)
        if m:
            report["title"] = m.group(1).strip()
            break

    # Report type, derived from keywords in the title.
    title = report["title"]
    if "盘中" in title:
        report["type"] = "盘中"
    elif "盘后" in title or "复盘" in title:
        report["type"] = "盘后"
    elif "盯盘" in title:
        report["type"] = "盯盘"
    elif "扫描" in title:
        report["type"] = "盘前"

    # Prefer an embedded JSON object when the report contains one.
    structured = _extract_structured(markdown_text)
    if structured is not None:
        report["structured"] = structured
        holdings = structured.get("holdings")
        if isinstance(holdings, list):
            codes = {
                str(h["code"]).strip()
                for h in holdings
                if isinstance(h, dict) and h.get("code")
            }
            codes.discard("")
            report["stocks_mentioned"] = sorted(codes)

    # Summary: first 5 non-empty, non-heading lines, capped at 200 characters.
    body_lines = [
        stripped
        for stripped in (line.strip() for line in lines)
        if stripped and not stripped.startswith("#")
    ]
    report["summary"] = "\n".join(body_lines[:5])[:200]

    # Text fallback, only when the structured data yielded no codes: any
    # standalone 5- or 6-digit number is treated as a stock code.
    # re.ASCII makes \b treat CJK characters as non-word characters, so a
    # code written directly next to Chinese text is still found.
    if not report["stocks_mentioned"]:
        found = re.findall(r"\b\d{5,6}\b", markdown_text, re.ASCII)
        report["stocks_mentioned"] = sorted(set(found))

    return report


def import_cron_outputs():
    """Import the newest reports from the cron output directory.

    For every job directory under ``~/.hermes/cron/output`` the five newest
    ``.md`` files (by name, descending) are considered. Files that were
    already imported, or whose response section is exactly ``[SILENT]``,
    are skipped.

    Returns:
        The number of newly imported reports.
    """
    cron_dir = Path.home() / ".hermes" / "cron" / "output"
    reports_dir = DATA_DIR / "reports"
    os.makedirs(reports_dir, exist_ok=True)

    if not cron_dir.exists():
        return 0

    count = 0
    for job_dir in sorted(cron_dir.iterdir()):
        if not job_dir.is_dir():
            continue

        # Filter to markdown first so unrelated files do not consume the
        # "latest 5" slots.
        md_files = sorted(
            (p for p in job_dir.iterdir() if p.suffix == ".md"),
            reverse=True,
        )
        for f in md_files[:5]:
            report_id = f"cron_{job_dir.name}_{f.stem}"
            export_name = f"{report_id}.json"
            if (reports_dir / export_name).exists():
                continue  # already imported

            content = f.read_text(encoding="utf-8", errors="replace")

            # Skip silent reports before spending time on parsing.
            resp_match = re.search(r"## Response\n+(.*)", content, re.DOTALL)
            if resp_match and resp_match.group(1).strip() == "[SILENT]":
                continue

            report = parse_report(content, source_file=str(f))
            report["_id"] = report_id
            _save(f"reports/{export_name}", report)
            count += 1

    return count


def extract_stock_mentions():
    """Collect per-stock advice lines from all imported reports.

    For every code listed in a report's ``stocks_mentioned``, each line of
    the report content that contains the code followed by an advice keyword
    is recorded. One ``stocks/<code>.json`` file is written per code.

    Returns:
        The number of distinct stock codes seen.
    """
    reports_dir = DATA_DIR / "reports"
    stocks_dir = DATA_DIR / "stocks"
    os.makedirs(stocks_dir, exist_ok=True)

    if not reports_dir.is_dir():
        return 0

    stock_data = {}
    patterns = {}  # code -> compiled pattern, built once per code

    for f in sorted(reports_dir.iterdir()):
        if f.suffix != ".json":
            continue
        report = _load_json(f)
        if not isinstance(report, dict):
            continue

        content = report.get("content") or ""
        for raw_code in report.get("stocks_mentioned") or []:
            code = str(raw_code)
            if code not in stock_data:
                stock_data[code] = {"code": code, "history": []}
                # '.' does not match newlines here, so each match is a single
                # line: the code, then one advice keyword somewhere after it.
                patterns[code] = re.compile(
                    rf".*?({re.escape(code)}).*?({_ADVICE_KEYWORDS}).*?(?:\n|$)",
                    re.IGNORECASE,
                )

            for m in patterns[code].finditer(content):
                stock_data[code]["history"].append({
                    "time": report.get("created_at", ""),
                    "content": m.group(0).strip()[:100],
                    "report_id": report.get("_id", ""),
                })

    for code, data in stock_data.items():
        _save(f"stocks/{code}.json", data)

    return len(stock_data)


def sync_to_decisions():
    """Merge per-stock advice into ``decisions.json`` (``advice_timeline``).

    Entries are de-duplicated on ``(date, direction, summary)``. Stocks with
    no existing decision record get a new record with status ``inactive``.

    Returns:
        The number of timeline entries added. Returns 0 without writing
        anything when ``decisions.json`` or the ``stocks`` directory is
        missing.
    """
    decisions_path = DATA_DIR / "decisions.json"
    if not decisions_path.exists():
        return 0

    stocks_dir = DATA_DIR / "stocks"
    if not stocks_dir.is_dir():
        return 0

    decisions = json.loads(decisions_path.read_text(encoding="utf-8"))
    records = decisions.setdefault("decisions", [])

    # Index existing records by code; the first record for a code wins,
    # matching a front-to-back linear search.
    by_code = {}
    for record in records:
        if isinstance(record, dict) and record.get("code"):
            by_code.setdefault(record["code"], record)

    synced = 0
    for f in sorted(stocks_dir.iterdir()):
        if f.suffix != ".json":
            continue
        stock = _load_json(f)
        if not isinstance(stock, dict):
            continue

        code = stock.get("code", "")
        history = stock.get("history", [])
        if not code or not history:
            continue

        existing = by_code.get(code)
        if existing is None:
            # No decision record yet: create an inactive placeholder.
            existing = {
                "code": code,
                "name": stock.get("name", ""),
                "timestamp": datetime.now().isoformat(),
                "type": "历史建议汇总",
                "current": "自动从update_data同步",
                "status": "inactive",
                "updated_by": "system(update_data)",
                "advice_timeline": [],
            }
            records.append(existing)
            by_code[code] = existing

        timeline = existing.setdefault("advice_timeline", [])
        existing_keys = {
            (e["date"], e["direction"], e["summary"])
            for e in timeline
            if "date" in e and "direction" in e and "summary" in e
        }

        new_count = 0
        for entry in history:
            content = entry.get("content") or ""

            direction = _classify_direction(content)
            if direction == "其他":
                continue

            # The date is taken from the report id when it embeds YYYY-MM-DD.
            rid = entry.get("report_id", "")
            m = re.search(r"(\d{4}-\d{2}-\d{2})", rid)
            date = m.group(1) if m else "unknown"

            # The key must use exactly the text that is stored as "summary";
            # otherwise entries written by a previous run never match and
            # are appended again on every run.
            summary = content.strip()[:_SUMMARY_LIMIT]
            key = (date, direction, summary)
            if key in existing_keys:
                continue

            existing_keys.add(key)
            timeline.append({
                "date": date,
                "direction": direction,
                "summary": summary,
                "report_id": rid,
            })
            new_count += 1

        if new_count > 0:
            # Keep the timeline ordered by date ("unknown" sorts last).
            timeline.sort(key=lambda e: e.get("date", ""))
            synced += new_count

    _save("decisions.json", decisions)
    return synced


def build_portfolio_from_obsidian():
    """Build a portfolio dict from the Obsidian holdings note.

    Reads ``~/Obsidian/knowledge/finance/dad-portfolio.md`` if it exists and
    collects the name (column 1) and code (column 2) of every markdown table
    row that matches the holding pattern and has at least 8 ``|``-separated
    parts.

    Returns:
        A dict with ``holdings`` plus portfolio totals. All numeric fields
        are currently placeholders and always 0.
    """
    obsidian_path = Path.home() / "Obsidian" / "knowledge" / "finance"
    portfolio_file = obsidian_path / "dad-portfolio.md"

    holdings = []

    if portfolio_file.exists():
        content = portfolio_file.read_text(encoding="utf-8", errors="replace")
        # NOTE(review): the pattern captures three numbers (presumably
        # shares, cost and position %), but they are not used below —
        # confirm the intended column meaning before populating the fields.
        row_pattern = re.compile(
            r'\|.*?\|.*?(\d+)@(\d+\.?\d*)@.*?\|(\d+\.?\d*)%?\|'
        )
        for line in content.split("\n"):
            if not row_pattern.match(line):
                continue
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 8:
                continue
            name, code = parts[1], parts[2]
            if code:
                holdings.append({
                    "code": code,
                    "name": name,
                    "position_pct": 0,
                    "cost": 0,
                    "shares": 0,
                    "price": 0,
                    "change_pct": 0,
                })

    return {
        "holdings": holdings,
        "total_assets": 0,
        "stock_value": 0,
        "cash": 0,
        "position_pct": 0,
        "total_pnl": 0,
        "updated_at": datetime.now().isoformat(),
    }


def main():
    """Run the import → extract → sync pipeline, printing only on activity."""
    count = import_cron_outputs()
    if count > 0:
        print(f"📥 新增报告: {count}篇")

    stocks = extract_stock_mentions()
    if stocks > 0:
        print(f"📊 个股数据: {stocks}条")

    synced = sync_to_decisions()
    if synced > 0:
        print(f"📋 决策库: 新增{synced}条建议")

    # Print the completion line only when something was processed;
    # otherwise stay silent and emit nothing at all.
    if count > 0 or stocks > 0 or synced > 0:
        print(f"✅ {datetime.now().strftime('%m/%d %H:%M')} 数据同步完成")


if __name__ == "__main__":
    main()