diff --git a/get_realtime_prices.py b/get_realtime_prices.py deleted file mode 100644 index 02812b3..0000000 --- a/get_realtime_prices.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 -import json -import subprocess -import sys -import os -from datetime import datetime - -# 读取持仓数据 -with open('data/portfolio.json', 'r') as f: - portfolio = json.load(f) - -# 读取决策数据 -with open('data/decisions.json', 'r') as f: - decisions_data = json.load(f) - -# 获取所有持仓股票 -holdings = portfolio['holdings'] -active_holdings = [h for h in holdings if h.get('shares', 0) > 0] - -print(f"持仓股票数量: {len(active_holdings)}") - -# 获取实时价格 - 使用curl调用API -# 注意:这里需要根据实际情况调整API调用 -realtime_prices = {} - -for holding in active_holdings: - code = holding['code'] - name = holding['name'] - - # 判断市场类型 - # 港股代码5位数字(如01211),优先判断 - if len(code) == 5: # 港股 - market = 'hk' - price = holding['price'] - change_pct = holding.get('change_pct', 0) - realtime_prices[code] = { - 'name': name, - 'price': price, - 'change_pct': change_pct, - 'market': 'HK' - } - elif code.startswith('6'): # 沪市 - market = 'sh' - price = holding['price'] - change_pct = holding.get('change_pct', 0) - realtime_prices[code] = { - 'name': name, - 'price': price, - 'change_pct': change_pct, - 'market': 'A' - } - elif code.startswith('0') or code.startswith('3'): # 深市 - market = 'sz' - price = holding['price'] - change_pct = holding.get('change_pct', 0) - realtime_prices[code] = { - 'name': name, - 'price': price, - 'change_pct': change_pct, - 'market': 'A' - } - else: - # 其他类型 - price = holding['price'] - change_pct = holding.get('change_pct', 0) - realtime_prices[code] = { - 'name': name, - 'price': price, - 'change_pct': change_pct, - 'market': 'OTHER' - } - -# 获取活跃决策 -active_decisions = [d for d in decisions_data['decisions'] if d.get('status') == 'active'] - -print(f"活跃决策数量: {len(active_decisions)}") - -# 输出结果 -print("\n=== 实时价格数据 ===") -for code, data in realtime_prices.items(): - print(f"{code} {data['name']}: {data['price']} ({data['change_pct']:+.2f}%)") - -# 保存到临时文件供后续使用 -output_data = { - 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'prices': realtime_prices, - 'holdings': active_holdings, - 'active_decisions': active_decisions -} - -with open('data/temp_realtime.json', 'w') as f: - json.dump(output_data, f, indent=2, ensure_ascii=False) - -print(f"\n数据已保存到 data/temp_realtime.json") \ No newline at end of file diff --git a/market_watch.py b/market_watch.py index c73053e..0ad39c1 100644 --- a/market_watch.py +++ b/market_watch.py @@ -13,102 +13,12 @@ """ import json -import sqlite3 from datetime import datetime from pathlib import Path +from mofin_db import get_conn, init_all_tables, write_market_snapshot + DATA_DIR = Path(__file__).parent / "data" -DB_PATH = DATA_DIR / "mofin.db" - -# ── 数据库初始化 ────────────────────────────────────── - -def init_db(): - """创建 mofin.db 及所有表(幂等,已存在则跳过)""" - DATA_DIR.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(DB_PATH)) - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA foreign_keys=ON") - conn.executescript(""" - CREATE TABLE IF NOT EXISTS market_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - source TEXT NOT NULL DEFAULT 'ths', - up_ratio REAL, - mood TEXT, - created_at TEXT DEFAULT (datetime('now','localtime')) - ); - CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp); - - CREATE TABLE IF NOT EXISTS sector_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), - name TEXT NOT NULL, - change_pct REAL, - up_count INTEGER, - down_count INTEGER, - net_inflow REAL, - lead_stock TEXT, - lead_stock_change REAL, - volume REAL, - turnover REAL - ); - CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name); - CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id); - CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id); - """) - conn.commit() - return conn - - -def write_snapshot(conn, market_data: dict): - """将一次采集结果双写 SQLite(JSON 写入由 main 负责)""" - try: - # 1. INSERT market_snapshots - cur = conn.execute( - """INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) - VALUES (?, ?, ?, ?)""", - ( - market_data["timestamp"], - market_data.get("source", "unknown"), - market_data.get("up_ratio", 0), - market_data.get("mood", "unknown"), - ), - ) - snapshot_id = cur.lastrowid - - # 2. INSERT sector_snapshots(逐板块) - sectors = market_data.get("sectors", []) - rows = [] - for s in sectors: - rows.append(( - snapshot_id, - s.get("name", ""), - s.get("change", 0), - s.get("up_count"), - s.get("down_count"), - s.get("net_inflow"), - s.get("lead_stock"), - s.get("lead_stock_change"), - s.get("volume"), - s.get("turnover"), - )) - if rows: - conn.executemany( - """INSERT INTO sector_snapshots - (snapshot_id, name, change_pct, up_count, down_count, - net_inflow, lead_stock, lead_stock_change, volume, turnover) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - rows, - ) - conn.commit() - return snapshot_id, len(rows) - except Exception as e: - print(f"[DB] SQLite 写入失败(JSON 不受影响): {e}", flush=True) - try: - conn.rollback() - except Exception: - pass - return None, 0 # ── 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) ── @@ -252,10 +162,13 @@ def main(): json.dump(market_data, f, ensure_ascii=False, indent=2) # ── SQLite 双写 ── - conn = init_db() - sid, count = write_snapshot(conn, market_data) - if sid: - print(f"[DB] snapshot_id={sid}, sectors={count}", flush=True) + conn = get_conn() + init_all_tables(conn) + ok, msg, sid = write_market_snapshot(conn, market_data) + if ok: + print(f"[DB] {msg}", flush=True) + else: + print(f"[DB] 写入失败(JSON 不受影响): {msg}", flush=True) conn.close() # 靜默:只寫文件,不輸出到stdout,避免cron推送 diff --git a/migrate_sectors.py b/migrate_sectors.py new file mode 100644 index 0000000..c0271f3 --- /dev/null +++ b/migrate_sectors.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +"""一次性迁移脚本:stock_sector_map.json → stock_sectors 表 + +运行方式: + python3 migrate_sectors.py +""" +from mofin_db import get_conn, init_all_tables, migrate_stock_sectors + +conn = get_conn() +init_all_tables(conn) +stocks, mappings = migrate_stock_sectors(conn) +print(f"迁移完成: {stocks} 只股票, {mappings} 条板块映射") + +# 验证 +rows = conn.execute("SELECT COUNT(*) FROM stock_sectors").fetchone()[0] +print(f"stock_sectors 表总行数: {rows}") + +# 样例 +for r in conn.execute("SELECT * FROM stock_sectors LIMIT 5").fetchall(): + print(f" {r[0]} → {r[1]}") + +conn.close() diff --git a/mofin_db.py b/mofin_db.py new file mode 100644 index 0000000..eb39b8b --- /dev/null +++ b/mofin_db.py @@ -0,0 +1,462 @@ +#!/usr/bin/env python3 +"""mofin_db.py — MoFin 统一数据库访问层 + +所有脚本通过此模块访问 mofin.db,避免重复建表/连接逻辑。 + +用法: + from mofin_db import get_conn, write_market_snapshot, write_klines, ... + +设计原则: + - 幂等建表(CREATE TABLE IF NOT EXISTS) + - WAL 模式 + 外键约束 + - 所有写操作返回 (success: bool, detail: str) + - JSON 写入由调用方负责,本模块只写 SQLite +""" + +import sqlite3 +import json +from datetime import datetime +from pathlib import Path +from typing import Optional + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" + +# ═══════════════════════════════════════════════════════════ +# 连接管理 +# ═══════════════════════════════════════════════════════════ + +def get_conn() -> sqlite3.Connection: + """获取数据库连接(WAL 模式,外键约束,Row 工厂)""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + return conn + + +# ═══════════════════════════════════════════════════════════ +# 建表(幂等) +# ═══════════════════════════════════════════════════════════ + +def init_all_tables(conn: sqlite3.Connection): + """创建全部表(幂等,已存在则跳过)""" + conn.executescript(""" + -- 市场快照 + CREATE TABLE IF NOT EXISTS market_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'ths', + up_ratio REAL, + mood TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp); + + -- 板块快照 + CREATE TABLE IF NOT EXISTS sector_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), + name TEXT NOT NULL, + change_pct REAL, + up_count INTEGER, + down_count INTEGER, + net_inflow REAL, + lead_stock TEXT, + lead_stock_change REAL, + volume REAL, + turnover REAL + ); + CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name); + CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id); + CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id); + + -- 个股 + CREATE TABLE IF NOT EXISTS stocks ( + code TEXT PRIMARY KEY, + name TEXT NOT NULL, + exchange TEXT DEFAULT 'SH', + type TEXT DEFAULT 'A', + updated_at TEXT + ); + + -- K线(日/周/月) + CREATE TABLE IF NOT EXISTS stock_daily ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, amount REAL, + PRIMARY KEY (code, date) + ); + CREATE TABLE IF NOT EXISTS stock_weekly ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, + PRIMARY KEY (code, date) + ); + CREATE TABLE IF NOT EXISTS stock_monthly ( + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT NOT NULL, + open REAL, close REAL, high REAL, low REAL, + volume REAL, + PRIMARY KEY (code, date) + ); + + -- 基本面 + CREATE TABLE IF NOT EXISTS stock_fundamentals ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + pe REAL, pb REAL, eps REAL, + mcap_total REAL, mcap_flow REAL, + updated_at TEXT + ); + + -- 板块成分映射 + CREATE TABLE IF NOT EXISTS stock_sectors ( + code TEXT NOT NULL REFERENCES stocks(code), + sector_name TEXT NOT NULL, + source TEXT DEFAULT 'ths', + updated_at TEXT DEFAULT (datetime('now','localtime')), + PRIMARY KEY (code, sector_name) + ); + CREATE INDEX IF NOT EXISTS idx_stock_sector ON stock_sectors(sector_name); + + -- 持仓 + CREATE TABLE IF NOT EXISTS holdings ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + shares INTEGER NOT NULL, + cost REAL, + position_pct REAL, + added_at TEXT, + is_active INTEGER DEFAULT 1, + closed_at TEXT, + close_pnl REAL + ); + + -- 持仓策略 + CREATE TABLE IF NOT EXISTS holding_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES holdings(code), + version INTEGER DEFAULT 1, + stop_loss REAL, + take_profit REAL, + entry_low REAL, + entry_high REAL, + strategy_type TEXT DEFAULT 'holding', + source TEXT, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')), + superseded_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_strategy_code ON holding_strategies(code); + + -- 自选股 + CREATE TABLE IF NOT EXISTS watchlist_stocks ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + added_at TEXT DEFAULT (datetime('now','localtime')), + is_active INTEGER DEFAULT 1 + ); + + -- 候选池 + CREATE TABLE IF NOT EXISTS candidates ( + code TEXT PRIMARY KEY REFERENCES stocks(code), + name TEXT NOT NULL, + sector TEXT, + reason TEXT, + entry_range TEXT, + stop_loss REAL, + target REAL, + zhiwei_star REAL, + zhiwei_reviewed INTEGER DEFAULT 0, + zhiwei_reviewed_at TEXT, + promoted INTEGER DEFAULT 0, + promoted_at TEXT, + dropped INTEGER DEFAULT 0, + drop_reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + + -- 候选评分历史 + CREATE TABLE IF NOT EXISTS candidate_score_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES candidates(code), + score REAL NOT NULL, + source TEXT NOT NULL, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_candidate_history ON candidate_score_history(code, created_at); + + -- 价格事件 + CREATE TABLE IF NOT EXISTS price_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + name TEXT, + event_type TEXT NOT NULL, + price REAL, + trigger_value TEXT, + event_label TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')), + date TEXT + ); + CREATE INDEX IF NOT EXISTS idx_events_code ON price_events(code); + CREATE INDEX IF NOT EXISTS idx_events_date ON price_events(date); + + -- 策略评估记录 + CREATE TABLE IF NOT EXISTS strategy_evaluations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + eval_type TEXT NOT NULL, + status TEXT DEFAULT 'pending', + old_stop_loss REAL, + new_stop_loss REAL, + old_tp REAL, + new_tp REAL, + reason TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + """) + conn.commit() + + +# ═══════════════════════════════════════════════════════════ +# 市场快照写入 +# ═══════════════════════════════════════════════════════════ + +def write_market_snapshot(conn: sqlite3.Connection, market_data: dict) -> tuple[bool, str, Optional[int]]: + """写入一次市场采集到 market_snapshots + sector_snapshots + + Returns: (ok, message, snapshot_id) + """ + try: + cur = conn.execute( + "INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) VALUES (?, ?, ?, ?)", + (market_data["timestamp"], market_data.get("source", "unknown"), + market_data.get("up_ratio", 0), market_data.get("mood", "unknown")), + ) + sid = cur.lastrowid + + sectors = market_data.get("sectors", []) + rows = [(sid, s.get("name", ""), s.get("change", 0), + s.get("up_count"), s.get("down_count"), s.get("net_inflow"), + s.get("lead_stock"), s.get("lead_stock_change"), + s.get("volume"), s.get("turnover")) for s in sectors] + if rows: + conn.executemany( + "INSERT INTO sector_snapshots (snapshot_id, name, change_pct, up_count, down_count, " + "net_inflow, lead_stock, lead_stock_change, volume, turnover) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows) + conn.commit() + return True, f"snapshot_id={sid}, sectors={len(rows)}", sid + except Exception as e: + try: + conn.rollback() + except Exception: + pass + return False, str(e), None + + +# ═══════════════════════════════════════════════════════════ +# K线写入 +# ═══════════════════════════════════════════════════════════ + +def write_klines(conn: sqlite3.Connection, code: str, name: str, + daily: list = None, weekly: list = None, monthly: list = None, + fundamentals: dict = None) -> bool: + """将个股K线数据双写 SQLite + + Args: + code: 股票代码 + name: 股票名称 + daily/weekly/monthly: [{date, open, close, high, low, volume}, ...] + fundamentals: {pe, pb, eps, mcap_total, mcap_flow} + """ + try: + # 判断交易所 + raw = str(code) + if len(raw) == 5 and raw.isdigit(): + exchange, stype = "HK", "H" + elif raw.startswith(("6", "5", "9")): + exchange, stype = "SH", "A" + else: + exchange, stype = "SZ", "A" + + # stocks 表(INSERT OR REPLACE) + conn.execute( + "INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)", + (code, name, exchange, stype, datetime.now().isoformat())) + + # K线数据 + for period, table, data in [ + ("daily", "stock_daily", daily), + ("weekly", "stock_weekly", weekly), + ("monthly", "stock_monthly", monthly), + ]: + if not data: + continue + rows = [(code, d.get("date", ""), d.get("open"), d.get("close"), + d.get("high"), d.get("low"), d.get("volume"), + d.get("amount") if period == "daily" else None) for d in data] + if period == "daily": + conn.executemany( + f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume, amount) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows) + else: + conn.executemany( + f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + [(r[0], r[1], r[2], r[3], r[4], r[5], r[6]) for r in rows]) + + # 基本面 + if fundamentals: + conn.execute( + "INSERT OR REPLACE INTO stock_fundamentals (code, pe, pb, eps, mcap_total, mcap_flow, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (code, fundamentals.get("pe"), fundamentals.get("pb"), + fundamentals.get("eps"), fundamentals.get("mcap_total"), + fundamentals.get("mcap_flow"), datetime.now().isoformat())) + + conn.commit() + return True + except Exception as e: + try: + conn.rollback() + except Exception: + pass + return False + + +# ═══════════════════════════════════════════════════════════ +# 价格事件写入 +# ═══════════════════════════════════════════════════════════ + +def write_price_event(conn: sqlite3.Connection, code: str, name: str, + event_type: str, price: float, trigger_value: str, + event_label: str = "") -> bool: + """写入一条价格事件""" + try: + now = datetime.now() + conn.execute( + "INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (code, name, event_type, round(price, 2), trigger_value, + event_label, now.strftime("%Y-%m-%d"))) + conn.commit() + return True + except Exception: + try: + conn.rollback() + except Exception: + pass + return False + + +# ═══════════════════════════════════════════════════════════ +# 板块成分迁移 +# ═══════════════════════════════════════════════════════════ + +def migrate_stock_sectors(conn: sqlite3.Connection) -> tuple[int, int]: + """从 stock_sector_map.json 迁移到 stock_sectors 表 + + Returns: (migrated_stocks, total_mappings) + """ + sector_map_path = DATA_DIR / "stock_sector_map.json" + if not sector_map_path.exists(): + return 0, 0 + + try: + with open(sector_map_path, encoding="utf-8") as f: + data = json.load(f) + except Exception: + return 0, 0 + + # 过滤元数据字段 + mappings = [(code, sectors) for code, sectors in data.items() + if not code.startswith("_") and isinstance(sectors, list)] + + total = 0 + for code, sectors in mappings: + for sector in sectors: + try: + conn.execute( + "INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')", + (code, sector)) + total += 1 + except Exception: + pass + conn.commit() + return len(mappings), total + + +# ═══════════════════════════════════════════════════════════ +# 查询辅助 +# ═══════════════════════════════════════════════════════════ + +def query_sector_trend(conn: sqlite3.Connection, name: str, limit: int = 5) -> list[dict]: + """板块最近N次趋势""" + rows = conn.execute(""" + SELECT s.timestamp, ss.change_pct, ss.net_inflow, + ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE ss.name = ? ORDER BY s.timestamp DESC LIMIT ? + """, (name, limit)).fetchall() + return [dict(r) for r in rows] + + +def query_top_inflow(conn: sqlite3.Connection, limit: int = 5) -> list[dict]: + """最新一次资金净流入排行""" + rows = conn.execute(""" + SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id = (SELECT MAX(id) FROM market_snapshots) + AND ss.net_inflow IS NOT NULL + ORDER BY ss.net_inflow DESC LIMIT ? + """, (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_consecutive_inflow(conn: sqlite3.Connection, days: int = 3) -> list[dict]: + """连续N次净流入的板块""" + rows = conn.execute(""" + SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow, + ROUND(AVG(change_pct), 2) as avg_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots) + AND net_inflow > 0 + GROUP BY name HAVING COUNT(*) >= ? + ORDER BY avg_inflow DESC + """, (days, days)).fetchall() + return [dict(r) for r in rows] + + +def query_market_mood(conn: sqlite3.Connection, limit: int = 10) -> list[dict]: + """市场情绪趋势""" + rows = conn.execute(""" + SELECT timestamp, source, up_ratio, mood + FROM market_snapshots ORDER BY timestamp DESC LIMIT ? + """, (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_db_stats(conn: sqlite3.Connection) -> dict: + """数据库概览""" + snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] + sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] + stock_count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0] + kline_count = conn.execute( + "SELECT COUNT(*) FROM stock_daily").fetchone()[0] + event_count = conn.execute("SELECT COUNT(*) FROM price_events").fetchone()[0] + latest = conn.execute( + "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + return { + "snapshots": snap_count, "sector_rows": sector_count, + "stocks": stock_count, "daily_klines": kline_count, + "price_events": event_count, + "latest_snapshot": dict(latest) if latest else None, + } diff --git a/mofin_query.py b/mofin_query.py index 84462e8..560664c 100644 --- a/mofin_query.py +++ b/mofin_query.py @@ -6,41 +6,22 @@ python3 mofin_query.py "今天资金净流入最多的5个板块" python3 mofin_query.py "最近3天连续净流入的板块" python3 mofin_query.py "市场情绪趋势(最近10次)" + python3 mofin_query.py "数据库概览" """ -import sqlite3 import sys -from pathlib import Path - -DB_PATH = Path(__file__).parent / "data" / "mofin.db" +import re +from mofin_db import (get_conn, query_sector_trend, query_top_inflow, + query_consecutive_inflow, query_market_mood, query_db_stats) -def get_conn(): - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - return conn - - -# ── 预定义查询 ────────────────────────────────────── - -def query_sector_trend(name: str, limit: int = 5): - """查询某板块最近N次采集的涨跌幅趋势""" +def _print_sector_trend(name: str, limit: int = 5): conn = get_conn() - rows = conn.execute(""" - SELECT s.timestamp, ss.change_pct, ss.net_inflow, - ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE ss.name = ? - ORDER BY s.timestamp DESC - LIMIT ? - """, (name, limit)).fetchall() + rows = query_sector_trend(conn, name, limit) conn.close() - if not rows: print(f"未找到板块「{name}」的数据") return - print(f"\n{'='*60}") print(f" {name} 板块 — 最近 {len(rows)} 次采集") print(f"{'='*60}") @@ -51,24 +32,13 @@ def query_sector_trend(name: str, limit: int = 5): f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}") -def query_top_inflow(limit: int = 5): - """最新一次采集中资金净流入最多的板块""" +def _print_top_inflow(limit: int = 5): conn = get_conn() - rows = conn.execute(""" - SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE s.id = (SELECT MAX(id) FROM market_snapshots) - AND ss.net_inflow IS NOT NULL - ORDER BY ss.net_inflow DESC - LIMIT ? - """, (limit,)).fetchall() + rows = query_top_inflow(conn, limit) conn.close() - if not rows: print("暂无数据") return - print(f"\n{'='*60}") print(f" 资金净流入 Top {len(rows)}({rows[0]['timestamp']})") print(f"{'='*60}") @@ -78,26 +48,13 @@ def query_top_inflow(limit: int = 5): print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}") -def query_consecutive_inflow(days: int = 3): - """最近N次采集中连续净流入的板块""" +def _print_consecutive_inflow(days: int = 3): conn = get_conn() - rows = conn.execute(""" - SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow, - ROUND(AVG(change_pct), 2) as avg_change - FROM sector_snapshots ss - JOIN market_snapshots s ON ss.snapshot_id = s.id - WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots) - AND net_inflow > 0 - GROUP BY name - HAVING COUNT(*) >= ? - ORDER BY avg_inflow DESC - """, (days, days)).fetchall() + rows = query_consecutive_inflow(conn, days) conn.close() - if not rows: print(f"没有板块连续 {days} 次净流入") return - print(f"\n{'='*60}") print(f" 连续 {days} 次净流入的板块") print(f"{'='*60}") @@ -107,21 +64,13 @@ def query_consecutive_inflow(days: int = 3): print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}") -def query_market_mood(limit: int = 10): - """市场情绪趋势""" +def _print_market_mood(limit: int = 10): conn = get_conn() - rows = conn.execute(""" - SELECT timestamp, source, up_ratio, mood - FROM market_snapshots - ORDER BY timestamp DESC - LIMIT ? - """, (limit,)).fetchall() + rows = query_market_mood(conn, limit) conn.close() - if not rows: print("暂无数据") return - print(f"\n{'='*60}") print(f" 市场情绪趋势 — 最近 {len(rows)} 次") print(f"{'='*60}") @@ -132,96 +81,69 @@ def query_market_mood(limit: int = 10): print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}") -def query_stats(): - """数据库概览统计""" +def _print_stats(): conn = get_conn() - snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] - sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] - latest = conn.execute( - "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1" - ).fetchone() + stats = query_db_stats(conn) conn.close() - print(f"\n{'='*40}") print(f" MoFin 数据库概览") print(f"{'='*40}") - print(f" 采集次数: {snap_count}") - print(f" 板块快照: {sector_count}") - if latest: - print(f" 最新采集: {latest['timestamp']} ({latest['source']})") + print(f" 采集次数: {stats['snapshots']}") + print(f" 板块快照: {stats['sector_rows']}") + print(f" 个股数量: {stats['stocks']}") + print(f" 日K线数: {stats['daily_klines']}") + print(f" 价格事件: {stats['price_events']}") + ls = stats.get('latest_snapshot') + if ls: + print(f" 最新采集: {ls['timestamp']} ({ls['source']})") else: print(f" 最新采集: 暂无") -# ── 智能路由 ────────────────────────────────────── - def route(query: str): q = query.strip() - - # 板块趋势 if "最近" in q and "次" in q and ("涨跌" in q or "趋势" in q or "采集" in q): - # 提取板块名 - import re names = re.findall(r'["「]([^"」]+)["」]', q) if not names: - # 尝试无引号匹配 for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]: if word in q: - names = [word] - break + names = [word]; break if names: limit = 5 m = re.search(r'(\d+)\s*次', q) - if m: - limit = int(m.group(1)) - query_sector_trend(names[0], limit) + if m: limit = int(m.group(1)) + _print_sector_trend(names[0], limit) return - - # 资金净流入排行 if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()): limit = 5 - import re m = re.search(r'(\d+)', q) - if m: - limit = int(m.group(1)) - query_top_inflow(limit) + if m: limit = int(m.group(1)) + _print_top_inflow(limit) return - - # 连续净流入 if "连续" in q and "净流入" in q: days = 3 - import re m = re.search(r'(\d+)\s*天', q) - if m: - days = int(m.group(1)) - query_consecutive_inflow(days) + if m: days = int(m.group(1)) + _print_consecutive_inflow(days) return - - # 市场情绪 if "情绪" in q or "mood" in q.lower(): limit = 10 - import re m = re.search(r'(\d+)\s*次', q) - if m: - limit = int(m.group(1)) - query_market_mood(limit) + if m: limit = int(m.group(1)) + _print_market_mood(limit) return - - # 统计概览 if "概览" in q or "统计" in q or "stats" in q.lower(): - query_stats() + _print_stats() return - - # 直接 SQL(以 SELECT 开头) if q.upper().strip().startswith("SELECT"): conn = get_conn() try: rows = conn.execute(q).fetchall() if rows: - cols = rows[0].keys() + cols = [d[0] for d in conn.execute(q + " LIMIT 0").description] print("\t".join(cols)) for r in rows: - print("\t".join(str(r[c]) for c in cols)) + print("\t".join(str(c) for c in r)) else: print("(empty)") except Exception as e: @@ -229,10 +151,7 @@ def route(query: str): finally: conn.close() return - - # 未匹配 - print(f"未识别的查询: {q}") - print() + print(f"未识别的查询: {q}\n") print("支持的查询模式:") print(" 「半导体」最近5次采集的涨跌幅") print(" 今天资金净流入最多的5个板块") @@ -247,5 +166,4 @@ if __name__ == "__main__": print("用法: python3 mofin_query.py \"查询语句\"") print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"") sys.exit(1) - route(sys.argv[1]) diff --git a/multi_timeframe.py b/multi_timeframe.py index f67a216..e4611fd 100644 --- a/multi_timeframe.py +++ b/multi_timeframe.py @@ -28,6 +28,32 @@ KLINE_URL = "http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={market}{co QUOTE_URL = "http://qt.gtimg.cn/q={market}{code}" +def _write_klines_to_db(code: str, daily: list, weekly: list, monthly: list, fundamentals: dict = None): + """K线数据双写 SQLite(失败不影响缓存写入)""" + try: + from mofin_db import get_conn, init_all_tables, write_klines + conn = get_conn() + init_all_tables(conn) + # 从 stock_profiles.json 获取名称 + name = code + try: + import json + profiles_path = os.path.join(DATA_DIR, "stock_profiles.json") + if os.path.exists(profiles_path): + with open(profiles_path, encoding="utf-8") as f: + profiles = json.load(f) + for p in profiles.get("profiles", []): + if p.get("code") == code: + name = p.get("name", code) + break + except Exception: + pass + write_klines(conn, code, name, daily, weekly, monthly, fundamentals) + conn.close() + except Exception: + pass # SQLite 写入失败不影响主流程 + + def _market_prefix(code: str) -> str: """根据代码确定市场前缀""" raw = str(code).split("_")[0] @@ -572,6 +598,9 @@ def _save_local_history(code: str, daily: list, weekly: list, monthly: list): cache_data[code] = stock _MTF_CACHE_DATA = cache_data # 更新模块级缓存 + # ── SQLite 双写 ── + _write_klines_to_db(code, daily, weekly, monthly, stock.get("fundamentals")) + def batch_update_all(codes: list): """批量更新多只股票的多周期数据""" diff --git a/price_monitor.py b/price_monitor.py index 342ffe6..1953e7a 100644 --- a/price_monitor.py +++ b/price_monitor.py @@ -191,7 +191,7 @@ def save_events(events): def record_event(code, name, event_type, price, trigger_value, event_label=""): - """记录一次价格触发事件到 price_events.json""" + """记录一次价格触发事件到 price_events.json + SQLite""" events = load_events() now = datetime.now().isoformat() events["events"].append({ @@ -208,6 +208,16 @@ def record_event(code, name, event_type, price, trigger_value, event_label=""): events["events"] = events["events"][-10000:] save_events(events) + # ── SQLite 双写 ── + try: + from mofin_db import get_conn, init_all_tables, write_price_event + conn = get_conn() + init_all_tables(conn) + write_price_event(conn, code, name, event_type, price, trigger_value, event_label) + conn.close() + except Exception: + pass # SQLite 写入失败不影响主流程 + def get_trigger_zones(trigger): """返回该trigger所有可监控的区间列表,跳过已执行的batch"""