From dc68739ac4ffd5c088febb2446d08c230af8d65e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Tue, 30 Jun 2026 23:46:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20DB=20migration=20=E2=80=94=20enforce=20?= =?UTF-8?q?currency=20constraints=20on=20holdings/strategies/watchlist/sum?= =?UTF-8?q?mary=20tables=20+=20price=5Fmonitor=20DB=20writes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mofin_db.py | 1736 +++++++++++++++++++++++++--------------------- price_monitor.py | 1402 +++++++++++++++++++------------------ 2 files changed, 1660 insertions(+), 1478 deletions(-) diff --git a/mofin_db.py b/mofin_db.py index 68e2d00..2f97574 100644 --- a/mofin_db.py +++ b/mofin_db.py @@ -1,790 +1,946 @@ -#!/usr/bin/env python3 -"""mofin_db.py — MoFin 统一数据库访问层 - -所有脚本通过此模块访问 mofin.db,避免重复建表/连接逻辑。 - -用法: - from mofin_db import get_conn, write_market_snapshot, write_klines, ... - -设计原则: - - 幂等建表(CREATE TABLE IF NOT EXISTS) - - WAL 模式 + 外键约束 - - 所有写操作返回 (success: bool, detail: str) - - JSON 写入由调用方负责,本模块只写 SQLite -""" - -import sqlite3 -import json -from datetime import datetime -from pathlib import Path -from typing import Optional - -DATA_DIR = Path(__file__).parent / "data" -DB_PATH = DATA_DIR / "mofin.db" - -# ═══════════════════════════════════════════════════════════ -# 连接管理 -# ═══════════════════════════════════════════════════════════ - -def get_conn() -> sqlite3.Connection: - """获取数据库连接(WAL 模式,外键约束,Row 工厂)""" - DATA_DIR.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA foreign_keys=ON") - return conn - - -# ═══════════════════════════════════════════════════════════ -# 建表(幂等) -# ═══════════════════════════════════════════════════════════ - -def init_all_tables(conn: sqlite3.Connection): - """创建全部表(幂等,已存在则跳过)""" - conn.executescript(""" - -- 市场快照 - CREATE TABLE IF NOT EXISTS market_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - source TEXT NOT NULL DEFAULT 'ths', - up_ratio REAL, - mood TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp); - - -- 板块快照 - CREATE TABLE IF NOT EXISTS sector_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), - name TEXT NOT NULL, - change_pct REAL, - up_count INTEGER, - down_count INTEGER, - net_inflow REAL, - lead_stock TEXT, - lead_stock_change REAL, - volume REAL, - turnover REAL - ); - CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name); - CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id); - CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id); - - -- 个股 - CREATE TABLE IF NOT EXISTS stocks ( - code TEXT PRIMARY KEY, - name TEXT NOT NULL, - exchange TEXT DEFAULT 'SH', - type TEXT DEFAULT 'A', - updated_at TEXT - ); - - -- K线(日/周/月) - CREATE TABLE IF NOT EXISTS stock_daily ( - code TEXT NOT NULL REFERENCES stocks(code), - date TEXT NOT NULL, - open REAL, close REAL, high REAL, low REAL, - volume REAL, amount REAL, - PRIMARY KEY (code, date) - ); - CREATE TABLE IF NOT EXISTS stock_weekly ( - code TEXT NOT NULL REFERENCES stocks(code), - date TEXT NOT NULL, - open REAL, close REAL, high REAL, low REAL, - volume REAL, - PRIMARY KEY (code, date) - ); - CREATE TABLE IF NOT EXISTS stock_monthly ( - code TEXT NOT NULL REFERENCES stocks(code), - date TEXT NOT NULL, - open REAL, close REAL, high REAL, low REAL, - volume REAL, - PRIMARY KEY (code, date) - ); - - -- 基本面 - CREATE TABLE IF NOT EXISTS stock_fundamentals ( - code TEXT PRIMARY KEY REFERENCES stocks(code), - pe REAL, pb REAL, eps REAL, - mcap_total REAL, mcap_flow REAL, - updated_at TEXT - ); - - -- 板块成分映射 - CREATE TABLE IF NOT EXISTS stock_sectors ( - code TEXT NOT NULL REFERENCES stocks(code), - sector_name TEXT NOT NULL, - source TEXT DEFAULT 'ths', - updated_at TEXT DEFAULT (datetime('now','localtime')), - PRIMARY KEY (code, sector_name) - ); - CREATE INDEX IF NOT EXISTS idx_stock_sector ON stock_sectors(sector_name); - - -- 持仓 - CREATE TABLE IF NOT EXISTS holdings ( - code TEXT PRIMARY KEY REFERENCES stocks(code), - name TEXT NOT NULL, - shares INTEGER NOT NULL, - cost REAL, - position_pct REAL, - added_at TEXT, - is_active INTEGER DEFAULT 1, - closed_at TEXT, - close_pnl REAL - ); - - -- 持仓策略 - CREATE TABLE IF NOT EXISTS holding_strategies ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES holdings(code), - version INTEGER DEFAULT 1, - stop_loss REAL, - take_profit REAL, - entry_low REAL, - entry_high REAL, - strategy_type TEXT DEFAULT 'holding', - source TEXT, - reason TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')), - superseded_at TEXT - ); - CREATE INDEX IF NOT EXISTS idx_strategy_code ON holding_strategies(code); - - -- 自选股 - CREATE TABLE IF NOT EXISTS watchlist_stocks ( - code TEXT PRIMARY KEY REFERENCES stocks(code), - name TEXT NOT NULL, - added_at TEXT DEFAULT (datetime('now','localtime')), - is_active INTEGER DEFAULT 1 - ); - - -- 候选池 - CREATE TABLE IF NOT EXISTS candidates ( - code TEXT PRIMARY KEY REFERENCES stocks(code), - name TEXT NOT NULL, - sector TEXT, - reason TEXT, - entry_range TEXT, - stop_loss REAL, - target REAL, - zhiwei_star REAL, - zhiwei_reviewed INTEGER DEFAULT 0, - zhiwei_reviewed_at TEXT, - promoted INTEGER DEFAULT 0, - promoted_at TEXT, - dropped INTEGER DEFAULT 0, - drop_reason TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - - -- 候选评分历史 - CREATE TABLE IF NOT EXISTS candidate_score_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES candidates(code), - score REAL NOT NULL, - source TEXT NOT NULL, - reason TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_candidate_history ON candidate_score_history(code, created_at); - - -- 价格事件 - CREATE TABLE IF NOT EXISTS price_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES stocks(code), - name TEXT, - event_type TEXT NOT NULL, - price REAL, - trigger_value TEXT, - event_label TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')), - date TEXT - ); - CREATE INDEX IF NOT EXISTS idx_events_code ON price_events(code); - CREATE INDEX IF NOT EXISTS idx_events_date ON price_events(date); - - -- 策略评估记录 - CREATE TABLE IF NOT EXISTS strategy_evaluations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES stocks(code), - eval_type TEXT NOT NULL, - status TEXT DEFAULT 'pending', - old_stop_loss REAL, - new_stop_loss REAL, - old_tp REAL, - new_tp REAL, - reason TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - - -- 持仓汇总(portfolio.json 顶层字段) - CREATE TABLE IF NOT EXISTS portfolio_summary ( - id INTEGER PRIMARY KEY CHECK (id = 1), - total_assets REAL, - stock_value REAL, - cash REAL, - position_pct REAL, - total_pnl REAL, - updated_at TEXT - ); - - -- 建议时间线(decisions.json advice_timeline[]) - CREATE TABLE IF NOT EXISTS advice_timeline ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES stocks(code), - date TEXT, - direction TEXT, - price REAL, - summary TEXT, - status TEXT, - evaluated INTEGER DEFAULT 0, - result TEXT, - evaluated_at TEXT, - report_id TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_advice_code ON advice_timeline(code); - - -- 准确率统计(accuracy_stats.json) - CREATE TABLE IF NOT EXISTS accuracy_stats ( - id INTEGER PRIMARY KEY CHECK (id = 1), - period_start TEXT, - period_end TEXT, - total_advice INTEGER DEFAULT 0, - correct INTEGER DEFAULT 0, - wrong INTEGER DEFAULT 0, - partial INTEGER DEFAULT 0, - unknown INTEGER DEFAULT 0, - pending INTEGER DEFAULT 0, - ignored INTEGER DEFAULT 0, - evaluated INTEGER DEFAULT 0, - accuracy_pct REAL, - phase1_correct INTEGER DEFAULT 0, - phase1_wrong INTEGER DEFAULT 0, - phase1_pending INTEGER DEFAULT 0, - phase1_accuracy REAL, - phase2_correct INTEGER DEFAULT 0, - phase2_wrong INTEGER DEFAULT 0, - phase2_pending INTEGER DEFAULT 0, - phase2_accuracy REAL, - total_evaluated INTEGER DEFAULT 0, - updated_at TEXT - ); - - -- 策略反馈(strategy_feedback.json) - CREATE TABLE IF NOT EXISTS strategy_feedback ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL REFERENCES stocks(code), - name TEXT, - evaluated_at TEXT, - phase1_completed INTEGER DEFAULT 0, - phase1_result TEXT, - phase1_completed_at TEXT, - phase1_price REAL, - phase2_completed INTEGER DEFAULT 0, - phase2_result TEXT, - phase2_completed_at TEXT, - days_in_phase1 INTEGER, - adjustments_json TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_feedback_code ON strategy_feedback(code); - - -- 板块信号(trend_detector 产出) - CREATE TABLE IF NOT EXISTS sector_signals ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - signal_type TEXT NOT NULL, - sector TEXT NOT NULL, - severity TEXT DEFAULT 'medium', - related_stocks TEXT, - holdings_in_sector TEXT, - watchlist_in_sector TEXT, - trigger_reason TEXT, - snapshot_id INTEGER, - processed INTEGER DEFAULT 0, - detected_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_signal_processed ON sector_signals(processed); - CREATE INDEX IF NOT EXISTS idx_signal_sector ON sector_signals(sector); - - -- 小果情报(xiaoguo_news_processor 产出) - CREATE TABLE IF NOT EXISTS signal_news ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - signal_id INTEGER REFERENCES sector_signals(id), - sector TEXT NOT NULL, - overall_sentiment TEXT, - summary TEXT, - key_articles TEXT, - searched_stocks TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_signal_news_signal ON signal_news(signal_id); - - -- 小果扫描跟踪(去重用) - CREATE TABLE IF NOT EXISTS xiaoguo_scan_tracker ( - code TEXT PRIMARY KEY, - name TEXT, - last_scanned_at TEXT, - found_count INTEGER DEFAULT 0 - ); - """) - conn.commit() - - # 迁移:给 signal_news 加 source 字段(幂等) - try: - conn.execute("ALTER TABLE signal_news ADD COLUMN source TEXT DEFAULT 'trend'") - except sqlite3.OperationalError: - pass - conn.commit() - - -# ═══════════════════════════════════════════════════════════ -# 市场快照写入 -# ═══════════════════════════════════════════════════════════ - -def write_market_snapshot(conn: sqlite3.Connection, market_data: dict) -> tuple[bool, str, Optional[int]]: - """写入一次市场采集到 market_snapshots + sector_snapshots - - Returns: (ok, message, snapshot_id) - """ - try: - cur = conn.execute( - "INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) VALUES (?, ?, ?, ?)", - (market_data["timestamp"], market_data.get("source", "unknown"), - market_data.get("up_ratio", 0), market_data.get("mood", "unknown")), - ) - sid = cur.lastrowid - - sectors = market_data.get("sectors", []) - rows = [(sid, s.get("name", ""), s.get("change", 0), - s.get("up_count"), s.get("down_count"), s.get("net_inflow"), - s.get("lead_stock"), s.get("lead_stock_change"), - s.get("volume"), s.get("turnover")) for s in sectors] - if rows: - conn.executemany( - "INSERT INTO sector_snapshots (snapshot_id, name, change_pct, up_count, down_count, " - "net_inflow, lead_stock, lead_stock_change, volume, turnover) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows) - conn.commit() - return True, f"snapshot_id={sid}, sectors={len(rows)}", sid - except Exception as e: - try: - conn.rollback() - except Exception: - pass - return False, str(e), None - - -# ═══════════════════════════════════════════════════════════ -# K线写入 -# ═══════════════════════════════════════════════════════════ - -def write_klines(conn: sqlite3.Connection, code: str, name: str, - daily: list = None, weekly: list = None, monthly: list = None, - fundamentals: dict = None) -> bool: - """将个股K线数据双写 SQLite - - Args: - code: 股票代码 - name: 股票名称 - daily/weekly/monthly: [{date, open, close, high, low, volume}, ...] - fundamentals: {pe, pb, eps, mcap_total, mcap_flow} - """ - try: - # 判断交易所 - raw = str(code) - if len(raw) == 5 and raw.isdigit(): - exchange, stype = "HK", "H" - elif raw.startswith(("6", "5", "9")): - exchange, stype = "SH", "A" - else: - exchange, stype = "SZ", "A" - - # stocks 表(INSERT OR REPLACE) - conn.execute( - "INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)", - (code, name, exchange, stype, datetime.now().isoformat())) - - # K线数据 - for period, table, data in [ - ("daily", "stock_daily", daily), - ("weekly", "stock_weekly", weekly), - ("monthly", "stock_monthly", monthly), - ]: - if not data: - continue - rows = [(code, d.get("date", ""), d.get("open"), d.get("close"), - d.get("high"), d.get("low"), d.get("volume"), - d.get("amount") if period == "daily" else None) for d in data] - if period == "daily": - conn.executemany( - f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume, amount) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows) - else: - conn.executemany( - f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - [(r[0], r[1], r[2], r[3], r[4], r[5], r[6]) for r in rows]) - - # 基本面 - if fundamentals: - conn.execute( - "INSERT OR REPLACE INTO stock_fundamentals (code, pe, pb, eps, mcap_total, mcap_flow, updated_at) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - (code, fundamentals.get("pe"), fundamentals.get("pb"), - fundamentals.get("eps"), fundamentals.get("mcap_total"), - fundamentals.get("mcap_flow"), datetime.now().isoformat())) - - conn.commit() - return True - except Exception as e: - try: - conn.rollback() - except Exception: - pass - return False - - -# ═══════════════════════════════════════════════════════════ -# 价格事件写入 -# ═══════════════════════════════════════════════════════════ - -def write_price_event(conn: sqlite3.Connection, code: str, name: str, - event_type: str, price: float, trigger_value: str, - event_label: str = "") -> bool: - """写入一条价格事件""" - try: - now = datetime.now() - conn.execute( - "INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - (code, name, event_type, round(price, 2), trigger_value, - event_label, now.strftime("%Y-%m-%d"))) - conn.commit() - return True - except Exception: - try: - conn.rollback() - except Exception: - pass - return False - - -# ═══════════════════════════════════════════════════════════ -# 板块成分迁移 -# ═══════════════════════════════════════════════════════════ - -def migrate_stock_sectors(conn: sqlite3.Connection) -> tuple[int, int]: - """从 stock_sector_map.json 迁移到 stock_sectors 表 - - Returns: (migrated_stocks, total_mappings) - """ - sector_map_path = DATA_DIR / "stock_sector_map.json" - if not sector_map_path.exists(): - return 0, 0 - - try: - with open(sector_map_path, encoding="utf-8") as f: - data = json.load(f) - except Exception: - return 0, 0 - - # 过滤元数据字段 - mappings = [(code, sectors) for code, sectors in data.items() - if not code.startswith("_") and isinstance(sectors, list)] - - total = 0 - for code, sectors in mappings: - for sector in sectors: - try: - conn.execute( - "INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')", - (code, sector)) - total += 1 - except Exception: - pass - conn.commit() - return len(mappings), total - - -# ═══════════════════════════════════════════════════════════ -# 查询辅助 -# ═══════════════════════════════════════════════════════════ - -def query_sector_trend(conn: sqlite3.Connection, name: str, limit: int = 5) -> list[dict]: - """板块最近N次趋势""" - rows = conn.execute(""" - SELECT s.timestamp, ss.change_pct, ss.net_inflow, - ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE ss.name = ? ORDER BY s.timestamp DESC LIMIT ? - """, (name, limit)).fetchall() - return [dict(r) for r in rows] - - -def query_top_inflow(conn: sqlite3.Connection, limit: int = 5) -> list[dict]: - """最新一次资金净流入排行""" - rows = conn.execute(""" - SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE s.id = (SELECT MAX(id) FROM market_snapshots) - AND ss.net_inflow IS NOT NULL - ORDER BY ss.net_inflow DESC LIMIT ? - """, (limit,)).fetchall() - return [dict(r) for r in rows] - - -def query_consecutive_inflow(conn: sqlite3.Connection, days: int = 3) -> list[dict]: - """连续N次净流入的板块""" - rows = conn.execute(""" - SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow, - ROUND(AVG(change_pct), 2) as avg_change - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots) - AND net_inflow > 0 - GROUP BY name HAVING COUNT(*) >= ? - ORDER BY avg_inflow DESC - """, (days, days)).fetchall() - return [dict(r) for r in rows] - - -def query_market_mood(conn: sqlite3.Connection, limit: int = 10) -> list[dict]: - """市场情绪趋势""" - rows = conn.execute(""" - SELECT timestamp, source, up_ratio, mood - FROM market_snapshots ORDER BY timestamp DESC LIMIT ? - """, (limit,)).fetchall() - return [dict(r) for r in rows] - - -def query_db_stats(conn: sqlite3.Connection) -> dict: - """数据库概览""" - snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] - sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] - stock_count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0] - kline_count = conn.execute("SELECT COUNT(*) FROM stock_daily").fetchone()[0] - event_count = conn.execute("SELECT COUNT(*) FROM price_events").fetchone()[0] - holding_count = conn.execute("SELECT COUNT(*) FROM holdings").fetchone()[0] - candidate_count = conn.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] - latest = conn.execute( - "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() - return { - "snapshots": snap_count, "sector_rows": sector_count, - "stocks": stock_count, "daily_klines": kline_count, - "price_events": event_count, "holdings": holding_count, - "candidates": candidate_count, - "latest_snapshot": dict(latest) if latest else None, - } - - -# ═══════════════════════════════════════════════════════════ -# 持仓查询 -# ═══════════════════════════════════════════════════════════ - -def query_holdings(conn: sqlite3.Connection) -> list[dict]: - """持仓列表(含最新策略)""" - rows = conn.execute(""" - SELECT h.code, h.name, h.shares, h.cost, h.position_pct, h.is_active, - hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, - hs.reason as action, hs.created_at as strategy_updated - FROM holdings h - LEFT JOIN holding_strategies hs ON h.code = hs.code - AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') - WHERE h.is_active = 1 - """).fetchall() - return [dict(r) for r in rows] - - -def query_holding_by_code(conn: sqlite3.Connection, code: str) -> dict | None: - """单只持仓""" - row = conn.execute(""" - SELECT h.*, hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, - hs.reason as action - FROM holdings h - LEFT JOIN holding_strategies hs ON h.code = hs.code - AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') - WHERE h.code = ? - """, (code,)).fetchone() - return dict(row) if row else None - - -def query_portfolio_summary(conn: sqlite3.Connection) -> dict: - """持仓汇总""" - row = conn.execute("SELECT * FROM portfolio_summary WHERE id = 1").fetchone() - return dict(row) if row else {} - - -# ═══════════════════════════════════════════════════════════ -# 自选股查询 -# ═══════════════════════════════════════════════════════════ - -def query_watchlist(conn: sqlite3.Connection) -> list[dict]: - """自选股列表(含策略)""" - rows = conn.execute(""" - SELECT w.code, w.name, w.added_at, - hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, - hs.reason as action - FROM watchlist_stocks w - LEFT JOIN holding_strategies hs ON w.code = hs.code - AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = w.code AND strategy_type = 'watch') - WHERE w.is_active = 1 - """).fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 决策/策略查询 -# ═══════════════════════════════════════════════════════════ - -def query_strategies(conn: sqlite3.Connection, code: str = None) -> list[dict]: - """策略列表(按版本倒序)""" - if code: - rows = conn.execute( - "SELECT * FROM holding_strategies WHERE code = ? ORDER BY version DESC", (code,)).fetchall() - else: - rows = conn.execute( - "SELECT * FROM holding_strategies ORDER BY code, version DESC").fetchall() - return [dict(r) for r in rows] - - -def query_advice_timeline(conn: sqlite3.Connection, code: str = None, limit: int = 50) -> list[dict]: - """建议时间线""" - if code: - rows = conn.execute( - "SELECT * FROM advice_timeline WHERE code = ? ORDER BY date DESC LIMIT ?", - (code, limit)).fetchall() - else: - rows = conn.execute( - "SELECT * FROM advice_timeline ORDER BY date DESC LIMIT ?", (limit,)).fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 候选池查询 -# ═══════════════════════════════════════════════════════════ - -def query_candidates(conn: sqlite3.Connection, active_only: bool = True) -> list[dict]: - """候选池列表(含最新评分)""" - where = "WHERE c.dropped = 0" if active_only else "" - rows = conn.execute(f""" - SELECT c.*, (SELECT score FROM candidate_score_history - WHERE code = c.code ORDER BY created_at DESC LIMIT 1) as latest_score - FROM candidates c {where} - ORDER BY c.zhiwei_star DESC NULLS LAST - """).fetchall() - return [dict(r) for r in rows] - - -def query_candidate_scores(conn: sqlite3.Connection, code: str) -> list[dict]: - """某候选的评分历史""" - rows = conn.execute( - "SELECT * FROM candidate_score_history WHERE code = ? ORDER BY created_at", - (code,)).fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 价格事件查询 -# ═══════════════════════════════════════════════════════════ - -def query_price_events(conn: sqlite3.Connection, code: str = None, limit: int = 100) -> list[dict]: - """价格事件""" - if code: - rows = conn.execute( - "SELECT * FROM price_events WHERE code = ? ORDER BY created_at DESC LIMIT ?", - (code, limit)).fetchall() - else: - rows = conn.execute( - "SELECT * FROM price_events ORDER BY created_at DESC LIMIT ?", (limit,)).fetchall() - return [dict(r) for r in rows] - - -def query_price_events_by_date(conn: sqlite3.Connection, date: str) -> list[dict]: - """某天的价格事件""" - rows = conn.execute( - "SELECT * FROM price_events WHERE date = ? ORDER BY created_at DESC", (date,)).fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 板块成分查询 -# ═══════════════════════════════════════════════════════════ - -def query_stock_sectors(conn: sqlite3.Connection, code: str) -> list[str]: - """某只股票所属板块""" - rows = conn.execute( - "SELECT sector_name FROM stock_sectors WHERE code = ?", (code,)).fetchall() - return [r[0] for r in rows] - - -def query_sector_stocks(conn: sqlite3.Connection, sector_name: str) -> list[str]: - """某板块包含的股票""" - rows = conn.execute( - "SELECT code FROM stock_sectors WHERE sector_name = ?", (sector_name,)).fetchall() - return [r[0] for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 准确率统计查询 -# ═══════════════════════════════════════════════════════════ - -def query_accuracy_stats(conn: sqlite3.Connection) -> dict: - """准确率统计""" - row = conn.execute("SELECT * FROM accuracy_stats WHERE id = 1").fetchone() - return dict(row) if row else {} - - -# ═══════════════════════════════════════════════════════════ -# 策略反馈查询 -# ═══════════════════════════════════════════════════════════ - -def query_strategy_feedback(conn: sqlite3.Connection, code: str = None) -> list[dict]: - """策略反馈""" - if code: - rows = conn.execute( - "SELECT * FROM strategy_feedback WHERE code = ? ORDER BY evaluated_at DESC", (code,)).fetchall() - else: - rows = conn.execute( - "SELECT * FROM strategy_feedback ORDER BY evaluated_at DESC").fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 策略评估查询 -# ═══════════════════════════════════════════════════════════ - -def query_strategy_evaluations(conn: sqlite3.Connection, code: str = None) -> list[dict]: - """策略评估记录""" - if code: - rows = conn.execute( - "SELECT * FROM strategy_evaluations WHERE code = ? ORDER BY created_at DESC", (code,)).fetchall() - else: - rows = conn.execute( - "SELECT * FROM strategy_evaluations ORDER BY created_at DESC").fetchall() - return [dict(r) for r in rows] - - -# ═══════════════════════════════════════════════════════════ -# 市场快照查询(最新) -# ═══════════════════════════════════════════════════════════ - -def query_latest_market(conn: sqlite3.Connection) -> dict: - """最新一次市场快照(含板块数据)""" - snap = conn.execute( - "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() - if not snap: - return {} - snap = dict(snap) - sectors = conn.execute( - "SELECT * FROM sector_snapshots WHERE snapshot_id = ? ORDER BY change_pct DESC", - (snap["id"],)).fetchall() - snap["sectors"] = [dict(r) for r in sectors] - # 计算 top_gainers / top_losers - snap["top_gainers"] = [dict(r) for r in sectors[:5]] - snap["top_losers"] = [dict(r) for r in sectors[-3:]] - return snap +#!/usr/bin/env python3 +"""mofin_db.py — MoFin 统一数据库访问层 + +所有脚本通过此模块访问 mofin.db,避免重复建表/连接逻辑。 + +用法: + from mofin_db import get_conn, write_market_snapshot, write_klines, ... + +设计原则: + - 幂等建表(CREATE TABLE IF NOT EXISTS) + - WAL 模式 + 外键约束 + - 所有写操作返回 (success: bool, detail: str) + - JSON 写入由调用方负责,本模块只写 SQLite +""" + +import sqlite3 +import json +from datetime import datetime +from pathlib import Path +from typing import Optional + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" + +# ═══════════════════════════════════════════════════════════ +# 连接管理 +# ═══════════════════════════════════════════════════════════ + +def get_conn() -> sqlite3.Connection: + """获取数据库连接(WAL 模式,外键约束,Row 工厂)""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + return conn + + +# ═══════════════════════════════════════════════════════════ +# 建表(幂等) +# ═══════════════════════════════════════════════════════════ + +def init_all_tables(conn: sqlite3.Connection): + """创建全部表(幂等,已存在则跳过)""" + conn.executescript(""" + -- 市场快照 + CREATE TABLE IF NOT EXISTS market_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'ths', + up_ratio REAL, + mood TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp); + + -- 板块快照 + CREATE TABLE IF NOT EXISTS sector_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), + name TEXT NOT NULL, + change_pct REAL, + up_count INTEGER, + down_count INTEGER, + net_inflow REAL, + lead_stock TEXT, + lead_stock_change REAL, + volume REAL, + turnover REAL + ); + CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name); + CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id); + CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id); + + -- 个股 + CREATE TABLE IF NOT EXISTS stocks ( + code TEXT PRIMARY KEY, + name TEXT NOT NULL, + exchange TEXT DEFAULT 'SH', + type TEXT DEFAULT 'A', + updated_at TEXT + ); + + -- K线(日/周/月) + CREATE TABLE IF NOT EXISTS stock_daily ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, amount REAL, + PRIMARY KEY (code, date) + ); + CREATE TABLE IF NOT EXISTS stock_weekly ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, + PRIMARY KEY (code, date) + ); + CREATE TABLE IF NOT EXISTS stock_monthly ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, + PRIMARY KEY (code, date) + ); + + -- 基本面 + CREATE TABLE IF NOT EXISTS stock_fundamentals ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + pe REAL, pb REAL, eps REAL, + mcap_total REAL, mcap_flow REAL, + updated_at TEXT + ); + + -- 板块成分映射 + CREATE TABLE IF NOT EXISTS stock_sectors ( + code TEXT NOT NULL REFERENCES stocks(code), + sector_name TEXT NOT NULL, + source TEXT DEFAULT 'ths', + updated_at TEXT DEFAULT (datetime('now','localtime')), + PRIMARY KEY (code, sector_name) + ); + CREATE INDEX IF NOT EXISTS idx_stock_sector ON stock_sectors(sector_name); + + -- 持仓 + CREATE TABLE IF NOT EXISTS holdings ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + shares INTEGER NOT NULL, + cost REAL, + price REAL, -- 当前价格 (CNY) + market_value REAL, -- 市值 = shares * price + change_pct REAL, -- 涨跌幅 + currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')), + position_pct REAL, + added_at TEXT, + is_active INTEGER DEFAULT 1, + closed_at TEXT, + close_pnl REAL + ); + + -- 持仓策略(对应 decisions.json decisions[]) + CREATE TABLE IF NOT EXISTS holding_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES holdings(code), + name TEXT, + version INTEGER DEFAULT 1, + price REAL, -- 当前价格 + cost REAL, -- 成本价 + shares INTEGER DEFAULT 0, + stop_loss REAL, + take_profit REAL, + entry_low REAL, + entry_high REAL, + currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')), + strategy_type TEXT DEFAULT 'holding', + action TEXT, -- 买入/持有/卖出/观望 + timing_signal TEXT, -- 时机信号 + rr_ratio REAL, -- 盈亏比 + tech_snapshot TEXT, -- 技术面快照 + stock_category TEXT, -- 股票分类 + sector_context TEXT, -- 板块背景 + status TEXT DEFAULT 'active', -- active/updated/closed + trigger_json TEXT, -- trigger JSON (entry_zone/stop_loss/take_profit_zone) + changelog_json TEXT, -- changelog JSON 数组 + source TEXT, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')), + updated_at TEXT, + superseded_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_strategy_code ON holding_strategies(code); + CREATE INDEX IF NOT EXISTS idx_strategy_status ON holding_strategies(status); + + -- 自选股 + CREATE TABLE IF NOT EXISTS watchlist_stocks ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + price REAL, -- 当前价格 + entry_low REAL, -- 买入区下限 + entry_high REAL, -- 买入区上限 + stop_loss REAL, -- 止损 + currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')), + source TEXT, -- 来源: alpha_sift/xiaoguo/manual + source_detail TEXT, -- 来源详情 JSON + notes TEXT, -- 备注 + added_by TEXT, -- 谁加的 + added_at TEXT DEFAULT (datetime('now','localtime')), + is_active INTEGER DEFAULT 1, + analysis_json TEXT -- 分析结果 JSON + ); + + -- 候选池 + CREATE TABLE IF NOT EXISTS candidates ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + sector TEXT, + reason TEXT, + entry_range TEXT, + stop_loss REAL, + target REAL, + zhiwei_star REAL, + zhiwei_reviewed INTEGER DEFAULT 0, + zhiwei_reviewed_at TEXT, + promoted INTEGER DEFAULT 0, + promoted_at TEXT, + dropped INTEGER DEFAULT 0, + drop_reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + + -- 候选评分历史 + CREATE TABLE IF NOT EXISTS candidate_score_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES candidates(code), + score REAL NOT NULL, + source TEXT NOT NULL, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_candidate_history ON candidate_score_history(code, created_at); + + -- 价格事件 + CREATE TABLE IF NOT EXISTS price_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + name TEXT, + event_type TEXT NOT NULL, + price REAL, + trigger_value TEXT, + event_label TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')), + date TEXT + ); + CREATE INDEX IF NOT EXISTS idx_events_code ON price_events(code); + CREATE INDEX IF NOT EXISTS idx_events_date ON price_events(date); + + -- 策略评估记录 + CREATE TABLE IF NOT EXISTS strategy_evaluations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + eval_type TEXT NOT NULL, + status TEXT DEFAULT 'pending', + old_stop_loss REAL, + new_stop_loss REAL, + old_tp REAL, + new_tp REAL, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + + -- 持仓汇总(portfolio.json 顶层字段) + CREATE TABLE IF NOT EXISTS portfolio_summary ( + id INTEGER PRIMARY KEY CHECK (id = 1), + total_assets REAL, + total_mv REAL, -- 持仓总市值 + stock_value REAL, + cash REAL, -- 可用现金 + frozen_cash REAL DEFAULT 0, -- 冻结资金 + position_pct REAL, + total_pnl REAL, + currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')), + updated_at TEXT + ); + + -- 建议时间线(decisions.json advice_timeline[]) + CREATE TABLE IF NOT EXISTS advice_timeline ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT, + direction TEXT, + price REAL, + summary TEXT, + status TEXT, + evaluated INTEGER DEFAULT 0, + result TEXT, + evaluated_at TEXT, + report_id TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_advice_code ON advice_timeline(code); + + -- 准确率统计(accuracy_stats.json) + CREATE TABLE IF NOT EXISTS accuracy_stats ( + id INTEGER PRIMARY KEY CHECK (id = 1), + period_start TEXT, + period_end TEXT, + total_advice INTEGER DEFAULT 0, + correct INTEGER DEFAULT 0, + wrong INTEGER DEFAULT 0, + partial INTEGER DEFAULT 0, + unknown INTEGER DEFAULT 0, + pending INTEGER DEFAULT 0, + ignored INTEGER DEFAULT 0, + evaluated INTEGER DEFAULT 0, + accuracy_pct REAL, + phase1_correct INTEGER DEFAULT 0, + phase1_wrong INTEGER DEFAULT 0, + phase1_pending INTEGER DEFAULT 0, + phase1_accuracy REAL, + phase2_correct INTEGER DEFAULT 0, + phase2_wrong INTEGER DEFAULT 0, + phase2_pending INTEGER DEFAULT 0, + phase2_accuracy REAL, + total_evaluated INTEGER DEFAULT 0, + updated_at TEXT + ); + + -- 策略反馈(strategy_feedback.json) + CREATE TABLE IF NOT EXISTS strategy_feedback ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + name TEXT, + evaluated_at TEXT, + phase1_completed INTEGER DEFAULT 0, + phase1_result TEXT, + phase1_completed_at TEXT, + phase1_price REAL, + phase2_completed INTEGER DEFAULT 0, + phase2_result TEXT, + phase2_completed_at TEXT, + days_in_phase1 INTEGER, + adjustments_json TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_feedback_code ON strategy_feedback(code); + + -- 板块信号(trend_detector 产出) + CREATE TABLE IF NOT EXISTS sector_signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + signal_type TEXT NOT NULL, + sector TEXT NOT NULL, + severity TEXT DEFAULT 'medium', + related_stocks TEXT, + holdings_in_sector TEXT, + watchlist_in_sector TEXT, + trigger_reason TEXT, + snapshot_id INTEGER, + processed INTEGER DEFAULT 0, + detected_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_signal_processed ON sector_signals(processed); + CREATE INDEX IF NOT EXISTS idx_signal_sector ON sector_signals(sector); + + -- 小果情报(xiaoguo_news_processor 产出) + CREATE TABLE IF NOT EXISTS signal_news ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + signal_id INTEGER REFERENCES sector_signals(id), + sector TEXT NOT NULL, + overall_sentiment TEXT, + summary TEXT, + key_articles TEXT, + searched_stocks TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_signal_news_signal ON signal_news(signal_id); + + -- 小果扫描跟踪(去重用) + CREATE TABLE IF NOT EXISTS xiaoguo_scan_tracker ( + code TEXT PRIMARY KEY, + name TEXT, + last_scanned_at TEXT, + found_count INTEGER DEFAULT 0 + ); + """) + conn.commit() + + # 迁移:给 signal_news 加 source 字段(幂等) + try: + conn.execute("ALTER TABLE signal_news ADD COLUMN source TEXT DEFAULT 'trend'") + except sqlite3.OperationalError: + pass + conn.commit() + + +# ═══════════════════════════════════════════════════════════ +# 市场快照写入 +# ═══════════════════════════════════════════════════════════ + +def write_market_snapshot(conn: sqlite3.Connection, market_data: dict) -> tuple[bool, str, Optional[int]]: + """写入一次市场采集到 market_snapshots + sector_snapshots + + Returns: (ok, message, snapshot_id) + """ + try: + cur = conn.execute( + "INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) VALUES (?, ?, ?, ?)", + (market_data["timestamp"], market_data.get("source", "unknown"), + market_data.get("up_ratio", 0), market_data.get("mood", "unknown")), + ) + sid = cur.lastrowid + + sectors = market_data.get("sectors", []) + rows = [(sid, s.get("name", ""), s.get("change", 0), + s.get("up_count"), s.get("down_count"), s.get("net_inflow"), + s.get("lead_stock"), s.get("lead_stock_change"), + s.get("volume"), s.get("turnover")) for s in sectors] + if rows: + conn.executemany( + "INSERT INTO sector_snapshots (snapshot_id, name, change_pct, up_count, down_count, " + "net_inflow, lead_stock, lead_stock_change, volume, turnover) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows) + conn.commit() + return True, f"snapshot_id={sid}, sectors={len(rows)}", sid + except Exception as e: + try: + conn.rollback() + except Exception: + pass + return False, str(e), None + + +# ═══════════════════════════════════════════════════════════ +# K线写入 +# ═══════════════════════════════════════════════════════════ + +def write_klines(conn: sqlite3.Connection, code: str, name: str, + daily: list = None, weekly: list = None, monthly: list = None, + fundamentals: dict = None) -> bool: + """将个股K线数据双写 SQLite + + Args: + code: 股票代码 + name: 股票名称 + daily/weekly/monthly: [{date, open, close, high, low, volume}, ...] + fundamentals: {pe, pb, eps, mcap_total, mcap_flow} + """ + try: + # 判断交易所 + raw = str(code) + if len(raw) == 5 and raw.isdigit(): + exchange, stype = "HK", "H" + elif raw.startswith(("6", "5", "9")): + exchange, stype = "SH", "A" + else: + exchange, stype = "SZ", "A" + + # stocks 表(INSERT OR REPLACE) + conn.execute( + "INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)", + (code, name, exchange, stype, datetime.now().isoformat())) + + # K线数据 + for period, table, data in [ + ("daily", "stock_daily", daily), + ("weekly", "stock_weekly", weekly), + ("monthly", "stock_monthly", monthly), + ]: + if not data: + continue + rows = [(code, d.get("date", ""), d.get("open"), d.get("close"), + d.get("high"), d.get("low"), d.get("volume"), + d.get("amount") if period == "daily" else None) for d in data] + if period == "daily": + conn.executemany( + f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume, amount) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows) + else: + conn.executemany( + f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + [(r[0], r[1], r[2], r[3], r[4], r[5], r[6]) for r in rows]) + + # 基本面 + if fundamentals: + conn.execute( + "INSERT OR REPLACE INTO stock_fundamentals (code, pe, pb, eps, mcap_total, mcap_flow, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (code, fundamentals.get("pe"), fundamentals.get("pb"), + fundamentals.get("eps"), fundamentals.get("mcap_total"), + fundamentals.get("mcap_flow"), datetime.now().isoformat())) + + conn.commit() + return True + except Exception as e: + try: + conn.rollback() + except Exception: + pass + return False + + +# ═══════════════════════════════════════════════════════════ +# 价格事件写入 +# ═══════════════════════════════════════════════════════════ + +def write_price_event(conn: sqlite3.Connection, code: str, name: str, + event_type: str, price: float, trigger_value: str, + event_label: str = "") -> bool: + """写入一条价格事件""" + try: + now = datetime.now() + conn.execute( + "INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (code, name, event_type, round(price, 2), trigger_value, + event_label, now.strftime("%Y-%m-%d"))) + conn.commit() + return True + except Exception: + try: + conn.rollback() + except Exception: + pass + return False + + +# ═══════════════════════════════════════════════════════════ +# 板块成分迁移 +# ═══════════════════════════════════════════════════════════ + +def migrate_stock_sectors(conn: sqlite3.Connection) -> tuple[int, int]: + """从 stock_sector_map.json 迁移到 stock_sectors 表 + + Returns: (migrated_stocks, total_mappings) + """ + sector_map_path = DATA_DIR / "stock_sector_map.json" + if not sector_map_path.exists(): + return 0, 0 + + try: + with open(sector_map_path, encoding="utf-8") as f: + data = json.load(f) + except Exception: + return 0, 0 + + # 过滤元数据字段 + mappings = [(code, sectors) for code, sectors in data.items() + if not code.startswith("_") and isinstance(sectors, list)] + + total = 0 + for code, sectors in mappings: + for sector in sectors: + try: + conn.execute( + "INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')", + (code, sector)) + total += 1 + except Exception: + pass + conn.commit() + return len(mappings), total + + +# ═══════════════════════════════════════════════════════════ +# 查询辅助 +# ═══════════════════════════════════════════════════════════ + +def query_sector_trend(conn: sqlite3.Connection, name: str, limit: int = 5) -> list[dict]: + """板块最近N次趋势""" + rows = conn.execute(""" + SELECT s.timestamp, ss.change_pct, ss.net_inflow, + ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE ss.name = ? ORDER BY s.timestamp DESC LIMIT ? + """, (name, limit)).fetchall() + return [dict(r) for r in rows] + + +def query_top_inflow(conn: sqlite3.Connection, limit: int = 5) -> list[dict]: + """最新一次资金净流入排行""" + rows = conn.execute(""" + SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id = (SELECT MAX(id) FROM market_snapshots) + AND ss.net_inflow IS NOT NULL + ORDER BY ss.net_inflow DESC LIMIT ? + """, (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_consecutive_inflow(conn: sqlite3.Connection, days: int = 3) -> list[dict]: + """连续N次净流入的板块""" + rows = conn.execute(""" + SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow, + ROUND(AVG(change_pct), 2) as avg_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots) + AND net_inflow > 0 + GROUP BY name HAVING COUNT(*) >= ? + ORDER BY avg_inflow DESC + """, (days, days)).fetchall() + return [dict(r) for r in rows] + + +def query_market_mood(conn: sqlite3.Connection, limit: int = 10) -> list[dict]: + """市场情绪趋势""" + rows = conn.execute(""" + SELECT timestamp, source, up_ratio, mood + FROM market_snapshots ORDER BY timestamp DESC LIMIT ? + """, (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_db_stats(conn: sqlite3.Connection) -> dict: + """数据库概览""" + snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] + sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] + stock_count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0] + kline_count = conn.execute("SELECT COUNT(*) FROM stock_daily").fetchone()[0] + event_count = conn.execute("SELECT COUNT(*) FROM price_events").fetchone()[0] + holding_count = conn.execute("SELECT COUNT(*) FROM holdings").fetchone()[0] + candidate_count = conn.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] + latest = conn.execute( + "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + return { + "snapshots": snap_count, "sector_rows": sector_count, + "stocks": stock_count, "daily_klines": kline_count, + "price_events": event_count, "holdings": holding_count, + "candidates": candidate_count, + "latest_snapshot": dict(latest) if latest else None, + } + + +# ═══════════════════════════════════════════════════════════ +# 持仓查询 +# ═══════════════════════════════════════════════════════════ + +def query_holdings(conn: sqlite3.Connection) -> list[dict]: + """持仓列表(含最新策略)""" + rows = conn.execute(""" + SELECT h.code, h.name, h.shares, h.cost, h.position_pct, h.is_active, + hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action, hs.created_at as strategy_updated + FROM holdings h + LEFT JOIN holding_strategies hs ON h.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') + WHERE h.is_active = 1 + """).fetchall() + return [dict(r) for r in rows] + + +def query_holding_by_code(conn: sqlite3.Connection, code: str) -> dict | None: + """单只持仓""" + row = conn.execute(""" + SELECT h.*, hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action + FROM holdings h + LEFT JOIN holding_strategies hs ON h.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') + WHERE h.code = ? + """, (code,)).fetchone() + return dict(row) if row else None + + +def query_portfolio_summary(conn: sqlite3.Connection) -> dict: + """持仓汇总""" + row = conn.execute("SELECT * FROM portfolio_summary WHERE id = 1").fetchone() + return dict(row) if row else {} + + +# ═══════════════════════════════════════════════════════════ +# 自选股查询 +# ═══════════════════════════════════════════════════════════ + +def query_watchlist(conn: sqlite3.Connection) -> list[dict]: + """自选股列表(含策略)""" + rows = conn.execute(""" + SELECT w.code, w.name, w.added_at, + hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action + FROM watchlist_stocks w + LEFT JOIN holding_strategies hs ON w.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = w.code AND strategy_type = 'watch') + WHERE w.is_active = 1 + """).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 决策/策略查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategies(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略列表(按版本倒序)""" + if code: + rows = conn.execute( + "SELECT * FROM holding_strategies WHERE code = ? ORDER BY version DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM holding_strategies ORDER BY code, version DESC").fetchall() + return [dict(r) for r in rows] + + +def query_advice_timeline(conn: sqlite3.Connection, code: str = None, limit: int = 50) -> list[dict]: + """建议时间线""" + if code: + rows = conn.execute( + "SELECT * FROM advice_timeline WHERE code = ? ORDER BY date DESC LIMIT ?", + (code, limit)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM advice_timeline ORDER BY date DESC LIMIT ?", (limit,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 候选池查询 +# ═══════════════════════════════════════════════════════════ + +def query_candidates(conn: sqlite3.Connection, active_only: bool = True) -> list[dict]: + """候选池列表(含最新评分)""" + where = "WHERE c.dropped = 0" if active_only else "" + rows = conn.execute(f""" + SELECT c.*, (SELECT score FROM candidate_score_history + WHERE code = c.code ORDER BY created_at DESC LIMIT 1) as latest_score + FROM candidates c {where} + ORDER BY c.zhiwei_star DESC NULLS LAST + """).fetchall() + return [dict(r) for r in rows] + + +def query_candidate_scores(conn: sqlite3.Connection, code: str) -> list[dict]: + """某候选的评分历史""" + rows = conn.execute( + "SELECT * FROM candidate_score_history WHERE code = ? ORDER BY created_at", + (code,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 价格事件查询 +# ═══════════════════════════════════════════════════════════ + +def query_price_events(conn: sqlite3.Connection, code: str = None, limit: int = 100) -> list[dict]: + """价格事件""" + if code: + rows = conn.execute( + "SELECT * FROM price_events WHERE code = ? ORDER BY created_at DESC LIMIT ?", + (code, limit)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM price_events ORDER BY created_at DESC LIMIT ?", (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_price_events_by_date(conn: sqlite3.Connection, date: str) -> list[dict]: + """某天的价格事件""" + rows = conn.execute( + "SELECT * FROM price_events WHERE date = ? ORDER BY created_at DESC", (date,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 板块成分查询 +# ═══════════════════════════════════════════════════════════ + +def query_stock_sectors(conn: sqlite3.Connection, code: str) -> list[str]: + """某只股票所属板块""" + rows = conn.execute( + "SELECT sector_name FROM stock_sectors WHERE code = ?", (code,)).fetchall() + return [r[0] for r in rows] + + +def query_sector_stocks(conn: sqlite3.Connection, sector_name: str) -> list[str]: + """某板块包含的股票""" + rows = conn.execute( + "SELECT code FROM stock_sectors WHERE sector_name = ?", (sector_name,)).fetchall() + return [r[0] for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 准确率统计查询 +# ═══════════════════════════════════════════════════════════ + +def query_accuracy_stats(conn: sqlite3.Connection) -> dict: + """准确率统计""" + row = conn.execute("SELECT * FROM accuracy_stats WHERE id = 1").fetchone() + return dict(row) if row else {} + + +# ═══════════════════════════════════════════════════════════ +# 策略反馈查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategy_feedback(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略反馈""" + if code: + rows = conn.execute( + "SELECT * FROM strategy_feedback WHERE code = ? ORDER BY evaluated_at DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM strategy_feedback ORDER BY evaluated_at DESC").fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 策略评估查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategy_evaluations(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略评估记录""" + if code: + rows = conn.execute( + "SELECT * FROM strategy_evaluations WHERE code = ? ORDER BY created_at DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM strategy_evaluations ORDER BY created_at DESC").fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 市场快照查询(最新) +# ═══════════════════════════════════════════════════════════ + +def query_latest_market(conn: sqlite3.Connection) -> dict: + """最新一次市场快照(含板块数据)""" + snap = conn.execute( + "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + if not snap: + return {} + snap = dict(snap) + sectors = conn.execute( + "SELECT * FROM sector_snapshots WHERE snapshot_id = ? ORDER BY change_pct DESC", + (snap["id"],)).fetchall() + snap["sectors"] = [dict(r) for r in sectors] + # 计算 top_gainers / top_losers + snap["top_gainers"] = [dict(r) for r in sectors[:5]] + snap["top_losers"] = [dict(r) for r in sectors[-3:]] + return snap + + +# ═══════════════════════════════════════════════════════════════════ +# 核心写函数 — 替代 json.dump(),强制币种约束 +# ═══════════════════════════════════════════════════════════════════ + +def write_holding_strategy(conn, code: str, name: str, data: dict) -> tuple[bool, str]: + """写入持仓策略(替代 decisions.json 单条写入)。data 必须包含 currency。""" + try: + currency = data.get('currency', 'CNY') + conn.execute(""" + INSERT INTO holding_strategies + (code, name, version, price, cost, shares, stop_loss, take_profit, + entry_low, entry_high, currency, strategy_type, action, + timing_signal, rr_ratio, tech_snapshot, stock_category, + sector_context, status, trigger_json, changelog_json, + source, reason, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,datetime('now','localtime')) + ON CONFLICT(code) DO UPDATE SET + name=excluded.name, price=excluded.price, cost=excluded.cost, + shares=excluded.shares, stop_loss=excluded.stop_loss, + take_profit=excluded.take_profit, entry_low=excluded.entry_low, + entry_high=excluded.entry_high, currency=excluded.currency, + action=excluded.action, timing_signal=excluded.timing_signal, + rr_ratio=excluded.rr_ratio, tech_snapshot=excluded.tech_snapshot, + stock_category=excluded.stock_category, + sector_context=excluded.sector_context, status=excluded.status, + trigger_json=excluded.trigger_json, changelog_json=excluded.changelog_json, + source=excluded.source, reason=excluded.reason, + updated_at=datetime('now','localtime') + """, ( + code, name, + data.get('version', 1), data.get('price'), data.get('cost'), + data.get('shares', 0), data.get('stop_loss'), data.get('take_profit'), + data.get('entry_low'), data.get('entry_high'), currency, + data.get('strategy_type', 'holding'), data.get('action'), + data.get('timing_signal'), data.get('rr_ratio'), + data.get('tech_snapshot'), data.get('stock_category'), + data.get('sector_context'), data.get('status', 'active'), + data.get('trigger_json'), data.get('changelog_json'), + data.get('source'), data.get('reason'), + )) + conn.commit() + return True, f"策略 {code} 已写入" + except sqlite3.IntegrityError as e: + return False, f"币种约束: {e}" + except Exception as e: + return False, str(e) + + +def write_holdings_batch(conn, holdings: list[dict]) -> tuple[bool, str]: + """批量写入持仓(替代 portfolio.json holdings[])""" + try: + for h in holdings: + conn.execute(""" + INSERT INTO holdings (code, name, shares, cost, price, market_value, + change_pct, currency, position_pct, added_at, is_active) + VALUES (?,?,?,?,?,?,?,?,?,datetime('now','localtime'),1) + ON CONFLICT(code) DO UPDATE SET + name=excluded.name, shares=excluded.shares, cost=excluded.cost, + price=excluded.price, market_value=excluded.market_value, + change_pct=excluded.change_pct, currency=excluded.currency, + position_pct=excluded.position_pct + """, ( + h.get('code'), h.get('name'), h.get('shares', 0), + h.get('cost'), h.get('price'), + h.get('market_value'), h.get('change_pct'), + h.get('currency', 'CNY'), h.get('position_pct'), + )) + conn.commit() + return True, f"已写入 {len(holdings)} 条持仓" + except sqlite3.IntegrityError as e: + conn.rollback() + return False, f"币种约束: {e}" + + +def write_portfolio_summary(conn, data: dict) -> tuple[bool, str]: + """写入持仓汇总(替代 portfolio.json 顶层)""" + try: + conn.execute(""" + INSERT INTO portfolio_summary (id, total_assets, total_mv, stock_value, + cash, frozen_cash, position_pct, total_pnl, currency, updated_at) + VALUES (1,?,?,?,?,?,?,?,?,datetime('now','localtime')) + ON CONFLICT(id) DO UPDATE SET + total_assets=excluded.total_assets, total_mv=excluded.total_mv, + stock_value=excluded.stock_value, cash=excluded.cash, + frozen_cash=excluded.frozen_cash, position_pct=excluded.position_pct, + total_pnl=excluded.total_pnl, currency=excluded.currency, + updated_at=datetime('now','localtime') + """, ( + data.get('total_assets'), data.get('total_mv'), data.get('stock_value'), + data.get('cash'), data.get('frozen_cash', 0), data.get('position_pct'), + data.get('total_pnl'), data.get('currency', 'CNY'), + )) + conn.commit() + return True, "汇总已写入" + except sqlite3.IntegrityError as e: + return False, f"约束: {e}" + + +def write_watchlist_stock(conn, stock: dict) -> tuple[bool, str]: + """写入自选股(替代 watchlist.json)""" + try: + conn.execute(""" + INSERT INTO watchlist_stocks (code, name, price, entry_low, entry_high, + stop_loss, currency, source, source_detail, notes, added_by, added_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,datetime('now','localtime')) + ON CONFLICT(code) DO UPDATE SET + name=excluded.name, price=excluded.price, entry_low=excluded.entry_low, + entry_high=excluded.entry_high, stop_loss=excluded.stop_loss, + currency=excluded.currency, source=excluded.source, + source_detail=excluded.source_detail, notes=excluded.notes, + added_by=excluded.added_by + """, ( + stock.get('code'), stock.get('name'), stock.get('price'), + stock.get('entry_low'), stock.get('entry_high'), stock.get('stop_loss'), + stock.get('currency', 'CNY'), stock.get('source'), stock.get('source_detail'), + stock.get('notes'), stock.get('added_by'), + )) + conn.commit() + return True, f"自选 {stock.get('code')} 已写入" + except sqlite3.IntegrityError as e: + return False, f"约束: {e}" diff --git a/price_monitor.py b/price_monitor.py index 7b2e7a9..0be0a5f 100644 --- a/price_monitor.py +++ b/price_monitor.py @@ -1,688 +1,714 @@ -#!/usr/bin/env python3 -"""price_monitor.py — 高频价格监控脚本(批量版) -规则:进入区间报一次,离开区间报一次,中间不重复。 -每次运行时一次性刷新所有持仓+自选股的实时价。 -""" -import json -import urllib.request -import os -import sys -import time -from datetime import datetime - -# ── MoFin unified model ────────────────────────────────────────────── -from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct - -DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" -PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" -WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" -BREACH_PATH = "/home/hmo/.hermes/zone_breach.json" -STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json") -EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json" - -# 策略重评依赖(技术面驱动,非机械百分比) -sys.path.insert(0, "/home/hmo/web-dashboard") -try: - from strategy_lifecycle import reassess_strategy - HAS_REASSESS = True -except ImportError: - HAS_REASSESS = False - -try: - HK_RATE = get_hk_rate() -except Exception: - HK_RATE = 0.87 # ultimate fallback - -# 分支系统与情景检测 -try: - sys.path.insert(0, '/home/hmo/MoFin') - from strategy_tree import detect_scenario, evaluate_branches - HAS_TREE = True -except Exception: - HAS_TREE = False - def detect_scenario(): return {} - def evaluate_branches(*a, **kw): return [] - -# 情景缓存(每次run_once刷新) -_SCENARIO_CACHE = {} -_BRANCH_CACHE = {} # code -> branches list - -UA = "Mozilla/5.0" - -# ── 批量拉取价格 ────────────────────────────────────────────────────────── - -def fetch_all_prices(codes): - """腾讯批量行情API:仅用于A股(沪市/深市) - A股:sh600110 / sz000001 - 港股已迁移至 fetch_hk_eastmoney()(东方财富实时行情) - 返回 {code: (price, change, change_pct)} - """ - if not codes: - return {} - - # 只处理A股(6位代码),港股走东方财富 - a_codes = [c for c in codes if len(str(c).strip()) == 6] - if not a_codes: - return {} - - symbols = [] - code_map = {} - for code in a_codes: - code_s = str(code).strip() - if code_s.startswith(('5', '6', '9')): - sym = f"sh{code_s}" - else: - sym = f"sz{code_s}" - symbols.append(sym) - code_map[sym] = code_s - - url = f"http://qt.gtimg.cn/q={','.join(symbols)}" - try: - req = urllib.request.Request(url, headers={"User-Agent": UA}) - with urllib.request.urlopen(req, timeout=10) as r: - text = r.read().decode("gbk") - except Exception as e: - print(f"⚠️ 腾讯A股拉取失败: {e}", file=sys.stderr) - return {} - - results = {} - for line in text.strip().split("\n"): - line = line.strip() - if not line or "=" not in line: - continue - try: - raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") - fields = raw_value.split("~") - if len(fields) < 6: - continue - sym = line.split("=", 1)[0].strip().lstrip("v_") - orig_code = code_map.get(sym) - if not orig_code: - continue - price = float(fields[3]) if fields[3] else 0 - prev_close = float(fields[4]) if fields[4] else 0 - change = price - prev_close if prev_close > 0 else 0 - change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" - results[orig_code] = (price, change, change_pct) - except (ValueError, IndexError): - continue - - return results - - -# ── 港股实时行情(东方财富,无15分钟延迟)────────────────────────────── - -def fetch_hk_eastmoney(codes): - """东方财富港股实时行情 API — 免费、实时、无15分钟延迟。 - - API: push2.eastmoney.com - 市场代码: 116 (港交所) - 格式: 116.00700 - - 返回 {code: (price, change, change_pct)} - - Fallback: 失败时回退到腾讯 qt.gtimg.cn(15分钟延迟) - """ - if not codes: - return {} - - hk_codes = [str(c).strip() for c in codes if len(str(c).strip()) <= 5] - if not hk_codes: - return {} - - results = {} - - # 主通道:东方财富实时行情(逐股查询,港股最多~10只,可接受) - for code in hk_codes: - try: - url = (f"https://push2.eastmoney.com/api/qt/stock/get" - f"?secid=116.{code}" - f"&fields=f43,f170,f60,f57,f58" - f"&fltt=2") - req = urllib.request.Request(url, headers={"User-Agent": UA}) - with urllib.request.urlopen(req, timeout=5) as r: - resp = json.loads(r.read().decode("utf-8")) - - if resp.get("rc") != 0: - continue - item = resp.get("data", {}) - if not item: - continue - price = float(item.get("f43", 0)) if item.get("f43") else 0 - prev_close = float(item.get("f60", 0)) if item.get("f60") else 0 - change = round(price - prev_close, 2) if prev_close > 0 else 0 - change_pct = str(item.get("f170", "0")) - if price > 0: - results[code] = (price, change, change_pct) - # 东方财富有频率限制,每请求间隔 0.2s - time.sleep(0.2) - except Exception as e: - print(f"⚠️ 东方财富 {code} 拉取失败: {e}", file=sys.stderr) - continue - - # Fallback: 腾讯 qt.gtimg.cn(15分钟延迟) - missing = [c for c in hk_codes if c not in results] - if missing: - try: - fallback = _fetch_hk_tencent_fallback(missing) - results.update(fallback) - except Exception: - pass - - return results - - -def _fetch_hk_tencent_fallback(codes): - """腾讯港股行情(15分钟延迟,仅作 fallback)""" - symbols = [f"hk{c}" for c in codes] - url = f"http://qt.gtimg.cn/q={','.join(symbols)}" - req = urllib.request.Request(url, headers={"User-Agent": UA}) - with urllib.request.urlopen(req, timeout=10) as r: - text = r.read().decode("gbk") - - code_map = {f"hk{c}": c for c in codes} - results = {} - for line in text.strip().split("\n"): - if "=" not in line: - continue - try: - raw = line.split("=", 1)[1].strip().strip('"').strip(";") - fields = raw.split("~") - if len(fields) < 6: - continue - sym = line.split("=", 1)[0].strip().lstrip("v_") - orig = code_map.get(sym) - if not orig: - continue - price = float(fields[3]) if fields[3] else 0 - prev_close = float(fields[4]) if fields[4] else 0 - change = price - prev_close if prev_close > 0 else 0 - change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" - results[orig] = (price, change, change_pct) - except (ValueError, IndexError): - continue - return results - - -def refresh_data_prices(): - """一次性刷新portfolio.json和watchlist.json的所有实时价""" - all_codes = set() - - # 收集所有需要拉取的代码 - try: - pf = json.load(open(PORTFOLIO_PATH)) - for s in pf.get('holdings', []): - all_codes.add(s['code']) - except: - pf = {"holdings": []} - - try: - wl = json.load(open(WATCHLIST_PATH)) - for s in wl.get('stocks', []): - all_codes.add(s['code']) - except: - wl = {"stocks": []} - - if not all_codes: - return 0 - - # 分批拉取:A股走腾讯(实时) + 港股走东方财富(实时) - all_list = list(all_codes) - prices = fetch_all_prices(all_list) # A股(腾讯,实时) - hk_prices = fetch_hk_eastmoney(all_list) # 港股(东方财富,实时) - prices.update(hk_prices) - updated = 0 - - # 保存全量实时价快照(供报告管道消费,确保分析用最新数据) - try: - live = {"updated_at": datetime.now().isoformat(), "prices": {}} - for code in all_codes: - if code in prices: - p, c, chg = prices[code] - live["prices"][code] = {"price": p, "change_pct": chg} - json.dump(live, open("/home/hmo/web-dashboard/data/live_prices.json", "w"), indent=2) - except Exception: - pass - - # 更新portfolio(只在价格变化时写入,避免触发文件变更通知) - changed = False - for s in pf.get('holdings', []): - if s['code'] in prices: - price, _, change_pct = prices[s['code']] - if price > 0: - # 港股:API返回HKD,需转RMB - if is_hk_stock(s['code']): - price = round(price * HK_RATE, 2) - old = s.get('price', 0) - if abs(old - price) > 0.001: - s['price'] = round(price, 2) - s['change_pct'] = float(change_pct) if change_pct else 0 - updated += 1 - changed = True - if changed: - pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') - # 统一计算总资产(mo_models 唯一公式) - pf['total_mv'] = calc_total_mv(pf.get('holdings', [])) - pf['total_assets'] = calc_total_assets(pf) - pf['position_pct'] = calc_position_pct(pf) - json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) - elif pf.get('updated_at'): - # 即使价格无变化,每10分钟刷新一次updated_at,防健康检查误报 - try: - last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M') - if (datetime.now() - last_ts).total_seconds() > 600: - pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') - json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) - except: - pass - - # 更新watchlist(只在价格变化时写入) - changed = False - for s in wl.get('stocks', []): - if s['code'] in prices: - price, _, change_pct = prices[s['code']] - if price > 0: - # 港股:API返回HKD,需转RMB - if is_hk_stock(s['code']): - price = round(price * HK_RATE, 2) - old = s.get('price', 0) - if abs(old - price) > 0.001: - s['price'] = round(price, 2) - s['change_pct'] = float(change_pct) if change_pct else 0 - updated += 1 - changed = True - if changed: - wl['updated_at'] = datetime.now().isoformat() - json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2) - - # --- 汇总值重算(使用 mo_models 唯一公式)--- - try: - live_market_value = calc_total_mv(pf.get('holdings', [])) - old_mv = pf.get('total_mv', 0) - - if abs(old_mv - live_market_value) > 0.01: - pf['total_mv'] = round(live_market_value, 2) - - pf['total_assets'] = calc_total_assets(pf) - if pf['total_assets'] > 0: - pf['position_pct'] = calc_position_pct(pf) - pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') - json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) - except Exception as e: - print(f" [汇总重算失败] {e}", flush=True) - # --- 结束汇总重算 --- - - return updated - - -# ── 分支系统辅助函数 ────────────────────────────────────────────────────── - -def _branch_alert_suffix(code, price, shares=0, cost=0): - """返回分支信息后缀:「 | 情景→动作」""" - if not HAS_TREE or not _SCENARIO_CACHE.get('id'): - return "" - try: - sc_id = _SCENARIO_CACHE['id'] - results = evaluate_branches(code, sc_id, price, shares, cost) - for r in results: - if r.get('applicable'): - _record_branch_trigger(code, r.get('branch_id',''), price) - branch_action = r.get('action_type', r.get('action', 'hold')) - return f" | {sc_id}→{branch_action}" - except Exception: - pass - return "" - - -def _record_branch_trigger(code, branch_id, price): - """记录分支触发事件(自成长:trigger_count+1)""" - try: - raw = json.load(open(DECISIONS_PATH)) - for d in raw.get('decisions', []): - if d.get('code') == code and d.get('strategy_tree',{}).get('branches'): - for b in d['strategy_tree']['branches']: - if b['id'] == branch_id: - b.setdefault('trigger_count', 0) - b['trigger_count'] += 1 - b['last_trigger_price'] = round(price, 2) - b['last_triggered'] = datetime.now().isoformat() - break - json.dump(raw, open(DECISIONS_PATH, 'w'), ensure_ascii=False, indent=2) - except Exception: - pass - - -# ── 区间偏离检测 ────────────────────────────────────────────────────────── - -def load_state(): - try: - with open(STATE_PATH) as f: - return json.load(f) - except: - return {} - -def save_state(state): - os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True) - with open(STATE_PATH, 'w') as f: - json.dump(state, f, ensure_ascii=False, indent=2) - -def load_breaches(): - try: - with open(BREACH_PATH) as f: - return json.load(f) - except: - return {} - -def save_breaches(data): - os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True) - with open(BREACH_PATH, 'w') as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def load_events(): - try: - with open(EVENTS_PATH) as f: - return json.load(f) - except: - return {"events": []} - - -def save_events(events): - os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True) - with open(EVENTS_PATH, 'w') as f: - json.dump(events, f, ensure_ascii=False, indent=2) - - -def record_event(code, name, event_type, price, trigger_value, event_label=""): - """记录一次价格触发事件到 price_events.json + SQLite""" - events = load_events() - now = datetime.now().isoformat() - events["events"].append({ - "code": code, - "name": name, - "event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone - "price": round(price, 2), - "trigger_value": trigger_value, - "event_label": event_label, - "timestamp": now, - "date": datetime.now().strftime("%Y-%m-%d"), - }) - # 保留最近10000条 - events["events"] = events["events"][-10000:] - save_events(events) - - # ── SQLite 双写 ── - try: - from mofin_db import get_conn, init_all_tables, write_price_event - conn = get_conn() - init_all_tables(conn) - write_price_event(conn, code, name, event_type, price, trigger_value, event_label) - conn.close() - except Exception: - pass # SQLite 写入失败不影响主流程 - - -def get_trigger_zones(d): - """返回该decision所有可监控的区间列表,从顶层字段读取""" - zones = [] - is_holding = d.get('shares', 0) > 0 - # 买入区间(自选和持仓都监控) - el = d.get("entry_low", 0) - eh = d.get("entry_high", 0) - if el and eh and float(el) > 0 and float(eh) > 0: - try: - zones.append(("entry_zone", "买入区间", float(el), float(eh))) - except: - pass - # 止损+止盈(只有持仓才监控,自选无意义) - if is_holding: - sl = d.get("stop_loss", 0) - if sl and float(sl) > 0: - try: - zones.append(("stop_loss", "止损", 0, float(sl))) - except: - pass - tp = d.get("take_profit", 0) - if tp and float(tp) > 0: - try: - zones.append(("take_profit_zone", "止盈区间", 0, float(tp))) - except: - pass - return zones - - -def run_once(round_label=""): - """执行一轮完整的监控流程""" - global _SCENARIO_CACHE, _BRANCH_CACHE - label = f" [{round_label}]" if round_label else "" - start = time.time() - - # 刷新情景与分支缓存(每轮更新) - _SCENARIO_CACHE = detect_scenario() if HAS_TREE else {} - _BRANCH_CACHE = {} - try: - raw = json.load(open(DECISIONS_PATH)) - for d in raw.get('decisions', []): - tree = d.get('strategy_tree', {}) - if tree and tree.get('branches'): - _BRANCH_CACHE[d['code']] = tree['branches'] - except Exception: - pass - - # === 第一步:一次性刷新所有价格 === - refreshed = refresh_data_prices() - - # === 第二步:检查触发条件 === - try: - with open(DECISIONS_PATH) as f: - dec = json.load(f) - except: - print(f"❌{label} 无法读取decisions.json", file=sys.stderr) - return - - active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")] - state = load_state() - outputs = [] - state_updated = False - - # 收集所有需要检查的代码 - check_codes = set() - for d in active: - if get_trigger_zones(d): - check_codes.add(d["code"]) - - # 批量拉取这些股票的价格 - prices = fetch_all_prices(list(check_codes)) - - for d in active: - code = d["code"] - - zones = get_trigger_zones(d) - if not zones: - continue - - price_info = prices.get(code) - if not price_info: - continue - price, _, _ = price_info - if price == 0: - continue - - name = d.get("name", code) - if code not in state: - state[code] = {} - - for key, label, lo, hi in zones: - in_zone = lo <= price <= hi - prev_in_zone = state[code].get(key, None) - - if in_zone and prev_in_zone != True: - if key == "stop_loss": - branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) - outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}!{branch_sfx}") - record_event(code, name, "stop_loss", price, str(hi)) - else: - extra = "" - if "_price" in key: - batch_shares = d.get(key.replace("_price", "_shares"), "") - action = d.get(key.replace("_price", "_action"), "") - if batch_shares: - extra = f" {action}{batch_shares}股" if action else f" {batch_shares}股" - elif key in ("take_profit_zone",): - act = d.get("take_profit_action", "") - if act: - extra = f"({act})" - branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) - outputs.append(f"⚡ {name}({code}) {price} → 进入{label}{lo}~{hi}{extra}{branch_sfx}") - record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label) - state[code][key] = True - state_updated = True - - elif not in_zone and prev_in_zone == True: - if key != "stop_loss": - outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}") - state[code][key] = False - state_updated = True - - # === 第三步:买入区偏离检测 + 自动重评 === - reassesed_codes = [] - for d in active: - code = d["code"] - name = d.get("name", code) - price_info = prices.get(code) - if not price_info: - continue - price, _, _ = price_info - if price == 0: - continue - - # 从 decisions.json 中读取 analysis 的买入区 - entry_low = d.get("entry_low", 0) - entry_high = d.get("entry_high", 0) - if not entry_low or not entry_high: - continue - - in_buy_zone = entry_low <= price <= entry_high - prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None) - - # 状态变化时才触发 - if in_buy_zone and prev_in_buy_zone == False: - # 重新进入买入区 → 重评确认区间是否仍然有效 - outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评") - do_reassess = True - elif not in_buy_zone and prev_in_buy_zone == True: - # 离开买入区 → 立即重评,更新止损/止盈/区间 - outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评") - do_reassess = True - else: - do_reassess = False - - if do_reassess and HAS_REASSESS: - try: - cost = d.get("cost", 0) or 0 - shares = d.get("shares", 0) or 0 - profit_pct = (price - cost) / cost * 100 if cost else 0 - is_deep_loss = profit_pct < -20 - sentiment = "neutral" - if d.get("tech_snapshot"): - if "bearish" in d["tech_snapshot"]: - sentiment = "bearish" - elif "bullish" in d["tech_snapshot"]: - sentiment = "bullish" - - # 调用技术面驱动重评(非机械百分比) - result = reassess_strategy( - code, name, price, cost, shares, - current_action=d.get("action", ""), - volume_signal="中性", sentiment=sentiment, - ) - outputs.append(f" 📊 新策略: 损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}") - reassesed_codes.append(code) - except Exception as e: - outputs.append(f" ⚠️ 重评失败: {e}") - - # 更新买入区状态 - if "__buy_zone" not in state.get(code, {}): - if code not in state: - state[code] = {} - state[code]["__buy_zone"] = in_buy_zone - state_updated = True - - # 如果有重评过的股票,更新 decisions.json - if reassesed_codes and HAS_REASSESS: - try: - # 重新 regenerate_all 只针对受影响的股票效率太低 - # 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析) - from strategy_lifecycle import regenerate_all - r = regenerate_all(stdout=False) - outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功") - outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}") - except Exception as e: - outputs.append(f" ⚠️ 全量重评失败: {e}") - - # === 3.5 资金流异常检测(2026-06-27 新增)=== - try: - cf = json.load(open("/home/hmo/web-dashboard/data/capital_flow_cache.json")) - # 检查所有 active decision 中的资金流异常 - for d in active: - code = d["code"] - stock_cf = cf.get("stocks", {}).get(code, {}) - analysis = stock_cf.get("analysis", {}) - alerts = analysis.get("alerts", []) - if alerts: - name = d.get("name", code) - for a in alerts: - outputs.append(f" 💰 {name}({code}) {a}") - except Exception: - pass - - # === 第四步:情景变化检测 + 输出 → 直接推XMPP === - now_str = datetime.now().strftime("%H:%M:%S") - elapsed = time.time() - start - - # 情景变化检测(跨轮对比) - if HAS_TREE and _SCENARIO_CACHE.get('id'): - prev_scenario = state.get('_system', {}).get('last_scenario', '') - curr_scenario = _SCENARIO_CACHE['id'] - if prev_scenario and curr_scenario != prev_scenario: - combo = _SCENARIO_CACHE.get('combo_action', '') - outputs.insert(0, f"🌀 情景切换: {prev_scenario}→{curr_scenario} | {combo}") - if outputs: - state.setdefault('_system', {})['last_scenario'] = curr_scenario - state_updated = True - elif not prev_scenario: - state.setdefault('_system', {})['last_scenario'] = curr_scenario - state_updated = True - - if outputs: - # 简短一行一个触发 - for o in outputs: - print(o) - # 推送XMPP(只推关键事件:止损跌破+情景切换+资金流异动,不推买入区进出/重评等操作细节) - critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))] - if critical: - try: - body = "\n".join([f"{now_str}"] + critical) - payload = json.dumps({ - "to": "hmo@yoin.fun", "body": body, "type": "chat", - }).encode("utf-8") - req = urllib.request.Request( - "http://127.0.0.1:5805/", data=payload, - headers={"Content-Type": "application/json"}, - ) - urllib.request.urlopen(req, timeout=5) - except Exception: - pass - # else: SILENT — 无触发,无输出,不推 - - if state_updated: - save_state(state) - - -def main(): - """每cron触发跑一轮""" - run_once() - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +"""price_monitor.py — 高频价格监控脚本(批量版) +规则:进入区间报一次,离开区间报一次,中间不重复。 +每次运行时一次性刷新所有持仓+自选股的实时价。 +""" +import json +import urllib.request +import os +import sys +import time +from datetime import datetime + +# ── MoFin unified model ────────────────────────────────────────────── +from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct +from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_price_event, write_watchlist_stock + +DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" +PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" +WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" +BREACH_PATH = "/home/hmo/.hermes/zone_breach.json" +STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json") +EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json" + +# 策略重评依赖(技术面驱动,非机械百分比) +sys.path.insert(0, "/home/hmo/web-dashboard") +try: + from strategy_lifecycle import reassess_strategy + HAS_REASSESS = True +except ImportError: + HAS_REASSESS = False + +try: + HK_RATE = get_hk_rate() +except Exception: + HK_RATE = 0.87 # ultimate fallback + +# 分支系统与情景检测 +try: + sys.path.insert(0, '/home/hmo/MoFin') + from strategy_tree import detect_scenario, evaluate_branches + HAS_TREE = True +except Exception: + HAS_TREE = False + def detect_scenario(): return {} + def evaluate_branches(*a, **kw): return [] + +# 情景缓存(每次run_once刷新) +_SCENARIO_CACHE = {} +_BRANCH_CACHE = {} # code -> branches list + +UA = "Mozilla/5.0" + +# ── 批量拉取价格 ────────────────────────────────────────────────────────── + +def fetch_all_prices(codes): + """腾讯批量行情API:仅用于A股(沪市/深市) + A股:sh600110 / sz000001 + 港股已迁移至 fetch_hk_eastmoney()(东方财富实时行情) + 返回 {code: (price, change, change_pct)} + """ + if not codes: + return {} + + # 只处理A股(6位代码),港股走东方财富 + a_codes = [c for c in codes if len(str(c).strip()) == 6] + if not a_codes: + return {} + + symbols = [] + code_map = {} + for code in a_codes: + code_s = str(code).strip() + if code_s.startswith(('5', '6', '9')): + sym = f"sh{code_s}" + else: + sym = f"sz{code_s}" + symbols.append(sym) + code_map[sym] = code_s + + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + try: + req = urllib.request.Request(url, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=10) as r: + text = r.read().decode("gbk") + except Exception as e: + print(f"⚠️ 腾讯A股拉取失败: {e}", file=sys.stderr) + return {} + + results = {} + for line in text.strip().split("\n"): + line = line.strip() + if not line or "=" not in line: + continue + try: + raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw_value.split("~") + if len(fields) < 6: + continue + sym = line.split("=", 1)[0].strip().lstrip("v_") + orig_code = code_map.get(sym) + if not orig_code: + continue + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + change = price - prev_close if prev_close > 0 else 0 + change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" + results[orig_code] = (price, change, change_pct) + except (ValueError, IndexError): + continue + + return results + + +# ── 港股实时行情(东方财富,无15分钟延迟)────────────────────────────── + +def fetch_hk_eastmoney(codes): + """东方财富港股实时行情 API — 免费、实时、无15分钟延迟。 + + API: push2.eastmoney.com + 市场代码: 116 (港交所) + 格式: 116.00700 + + 返回 {code: (price, change, change_pct)} + + Fallback: 失败时回退到腾讯 qt.gtimg.cn(15分钟延迟) + """ + if not codes: + return {} + + hk_codes = [str(c).strip() for c in codes if len(str(c).strip()) <= 5] + if not hk_codes: + return {} + + results = {} + + # 主通道:东方财富实时行情(逐股查询,港股最多~10只,可接受) + for code in hk_codes: + try: + url = (f"https://push2.eastmoney.com/api/qt/stock/get" + f"?secid=116.{code}" + f"&fields=f43,f170,f60,f57,f58" + f"&fltt=2") + req = urllib.request.Request(url, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=5) as r: + resp = json.loads(r.read().decode("utf-8")) + + if resp.get("rc") != 0: + continue + item = resp.get("data", {}) + if not item: + continue + price = float(item.get("f43", 0)) if item.get("f43") else 0 + prev_close = float(item.get("f60", 0)) if item.get("f60") else 0 + change = round(price - prev_close, 2) if prev_close > 0 else 0 + change_pct = str(item.get("f170", "0")) + if price > 0: + results[code] = (price, change, change_pct) + # 东方财富有频率限制,每请求间隔 0.2s + time.sleep(0.2) + except Exception as e: + print(f"⚠️ 东方财富 {code} 拉取失败: {e}", file=sys.stderr) + continue + + # Fallback: 腾讯 qt.gtimg.cn(15分钟延迟) + missing = [c for c in hk_codes if c not in results] + if missing: + try: + fallback = _fetch_hk_tencent_fallback(missing) + results.update(fallback) + except Exception: + pass + + return results + + +def _fetch_hk_tencent_fallback(codes): + """腾讯港股行情(15分钟延迟,仅作 fallback)""" + symbols = [f"hk{c}" for c in codes] + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + req = urllib.request.Request(url, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=10) as r: + text = r.read().decode("gbk") + + code_map = {f"hk{c}": c for c in codes} + results = {} + for line in text.strip().split("\n"): + if "=" not in line: + continue + try: + raw = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw.split("~") + if len(fields) < 6: + continue + sym = line.split("=", 1)[0].strip().lstrip("v_") + orig = code_map.get(sym) + if not orig: + continue + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + change = price - prev_close if prev_close > 0 else 0 + change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" + results[orig] = (price, change, change_pct) + except (ValueError, IndexError): + continue + return results + + +def refresh_data_prices(): + """一次性刷新portfolio.json和watchlist.json的所有实时价""" + all_codes = set() + + # 收集所有需要拉取的代码 + try: + pf = json.load(open(PORTFOLIO_PATH)) + for s in pf.get('holdings', []): + all_codes.add(s['code']) + except: + pf = {"holdings": []} + + try: + wl = json.load(open(WATCHLIST_PATH)) + for s in wl.get('stocks', []): + all_codes.add(s['code']) + except: + wl = {"stocks": []} + + if not all_codes: + return 0 + + # 分批拉取:A股走腾讯(实时) + 港股走东方财富(实时) + all_list = list(all_codes) + prices = fetch_all_prices(all_list) # A股(腾讯,实时) + hk_prices = fetch_hk_eastmoney(all_list) # 港股(东方财富,实时) + prices.update(hk_prices) + updated = 0 + + # 保存全量实时价快照(供报告管道消费,确保分析用最新数据) + try: + live = {"updated_at": datetime.now().isoformat(), "prices": {}} + for code in all_codes: + if code in prices: + p, c, chg = prices[code] + live["prices"][code] = {"price": p, "change_pct": chg} + json.dump(live, open("/home/hmo/web-dashboard/data/live_prices.json", "w"), indent=2) + except Exception: + pass + + # 更新portfolio(只在价格变化时写入,避免触发文件变更通知) + changed = False + for s in pf.get('holdings', []): + if s['code'] in prices: + price, _, change_pct = prices[s['code']] + if price > 0: + # 港股:API返回HKD,需转RMB + if is_hk_stock(s['code']): + price = round(price * HK_RATE, 2) + old = s.get('price', 0) + if abs(old - price) > 0.001: + s['price'] = round(price, 2) + s['change_pct'] = float(change_pct) if change_pct else 0 + updated += 1 + changed = True + if changed: + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + pf['total_mv'] = calc_total_mv(pf.get('holdings', [])) + pf['total_assets'] = calc_total_assets(pf) + pf['position_pct'] = calc_position_pct(pf) + # DB 写入(替代 json.dump,强制币种约束) + try: + conn = get_conn() + write_holdings_batch(conn, pf['holdings']) + write_portfolio_summary(conn, pf) + conn.close() + except Exception as e: + print(f" [DB写入失败] {e}", flush=True) + # 保留 JSON 副本作为冷备 + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + elif pf.get('updated_at'): + try: + last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M') + if (datetime.now() - last_ts).total_seconds() > 600: + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + except: + pass + + # 更新watchlist(只在价格变化时写入) + changed = False + for s in wl.get('stocks', []): + if s['code'] in prices: + price, _, change_pct = prices[s['code']] + if price > 0: + # 港股:API返回HKD,需转RMB + if is_hk_stock(s['code']): + price = round(price * HK_RATE, 2) + old = s.get('price', 0) + if abs(old - price) > 0.001: + s['price'] = round(price, 2) + s['change_pct'] = float(change_pct) if change_pct else 0 + updated += 1 + changed = True + if changed: + wl['updated_at'] = datetime.now().isoformat() + # DB 写入(替代 json.dump) + try: + conn = get_conn() + for s in wl.get('stocks', []): + s['currency'] = 'CNY' # 自选股价格统一CNY + write_watchlist_stock(conn, s) + conn.close() + except Exception as e: + print(f" [DB watchlist写入失败] {e}", flush=True) + # 保留 JSON 冷备 + json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2) + + # --- 汇总值重算(使用 mo_models 唯一公式)--- + try: + live_market_value = calc_total_mv(pf.get('holdings', [])) + old_mv = pf.get('total_mv', 0) + + if abs(old_mv - live_market_value) > 0.01: + pf['total_mv'] = round(live_market_value, 2) + + pf['total_assets'] = calc_total_assets(pf) + if pf['total_assets'] > 0: + pf['position_pct'] = calc_position_pct(pf) + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + # DB 写入 + try: + conn = get_conn() + write_portfolio_summary(conn, pf) + conn.close() + except Exception as e: + print(f" [DB汇总写入失败] {e}", flush=True) + # JSON 冷备 + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + except Exception as e: + print(f" [汇总重算失败] {e}", flush=True) + # --- 结束汇总重算 --- + + return updated + + +# ── 分支系统辅助函数 ────────────────────────────────────────────────────── + +def _branch_alert_suffix(code, price, shares=0, cost=0): + """返回分支信息后缀:「 | 情景→动作」""" + if not HAS_TREE or not _SCENARIO_CACHE.get('id'): + return "" + try: + sc_id = _SCENARIO_CACHE['id'] + results = evaluate_branches(code, sc_id, price, shares, cost) + for r in results: + if r.get('applicable'): + _record_branch_trigger(code, r.get('branch_id',''), price) + branch_action = r.get('action_type', r.get('action', 'hold')) + return f" | {sc_id}→{branch_action}" + except Exception: + pass + return "" + + +def _record_branch_trigger(code, branch_id, price): + """记录分支触发事件(自成长:trigger_count+1)""" + try: + raw = json.load(open(DECISIONS_PATH)) + for d in raw.get('decisions', []): + if d.get('code') == code and d.get('strategy_tree',{}).get('branches'): + for b in d['strategy_tree']['branches']: + if b['id'] == branch_id: + b.setdefault('trigger_count', 0) + b['trigger_count'] += 1 + b['last_trigger_price'] = round(price, 2) + b['last_triggered'] = datetime.now().isoformat() + break + json.dump(raw, open(DECISIONS_PATH, 'w'), ensure_ascii=False, indent=2) + except Exception: + pass + + +# ── 区间偏离检测 ────────────────────────────────────────────────────────── + +def load_state(): + try: + with open(STATE_PATH) as f: + return json.load(f) + except: + return {} + +def save_state(state): + os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True) + with open(STATE_PATH, 'w') as f: + json.dump(state, f, ensure_ascii=False, indent=2) + +def load_breaches(): + try: + with open(BREACH_PATH) as f: + return json.load(f) + except: + return {} + +def save_breaches(data): + os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True) + with open(BREACH_PATH, 'w') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def load_events(): + try: + with open(EVENTS_PATH) as f: + return json.load(f) + except: + return {"events": []} + + +def save_events(events): + os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True) + with open(EVENTS_PATH, 'w') as f: + json.dump(events, f, ensure_ascii=False, indent=2) + + +def record_event(code, name, event_type, price, trigger_value, event_label=""): + """记录一次价格触发事件到 price_events.json + SQLite""" + events = load_events() + now = datetime.now().isoformat() + events["events"].append({ + "code": code, + "name": name, + "event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone + "price": round(price, 2), + "trigger_value": trigger_value, + "event_label": event_label, + "timestamp": now, + "date": datetime.now().strftime("%Y-%m-%d"), + }) + # 保留最近10000条 + events["events"] = events["events"][-10000:] + save_events(events) + + # ── SQLite 双写 ── + try: + from mofin_db import get_conn, init_all_tables, write_price_event + conn = get_conn() + init_all_tables(conn) + write_price_event(conn, code, name, event_type, price, trigger_value, event_label) + conn.close() + except Exception: + pass # SQLite 写入失败不影响主流程 + + +def get_trigger_zones(d): + """返回该decision所有可监控的区间列表,从顶层字段读取""" + zones = [] + is_holding = d.get('shares', 0) > 0 + # 买入区间(自选和持仓都监控) + el = d.get("entry_low", 0) + eh = d.get("entry_high", 0) + if el and eh and float(el) > 0 and float(eh) > 0: + try: + zones.append(("entry_zone", "买入区间", float(el), float(eh))) + except: + pass + # 止损+止盈(只有持仓才监控,自选无意义) + if is_holding: + sl = d.get("stop_loss", 0) + if sl and float(sl) > 0: + try: + zones.append(("stop_loss", "止损", 0, float(sl))) + except: + pass + tp = d.get("take_profit", 0) + if tp and float(tp) > 0: + try: + zones.append(("take_profit_zone", "止盈区间", 0, float(tp))) + except: + pass + return zones + + +def run_once(round_label=""): + """执行一轮完整的监控流程""" + global _SCENARIO_CACHE, _BRANCH_CACHE + label = f" [{round_label}]" if round_label else "" + start = time.time() + + # 刷新情景与分支缓存(每轮更新) + _SCENARIO_CACHE = detect_scenario() if HAS_TREE else {} + _BRANCH_CACHE = {} + try: + raw = json.load(open(DECISIONS_PATH)) + for d in raw.get('decisions', []): + tree = d.get('strategy_tree', {}) + if tree and tree.get('branches'): + _BRANCH_CACHE[d['code']] = tree['branches'] + except Exception: + pass + + # === 第一步:一次性刷新所有价格 === + refreshed = refresh_data_prices() + + # === 第二步:检查触发条件 === + try: + with open(DECISIONS_PATH) as f: + dec = json.load(f) + except: + print(f"❌{label} 无法读取decisions.json", file=sys.stderr) + return + + active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")] + state = load_state() + outputs = [] + state_updated = False + + # 收集所有需要检查的代码 + check_codes = set() + for d in active: + if get_trigger_zones(d): + check_codes.add(d["code"]) + + # 批量拉取这些股票的价格 + prices = fetch_all_prices(list(check_codes)) + + for d in active: + code = d["code"] + + zones = get_trigger_zones(d) + if not zones: + continue + + price_info = prices.get(code) + if not price_info: + continue + price, _, _ = price_info + if price == 0: + continue + + name = d.get("name", code) + if code not in state: + state[code] = {} + + for key, label, lo, hi in zones: + in_zone = lo <= price <= hi + prev_in_zone = state[code].get(key, None) + + if in_zone and prev_in_zone != True: + if key == "stop_loss": + branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) + outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}!{branch_sfx}") + record_event(code, name, "stop_loss", price, str(hi)) + else: + extra = "" + if "_price" in key: + batch_shares = d.get(key.replace("_price", "_shares"), "") + action = d.get(key.replace("_price", "_action"), "") + if batch_shares: + extra = f" {action}{batch_shares}股" if action else f" {batch_shares}股" + elif key in ("take_profit_zone",): + act = d.get("take_profit_action", "") + if act: + extra = f"({act})" + branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) + outputs.append(f"⚡ {name}({code}) {price} → 进入{label}{lo}~{hi}{extra}{branch_sfx}") + record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label) + state[code][key] = True + state_updated = True + + elif not in_zone and prev_in_zone == True: + if key != "stop_loss": + outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}") + state[code][key] = False + state_updated = True + + # === 第三步:买入区偏离检测 + 自动重评 === + reassesed_codes = [] + for d in active: + code = d["code"] + name = d.get("name", code) + price_info = prices.get(code) + if not price_info: + continue + price, _, _ = price_info + if price == 0: + continue + + # 从 decisions.json 中读取 analysis 的买入区 + entry_low = d.get("entry_low", 0) + entry_high = d.get("entry_high", 0) + if not entry_low or not entry_high: + continue + + in_buy_zone = entry_low <= price <= entry_high + prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None) + + # 状态变化时才触发 + if in_buy_zone and prev_in_buy_zone == False: + # 重新进入买入区 → 重评确认区间是否仍然有效 + outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评") + do_reassess = True + elif not in_buy_zone and prev_in_buy_zone == True: + # 离开买入区 → 立即重评,更新止损/止盈/区间 + outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评") + do_reassess = True + else: + do_reassess = False + + if do_reassess and HAS_REASSESS: + try: + cost = d.get("cost", 0) or 0 + shares = d.get("shares", 0) or 0 + profit_pct = (price - cost) / cost * 100 if cost else 0 + is_deep_loss = profit_pct < -20 + sentiment = "neutral" + if d.get("tech_snapshot"): + if "bearish" in d["tech_snapshot"]: + sentiment = "bearish" + elif "bullish" in d["tech_snapshot"]: + sentiment = "bullish" + + # 调用技术面驱动重评(非机械百分比) + result = reassess_strategy( + code, name, price, cost, shares, + current_action=d.get("action", ""), + volume_signal="中性", sentiment=sentiment, + ) + outputs.append(f" 📊 新策略: 损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}") + reassesed_codes.append(code) + except Exception as e: + outputs.append(f" ⚠️ 重评失败: {e}") + + # 更新买入区状态 + if "__buy_zone" not in state.get(code, {}): + if code not in state: + state[code] = {} + state[code]["__buy_zone"] = in_buy_zone + state_updated = True + + # 如果有重评过的股票,更新 decisions.json + if reassesed_codes and HAS_REASSESS: + try: + # 重新 regenerate_all 只针对受影响的股票效率太低 + # 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析) + from strategy_lifecycle import regenerate_all + r = regenerate_all(stdout=False) + outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功") + outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}") + except Exception as e: + outputs.append(f" ⚠️ 全量重评失败: {e}") + + # === 3.5 资金流异常检测(2026-06-27 新增)=== + try: + cf = json.load(open("/home/hmo/web-dashboard/data/capital_flow_cache.json")) + # 检查所有 active decision 中的资金流异常 + for d in active: + code = d["code"] + stock_cf = cf.get("stocks", {}).get(code, {}) + analysis = stock_cf.get("analysis", {}) + alerts = analysis.get("alerts", []) + if alerts: + name = d.get("name", code) + for a in alerts: + outputs.append(f" 💰 {name}({code}) {a}") + except Exception: + pass + + # === 第四步:情景变化检测 + 输出 → 直接推XMPP === + now_str = datetime.now().strftime("%H:%M:%S") + elapsed = time.time() - start + + # 情景变化检测(跨轮对比) + if HAS_TREE and _SCENARIO_CACHE.get('id'): + prev_scenario = state.get('_system', {}).get('last_scenario', '') + curr_scenario = _SCENARIO_CACHE['id'] + if prev_scenario and curr_scenario != prev_scenario: + combo = _SCENARIO_CACHE.get('combo_action', '') + outputs.insert(0, f"🌀 情景切换: {prev_scenario}→{curr_scenario} | {combo}") + if outputs: + state.setdefault('_system', {})['last_scenario'] = curr_scenario + state_updated = True + elif not prev_scenario: + state.setdefault('_system', {})['last_scenario'] = curr_scenario + state_updated = True + + if outputs: + # 简短一行一个触发 + for o in outputs: + print(o) + # 推送XMPP(只推关键事件:止损跌破+情景切换+资金流异动,不推买入区进出/重评等操作细节) + critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))] + if critical: + try: + body = "\n".join([f"{now_str}"] + critical) + payload = json.dumps({ + "to": "hmo@yoin.fun", "body": body, "type": "chat", + }).encode("utf-8") + req = urllib.request.Request( + "http://127.0.0.1:5805/", data=payload, + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=5) + except Exception: + pass + # else: SILENT — 无触发,无输出,不推 + + if state_updated: + save_state(state) + + +def main(): + """每cron触发跑一轮""" + run_once() + + +if __name__ == "__main__": + main()