refactor: 数据层重构 — 统一 SQLite 访问层 + 多脚本双写

新建 mofin_db.py 共享数据库模块:
- get_conn() 统一连接管理 (WAL + Row factory + 外键)
- init_all_tables() 幂等建表 (12张表: market/sector/stock/kline/fundamentals/sectors/holdings/strategies/watchlist/candidates/score_history/events/evaluations)
- write_market_snapshot() 市场快照双写
- write_klines() K线数据双写 (stocks + daily/weekly/monthly + fundamentals)
- write_price_event() 价格事件双写
- migrate_stock_sectors() 一次性迁移 stock_sector_map.json
- query_*() 通用查询函数 (sector_trend/top_inflow/consecutive_inflow/market_mood/db_stats)

重构现有脚本:
- market_watch.py: 删除内联 DB 代码,改用 mofin_db
- multi_timeframe.py: _save_local_history() 加 SQLite 双写
- price_monitor.py: record_event() 加 SQLite 双写
- mofin_query.py: 改用 mofin_db 查询函数

新增:
- migrate_sectors.py: 一次性迁移脚本

清理:
- get_realtime_prices.py: 死代码 (只读 portfolio.json,不调API)
This commit is contained in:
hmo
2026-06-20 16:25:36 +08:00
parent 8926b11090
commit 0924cf3124
7 changed files with 568 additions and 308 deletions
-94
View File
@@ -1,94 +0,0 @@
#!/usr/bin/env python3
import json
import subprocess
import sys
import os
from datetime import datetime
# 读取持仓数据
with open('data/portfolio.json', 'r') as f:
portfolio = json.load(f)
# 读取决策数据
with open('data/decisions.json', 'r') as f:
decisions_data = json.load(f)
# 获取所有持仓股票
holdings = portfolio['holdings']
active_holdings = [h for h in holdings if h.get('shares', 0) > 0]
print(f"持仓股票数量: {len(active_holdings)}")
# 获取实时价格 - 使用curl调用API
# 注意:这里需要根据实际情况调整API调用
realtime_prices = {}
for holding in active_holdings:
code = holding['code']
name = holding['name']
# 判断市场类型
# 港股代码5位数字(如01211),优先判断
if len(code) == 5: # 港股
market = 'hk'
price = holding['price']
change_pct = holding.get('change_pct', 0)
realtime_prices[code] = {
'name': name,
'price': price,
'change_pct': change_pct,
'market': 'HK'
}
elif code.startswith('6'): # 沪市
market = 'sh'
price = holding['price']
change_pct = holding.get('change_pct', 0)
realtime_prices[code] = {
'name': name,
'price': price,
'change_pct': change_pct,
'market': 'A'
}
elif code.startswith('0') or code.startswith('3'): # 深市
market = 'sz'
price = holding['price']
change_pct = holding.get('change_pct', 0)
realtime_prices[code] = {
'name': name,
'price': price,
'change_pct': change_pct,
'market': 'A'
}
else:
# 其他类型
price = holding['price']
change_pct = holding.get('change_pct', 0)
realtime_prices[code] = {
'name': name,
'price': price,
'change_pct': change_pct,
'market': 'OTHER'
}
# 获取活跃决策
active_decisions = [d for d in decisions_data['decisions'] if d.get('status') == 'active']
print(f"活跃决策数量: {len(active_decisions)}")
# 输出结果
print("\n=== 实时价格数据 ===")
for code, data in realtime_prices.items():
print(f"{code} {data['name']}: {data['price']} ({data['change_pct']:+.2f}%)")
# 保存到临时文件供后续使用
output_data = {
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'prices': realtime_prices,
'holdings': active_holdings,
'active_decisions': active_decisions
}
with open('data/temp_realtime.json', 'w') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n数据已保存到 data/temp_realtime.json")
+9 -96
View File
@@ -13,102 +13,12 @@
"""
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from mofin_db import get_conn, init_all_tables, write_market_snapshot
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
# ── 数据库初始化 ──────────────────────────────────────
def init_db():
"""创建 mofin.db 及所有表(幂等,已存在则跳过)"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.executescript("""
CREATE TABLE IF NOT EXISTS market_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'ths',
up_ratio REAL,
mood TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp);
CREATE TABLE IF NOT EXISTS sector_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id),
name TEXT NOT NULL,
change_pct REAL,
up_count INTEGER,
down_count INTEGER,
net_inflow REAL,
lead_stock TEXT,
lead_stock_change REAL,
volume REAL,
turnover REAL
);
CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name);
CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id);
CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id);
""")
conn.commit()
return conn
def write_snapshot(conn, market_data: dict):
"""将一次采集结果双写 SQLite(JSON 写入由 main 负责)"""
try:
# 1. INSERT market_snapshots
cur = conn.execute(
"""INSERT INTO market_snapshots (timestamp, source, up_ratio, mood)
VALUES (?, ?, ?, ?)""",
(
market_data["timestamp"],
market_data.get("source", "unknown"),
market_data.get("up_ratio", 0),
market_data.get("mood", "unknown"),
),
)
snapshot_id = cur.lastrowid
# 2. INSERT sector_snapshots(逐板块)
sectors = market_data.get("sectors", [])
rows = []
for s in sectors:
rows.append((
snapshot_id,
s.get("name", ""),
s.get("change", 0),
s.get("up_count"),
s.get("down_count"),
s.get("net_inflow"),
s.get("lead_stock"),
s.get("lead_stock_change"),
s.get("volume"),
s.get("turnover"),
))
if rows:
conn.executemany(
"""INSERT INTO sector_snapshots
(snapshot_id, name, change_pct, up_count, down_count,
net_inflow, lead_stock, lead_stock_change, volume, turnover)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
rows,
)
conn.commit()
return snapshot_id, len(rows)
except Exception as e:
print(f"[DB] SQLite 写入失败(JSON 不受影响): {e}", flush=True)
try:
conn.rollback()
except Exception:
pass
return None, 0
# ── 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) ──
@@ -252,10 +162,13 @@ def main():
json.dump(market_data, f, ensure_ascii=False, indent=2)
# ── SQLite 双写 ──
conn = init_db()
sid, count = write_snapshot(conn, market_data)
if sid:
print(f"[DB] snapshot_id={sid}, sectors={count}", flush=True)
conn = get_conn()
init_all_tables(conn)
ok, msg, sid = write_market_snapshot(conn, market_data)
if ok:
print(f"[DB] {msg}", flush=True)
else:
print(f"[DB] 写入失败(JSON 不受影响): {msg}", flush=True)
conn.close()
# 靜默:只寫文件,不輸出到stdout,避免cron推送
+22
View File
@@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""一次性迁移脚本:stock_sector_map.json → stock_sectors 表
运行方式:
python3 migrate_sectors.py
"""
from mofin_db import get_conn, init_all_tables, migrate_stock_sectors
conn = get_conn()
init_all_tables(conn)
stocks, mappings = migrate_stock_sectors(conn)
print(f"迁移完成: {stocks} 只股票, {mappings} 条板块映射")
# 验证
rows = conn.execute("SELECT COUNT(*) FROM stock_sectors").fetchone()[0]
print(f"stock_sectors 表总行数: {rows}")
# 样例
for r in conn.execute("SELECT * FROM stock_sectors LIMIT 5").fetchall():
print(f" {r[0]}{r[1]}")
conn.close()
+462
View File
@@ -0,0 +1,462 @@
#!/usr/bin/env python3
"""mofin_db.py — MoFin 统一数据库访问层
所有脚本通过此模块访问 mofin.db,避免重复建表/连接逻辑。
用法:
from mofin_db import get_conn, write_market_snapshot, write_klines, ...
设计原则:
- 幂等建表(CREATE TABLE IF NOT EXISTS
- WAL 模式 + 外键约束
- 所有写操作返回 (success: bool, detail: str)
- JSON 写入由调用方负责,本模块只写 SQLite
"""
import sqlite3
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
# ═══════════════════════════════════════════════════════════
# 连接管理
# ═══════════════════════════════════════════════════════════
def get_conn() -> sqlite3.Connection:
"""获取数据库连接(WAL 模式,外键约束,Row 工厂)"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
# ═══════════════════════════════════════════════════════════
# 建表(幂等)
# ═══════════════════════════════════════════════════════════
def init_all_tables(conn: sqlite3.Connection):
"""创建全部表(幂等,已存在则跳过)"""
conn.executescript("""
-- 市场快照
CREATE TABLE IF NOT EXISTS market_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'ths',
up_ratio REAL,
mood TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp);
-- 板块快照
CREATE TABLE IF NOT EXISTS sector_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id),
name TEXT NOT NULL,
change_pct REAL,
up_count INTEGER,
down_count INTEGER,
net_inflow REAL,
lead_stock TEXT,
lead_stock_change REAL,
volume REAL,
turnover REAL
);
CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name);
CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id);
CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id);
-- 个股
CREATE TABLE IF NOT EXISTS stocks (
code TEXT PRIMARY KEY,
name TEXT NOT NULL,
exchange TEXT DEFAULT 'SH',
type TEXT DEFAULT 'A',
updated_at TEXT
);
-- K线(日/周/月)
CREATE TABLE IF NOT EXISTS stock_daily (
code TEXT NOT NULL REFERENCES stocks(code),
date TEXT NOT NULL,
open REAL, close REAL, high REAL, low REAL,
volume REAL, amount REAL,
PRIMARY KEY (code, date)
);
CREATE TABLE IF NOT EXISTS stock_weekly (
code TEXT NOT NULL REFERENCES stocks(code),
date TEXT NOT NULL,
open REAL, close REAL, high REAL, low REAL,
volume REAL,
PRIMARY KEY (code, date)
);
CREATE TABLE IF NOT EXISTS stock_monthly (
code TEXT NOT NULL REFERENCES stocks(code),
date TEXT NOT NULL,
open REAL, close REAL, high REAL, low REAL,
volume REAL,
PRIMARY KEY (code, date)
);
-- 基本面
CREATE TABLE IF NOT EXISTS stock_fundamentals (
code TEXT PRIMARY KEY REFERENCES stocks(code),
pe REAL, pb REAL, eps REAL,
mcap_total REAL, mcap_flow REAL,
updated_at TEXT
);
-- 板块成分映射
CREATE TABLE IF NOT EXISTS stock_sectors (
code TEXT NOT NULL REFERENCES stocks(code),
sector_name TEXT NOT NULL,
source TEXT DEFAULT 'ths',
updated_at TEXT DEFAULT (datetime('now','localtime')),
PRIMARY KEY (code, sector_name)
);
CREATE INDEX IF NOT EXISTS idx_stock_sector ON stock_sectors(sector_name);
-- 持仓
CREATE TABLE IF NOT EXISTS holdings (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
shares INTEGER NOT NULL,
cost REAL,
position_pct REAL,
added_at TEXT,
is_active INTEGER DEFAULT 1,
closed_at TEXT,
close_pnl REAL
);
-- 持仓策略
CREATE TABLE IF NOT EXISTS holding_strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES holdings(code),
version INTEGER DEFAULT 1,
stop_loss REAL,
take_profit REAL,
entry_low REAL,
entry_high REAL,
strategy_type TEXT DEFAULT 'holding',
source TEXT,
reason TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
superseded_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_strategy_code ON holding_strategies(code);
-- 自选股
CREATE TABLE IF NOT EXISTS watchlist_stocks (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
added_at TEXT DEFAULT (datetime('now','localtime')),
is_active INTEGER DEFAULT 1
);
-- 候选池
CREATE TABLE IF NOT EXISTS candidates (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
sector TEXT,
reason TEXT,
entry_range TEXT,
stop_loss REAL,
target REAL,
zhiwei_star REAL,
zhiwei_reviewed INTEGER DEFAULT 0,
zhiwei_reviewed_at TEXT,
promoted INTEGER DEFAULT 0,
promoted_at TEXT,
dropped INTEGER DEFAULT 0,
drop_reason TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
-- 候选评分历史
CREATE TABLE IF NOT EXISTS candidate_score_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES candidates(code),
score REAL NOT NULL,
source TEXT NOT NULL,
reason TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE INDEX IF NOT EXISTS idx_candidate_history ON candidate_score_history(code, created_at);
-- 价格事件
CREATE TABLE IF NOT EXISTS price_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES stocks(code),
name TEXT,
event_type TEXT NOT NULL,
price REAL,
trigger_value TEXT,
event_label TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
date TEXT
);
CREATE INDEX IF NOT EXISTS idx_events_code ON price_events(code);
CREATE INDEX IF NOT EXISTS idx_events_date ON price_events(date);
-- 策略评估记录
CREATE TABLE IF NOT EXISTS strategy_evaluations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES stocks(code),
eval_type TEXT NOT NULL,
status TEXT DEFAULT 'pending',
old_stop_loss REAL,
new_stop_loss REAL,
old_tp REAL,
new_tp REAL,
reason TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
""")
conn.commit()
# ═══════════════════════════════════════════════════════════
# 市场快照写入
# ═══════════════════════════════════════════════════════════
def write_market_snapshot(conn: sqlite3.Connection, market_data: dict) -> tuple[bool, str, Optional[int]]:
"""写入一次市场采集到 market_snapshots + sector_snapshots
Returns: (ok, message, snapshot_id)
"""
try:
cur = conn.execute(
"INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) VALUES (?, ?, ?, ?)",
(market_data["timestamp"], market_data.get("source", "unknown"),
market_data.get("up_ratio", 0), market_data.get("mood", "unknown")),
)
sid = cur.lastrowid
sectors = market_data.get("sectors", [])
rows = [(sid, s.get("name", ""), s.get("change", 0),
s.get("up_count"), s.get("down_count"), s.get("net_inflow"),
s.get("lead_stock"), s.get("lead_stock_change"),
s.get("volume"), s.get("turnover")) for s in sectors]
if rows:
conn.executemany(
"INSERT INTO sector_snapshots (snapshot_id, name, change_pct, up_count, down_count, "
"net_inflow, lead_stock, lead_stock_change, volume, turnover) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows)
conn.commit()
return True, f"snapshot_id={sid}, sectors={len(rows)}", sid
except Exception as e:
try:
conn.rollback()
except Exception:
pass
return False, str(e), None
# ═══════════════════════════════════════════════════════════
# K线写入
# ═══════════════════════════════════════════════════════════
def write_klines(conn: sqlite3.Connection, code: str, name: str,
daily: list = None, weekly: list = None, monthly: list = None,
fundamentals: dict = None) -> bool:
"""将个股K线数据双写 SQLite
Args:
code: 股票代码
name: 股票名称
daily/weekly/monthly: [{date, open, close, high, low, volume}, ...]
fundamentals: {pe, pb, eps, mcap_total, mcap_flow}
"""
try:
# 判断交易所
raw = str(code)
if len(raw) == 5 and raw.isdigit():
exchange, stype = "HK", "H"
elif raw.startswith(("6", "5", "9")):
exchange, stype = "SH", "A"
else:
exchange, stype = "SZ", "A"
# stocks 表(INSERT OR REPLACE
conn.execute(
"INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)",
(code, name, exchange, stype, datetime.now().isoformat()))
# K线数据
for period, table, data in [
("daily", "stock_daily", daily),
("weekly", "stock_weekly", weekly),
("monthly", "stock_monthly", monthly),
]:
if not data:
continue
rows = [(code, d.get("date", ""), d.get("open"), d.get("close"),
d.get("high"), d.get("low"), d.get("volume"),
d.get("amount") if period == "daily" else None) for d in data]
if period == "daily":
conn.executemany(
f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume, amount) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
else:
conn.executemany(
f"INSERT OR REPLACE INTO {table} (code, date, open, close, high, low, volume) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
[(r[0], r[1], r[2], r[3], r[4], r[5], r[6]) for r in rows])
# 基本面
if fundamentals:
conn.execute(
"INSERT OR REPLACE INTO stock_fundamentals (code, pe, pb, eps, mcap_total, mcap_flow, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(code, fundamentals.get("pe"), fundamentals.get("pb"),
fundamentals.get("eps"), fundamentals.get("mcap_total"),
fundamentals.get("mcap_flow"), datetime.now().isoformat()))
conn.commit()
return True
except Exception as e:
try:
conn.rollback()
except Exception:
pass
return False
# ═══════════════════════════════════════════════════════════
# 价格事件写入
# ═══════════════════════════════════════════════════════════
def write_price_event(conn: sqlite3.Connection, code: str, name: str,
event_type: str, price: float, trigger_value: str,
event_label: str = "") -> bool:
"""写入一条价格事件"""
try:
now = datetime.now()
conn.execute(
"INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(code, name, event_type, round(price, 2), trigger_value,
event_label, now.strftime("%Y-%m-%d")))
conn.commit()
return True
except Exception:
try:
conn.rollback()
except Exception:
pass
return False
# ═══════════════════════════════════════════════════════════
# 板块成分迁移
# ═══════════════════════════════════════════════════════════
def migrate_stock_sectors(conn: sqlite3.Connection) -> tuple[int, int]:
"""从 stock_sector_map.json 迁移到 stock_sectors 表
Returns: (migrated_stocks, total_mappings)
"""
sector_map_path = DATA_DIR / "stock_sector_map.json"
if not sector_map_path.exists():
return 0, 0
try:
with open(sector_map_path, encoding="utf-8") as f:
data = json.load(f)
except Exception:
return 0, 0
# 过滤元数据字段
mappings = [(code, sectors) for code, sectors in data.items()
if not code.startswith("_") and isinstance(sectors, list)]
total = 0
for code, sectors in mappings:
for sector in sectors:
try:
conn.execute(
"INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')",
(code, sector))
total += 1
except Exception:
pass
conn.commit()
return len(mappings), total
# ═══════════════════════════════════════════════════════════
# 查询辅助
# ═══════════════════════════════════════════════════════════
def query_sector_trend(conn: sqlite3.Connection, name: str, limit: int = 5) -> list[dict]:
"""板块最近N次趋势"""
rows = conn.execute("""
SELECT s.timestamp, ss.change_pct, ss.net_inflow,
ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = ? ORDER BY s.timestamp DESC LIMIT ?
""", (name, limit)).fetchall()
return [dict(r) for r in rows]
def query_top_inflow(conn: sqlite3.Connection, limit: int = 5) -> list[dict]:
"""最新一次资金净流入排行"""
rows = conn.execute("""
SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id = (SELECT MAX(id) FROM market_snapshots)
AND ss.net_inflow IS NOT NULL
ORDER BY ss.net_inflow DESC LIMIT ?
""", (limit,)).fetchall()
return [dict(r) for r in rows]
def query_consecutive_inflow(conn: sqlite3.Connection, days: int = 3) -> list[dict]:
"""连续N次净流入的板块"""
rows = conn.execute("""
SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow,
ROUND(AVG(change_pct), 2) as avg_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots)
AND net_inflow > 0
GROUP BY name HAVING COUNT(*) >= ?
ORDER BY avg_inflow DESC
""", (days, days)).fetchall()
return [dict(r) for r in rows]
def query_market_mood(conn: sqlite3.Connection, limit: int = 10) -> list[dict]:
"""市场情绪趋势"""
rows = conn.execute("""
SELECT timestamp, source, up_ratio, mood
FROM market_snapshots ORDER BY timestamp DESC LIMIT ?
""", (limit,)).fetchall()
return [dict(r) for r in rows]
def query_db_stats(conn: sqlite3.Connection) -> dict:
"""数据库概览"""
snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0]
sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0]
stock_count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0]
kline_count = conn.execute(
"SELECT COUNT(*) FROM stock_daily").fetchone()[0]
event_count = conn.execute("SELECT COUNT(*) FROM price_events").fetchone()[0]
latest = conn.execute(
"SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone()
return {
"snapshots": snap_count, "sector_rows": sector_count,
"stocks": stock_count, "daily_klines": kline_count,
"price_events": event_count,
"latest_snapshot": dict(latest) if latest else None,
}
+35 -117
View File
@@ -6,41 +6,22 @@
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
python3 mofin_query.py "数据库概览"
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "mofin.db"
import re
from mofin_db import (get_conn, query_sector_trend, query_top_inflow,
query_consecutive_inflow, query_market_mood, query_db_stats)
def get_conn():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
# ── 预定义查询 ──────────────────────────────────────
def query_sector_trend(name: str, limit: int = 5):
"""查询某板块最近N次采集的涨跌幅趋势"""
def _print_sector_trend(name: str, limit: int = 5):
conn = get_conn()
rows = conn.execute("""
SELECT s.timestamp, ss.change_pct, ss.net_inflow,
ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = ?
ORDER BY s.timestamp DESC
LIMIT ?
""", (name, limit)).fetchall()
rows = query_sector_trend(conn, name, limit)
conn.close()
if not rows:
print(f"未找到板块「{name}」的数据")
return
print(f"\n{'='*60}")
print(f" {name} 板块 — 最近 {len(rows)} 次采集")
print(f"{'='*60}")
@@ -51,24 +32,13 @@ def query_sector_trend(name: str, limit: int = 5):
f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}")
def query_top_inflow(limit: int = 5):
"""最新一次采集中资金净流入最多的板块"""
def _print_top_inflow(limit: int = 5):
conn = get_conn()
rows = conn.execute("""
SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id = (SELECT MAX(id) FROM market_snapshots)
AND ss.net_inflow IS NOT NULL
ORDER BY ss.net_inflow DESC
LIMIT ?
""", (limit,)).fetchall()
rows = query_top_inflow(conn, limit)
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 资金净流入 Top {len(rows)}{rows[0]['timestamp']}")
print(f"{'='*60}")
@@ -78,26 +48,13 @@ def query_top_inflow(limit: int = 5):
print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}")
def query_consecutive_inflow(days: int = 3):
"""最近N次采集中连续净流入的板块"""
def _print_consecutive_inflow(days: int = 3):
conn = get_conn()
rows = conn.execute("""
SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow,
ROUND(AVG(change_pct), 2) as avg_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots)
AND net_inflow > 0
GROUP BY name
HAVING COUNT(*) >= ?
ORDER BY avg_inflow DESC
""", (days, days)).fetchall()
rows = query_consecutive_inflow(conn, days)
conn.close()
if not rows:
print(f"没有板块连续 {days} 次净流入")
return
print(f"\n{'='*60}")
print(f" 连续 {days} 次净流入的板块")
print(f"{'='*60}")
@@ -107,21 +64,13 @@ def query_consecutive_inflow(days: int = 3):
print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}")
def query_market_mood(limit: int = 10):
"""市场情绪趋势"""
def _print_market_mood(limit: int = 10):
conn = get_conn()
rows = conn.execute("""
SELECT timestamp, source, up_ratio, mood
FROM market_snapshots
ORDER BY timestamp DESC
LIMIT ?
""", (limit,)).fetchall()
rows = query_market_mood(conn, limit)
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 市场情绪趋势 — 最近 {len(rows)}")
print(f"{'='*60}")
@@ -132,96 +81,69 @@ def query_market_mood(limit: int = 10):
print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}")
def query_stats():
"""数据库概览统计"""
def _print_stats():
conn = get_conn()
snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0]
sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0]
latest = conn.execute(
"SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1"
).fetchone()
stats = query_db_stats(conn)
conn.close()
print(f"\n{'='*40}")
print(f" MoFin 数据库概览")
print(f"{'='*40}")
print(f" 采集次数: {snap_count}")
print(f" 板块快照: {sector_count}")
if latest:
print(f" 最新采集: {latest['timestamp']} ({latest['source']})")
print(f" 采集次数: {stats['snapshots']}")
print(f" 板块快照: {stats['sector_rows']}")
print(f" 个股数量: {stats['stocks']}")
print(f" 日K线数: {stats['daily_klines']}")
print(f" 价格事件: {stats['price_events']}")
ls = stats.get('latest_snapshot')
if ls:
print(f" 最新采集: {ls['timestamp']} ({ls['source']})")
else:
print(f" 最新采集: 暂无")
# ── 智能路由 ──────────────────────────────────────
def route(query: str):
q = query.strip()
# 板块趋势
if "最近" in q and "" in q and ("涨跌" in q or "趋势" in q or "采集" in q):
# 提取板块名
import re
names = re.findall(r'["「]([^"」]+)["」]', q)
if not names:
# 尝试无引号匹配
for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]:
if word in q:
names = [word]
break
names = [word]; break
if names:
limit = 5
m = re.search(r'(\d+)\s*次', q)
if m:
limit = int(m.group(1))
query_sector_trend(names[0], limit)
if m: limit = int(m.group(1))
_print_sector_trend(names[0], limit)
return
# 资金净流入排行
if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()):
limit = 5
import re
m = re.search(r'(\d+)', q)
if m:
limit = int(m.group(1))
query_top_inflow(limit)
if m: limit = int(m.group(1))
_print_top_inflow(limit)
return
# 连续净流入
if "连续" in q and "净流入" in q:
days = 3
import re
m = re.search(r'(\d+)\s*天', q)
if m:
days = int(m.group(1))
query_consecutive_inflow(days)
if m: days = int(m.group(1))
_print_consecutive_inflow(days)
return
# 市场情绪
if "情绪" in q or "mood" in q.lower():
limit = 10
import re
m = re.search(r'(\d+)\s*次', q)
if m:
limit = int(m.group(1))
query_market_mood(limit)
if m: limit = int(m.group(1))
_print_market_mood(limit)
return
# 统计概览
if "概览" in q or "统计" in q or "stats" in q.lower():
query_stats()
_print_stats()
return
# 直接 SQL(以 SELECT 开头)
if q.upper().strip().startswith("SELECT"):
conn = get_conn()
try:
rows = conn.execute(q).fetchall()
if rows:
cols = rows[0].keys()
cols = [d[0] for d in conn.execute(q + " LIMIT 0").description]
print("\t".join(cols))
for r in rows:
print("\t".join(str(r[c]) for c in cols))
print("\t".join(str(c) for c in r))
else:
print("(empty)")
except Exception as e:
@@ -229,10 +151,7 @@ def route(query: str):
finally:
conn.close()
return
# 未匹配
print(f"未识别的查询: {q}")
print()
print(f"未识别的查询: {q}\n")
print("支持的查询模式:")
print(" 「半导体」最近5次采集的涨跌幅")
print(" 今天资金净流入最多的5个板块")
@@ -247,5 +166,4 @@ if __name__ == "__main__":
print("用法: python3 mofin_query.py \"查询语句\"")
print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"")
sys.exit(1)
route(sys.argv[1])
+29
View File
@@ -28,6 +28,32 @@ KLINE_URL = "http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={market}{co
QUOTE_URL = "http://qt.gtimg.cn/q={market}{code}"
def _write_klines_to_db(code: str, daily: list, weekly: list, monthly: list, fundamentals: dict = None):
"""K线数据双写 SQLite(失败不影响缓存写入)"""
try:
from mofin_db import get_conn, init_all_tables, write_klines
conn = get_conn()
init_all_tables(conn)
# 从 stock_profiles.json 获取名称
name = code
try:
import json
profiles_path = os.path.join(DATA_DIR, "stock_profiles.json")
if os.path.exists(profiles_path):
with open(profiles_path, encoding="utf-8") as f:
profiles = json.load(f)
for p in profiles.get("profiles", []):
if p.get("code") == code:
name = p.get("name", code)
break
except Exception:
pass
write_klines(conn, code, name, daily, weekly, monthly, fundamentals)
conn.close()
except Exception:
pass # SQLite 写入失败不影响主流程
def _market_prefix(code: str) -> str:
"""根据代码确定市场前缀"""
raw = str(code).split("_")[0]
@@ -572,6 +598,9 @@ def _save_local_history(code: str, daily: list, weekly: list, monthly: list):
cache_data[code] = stock
_MTF_CACHE_DATA = cache_data # 更新模块级缓存
# ── SQLite 双写 ──
_write_klines_to_db(code, daily, weekly, monthly, stock.get("fundamentals"))
def batch_update_all(codes: list):
"""批量更新多只股票的多周期数据"""
+11 -1
View File
@@ -191,7 +191,7 @@ def save_events(events):
def record_event(code, name, event_type, price, trigger_value, event_label=""):
"""记录一次价格触发事件到 price_events.json"""
"""记录一次价格触发事件到 price_events.json + SQLite"""
events = load_events()
now = datetime.now().isoformat()
events["events"].append({
@@ -208,6 +208,16 @@ def record_event(code, name, event_type, price, trigger_value, event_label=""):
events["events"] = events["events"][-10000:]
save_events(events)
# ── SQLite 双写 ──
try:
from mofin_db import get_conn, init_all_tables, write_price_event
conn = get_conn()
init_all_tables(conn)
write_price_event(conn, code, name, event_type, price, trigger_value, event_label)
conn.close()
except Exception:
pass # SQLite 写入失败不影响主流程
def get_trigger_zones(trigger):
"""返回该trigger所有可监控的区间列表,跳过已执行的batch"""