Files
MoFin/mofin_query.py
hmo 0924cf3124 refactor: 数据层重构 — 统一 SQLite 访问层 + 多脚本双写
新建 mofin_db.py 共享数据库模块:
- get_conn() 统一连接管理 (WAL + Row factory + 外键)
- init_all_tables() 幂等建表 (12张表: market/sector/stock/kline/fundamentals/sectors/holdings/strategies/watchlist/candidates/score_history/events/evaluations)
- write_market_snapshot() 市场快照双写
- write_klines() K线数据双写 (stocks + daily/weekly/monthly + fundamentals)
- write_price_event() 价格事件双写
- migrate_stock_sectors() 一次性迁移 stock_sector_map.json
- query_*() 通用查询函数 (sector_trend/top_inflow/consecutive_inflow/market_mood/db_stats)

重构现有脚本:
- market_watch.py: 删除内联 DB 代码,改用 mofin_db
- multi_timeframe.py: _save_local_history() 加 SQLite 双写
- price_monitor.py: record_event() 加 SQLite 双写
- mofin_query.py: 改用 mofin_db 查询函数

新增:
- migrate_sectors.py: 一次性迁移脚本

清理:
- get_realtime_prices.py: 死代码 (只读 portfolio.json,不调API)
2026-06-20 16:26:17 +08:00

170 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""mofin_query.py — MoFin 数据库通用查询工具
用法:
python3 mofin_query.py "半导体最近5次采集的涨跌幅"
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
python3 mofin_query.py "数据库概览"
"""
import sys
import re
from mofin_db import (get_conn, query_sector_trend, query_top_inflow,
query_consecutive_inflow, query_market_mood, query_db_stats)
def _print_sector_trend(name: str, limit: int = 5):
conn = get_conn()
rows = query_sector_trend(conn, name, limit)
conn.close()
if not rows:
print(f"未找到板块「{name}」的数据")
return
print(f"\n{'='*60}")
print(f" {name} 板块 — 最近 {len(rows)} 次采集")
print(f"{'='*60}")
print(f"{'时间':<20} {'涨跌幅%':>8} {'净流入(亿)':>10} {'上涨':>6} {'下跌':>6} {'领涨股':>10}")
print(f"{'-'*20} {'-'*8} {'-'*10} {'-'*6} {'-'*6} {'-'*10}")
for r in reversed(rows):
print(f"{r['timestamp']:<20} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} "
f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}")
def _print_top_inflow(limit: int = 5):
conn = get_conn()
rows = query_top_inflow(conn, limit)
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 资金净流入 Top {len(rows)}{rows[0]['timestamp']}")
print(f"{'='*60}")
print(f"{'板块':<12} {'涨跌幅%':>8} {'净流入(亿)':>10} {'领涨股':>10}")
print(f"{'-'*12} {'-'*8} {'-'*10} {'-'*10}")
for r in rows:
print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}")
def _print_consecutive_inflow(days: int = 3):
conn = get_conn()
rows = query_consecutive_inflow(conn, days)
conn.close()
if not rows:
print(f"没有板块连续 {days} 次净流入")
return
print(f"\n{'='*60}")
print(f" 连续 {days} 次净流入的板块")
print(f"{'='*60}")
print(f"{'板块':<12} {'次数':>4} {'均净流入(亿)':>12} {'均涨跌幅%':>10}")
print(f"{'-'*12} {'-'*4} {'-'*12} {'-'*10}")
for r in rows:
print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}")
def _print_market_mood(limit: int = 10):
conn = get_conn()
rows = query_market_mood(conn, limit)
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 市场情绪趋势 — 最近 {len(rows)}")
print(f"{'='*60}")
print(f"{'时间':<20} {'来源':>10} {'上涨占比%':>10} {'情绪':>10}")
print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}")
for r in reversed(rows):
mood_emoji = {"bullish": "🟢", "neutral": "🟡", "bearish": "🔴"}.get(r['mood'], "")
print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}")
def _print_stats():
conn = get_conn()
stats = query_db_stats(conn)
conn.close()
print(f"\n{'='*40}")
print(f" MoFin 数据库概览")
print(f"{'='*40}")
print(f" 采集次数: {stats['snapshots']}")
print(f" 板块快照: {stats['sector_rows']}")
print(f" 个股数量: {stats['stocks']}")
print(f" 日K线数: {stats['daily_klines']}")
print(f" 价格事件: {stats['price_events']}")
ls = stats.get('latest_snapshot')
if ls:
print(f" 最新采集: {ls['timestamp']} ({ls['source']})")
else:
print(f" 最新采集: 暂无")
def route(query: str):
q = query.strip()
if "最近" in q and "" in q and ("涨跌" in q or "趋势" in q or "采集" in q):
names = re.findall(r'["「]([^"」]+)["」]', q)
if not names:
for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]:
if word in q:
names = [word]; break
if names:
limit = 5
m = re.search(r'(\d+)\s*次', q)
if m: limit = int(m.group(1))
_print_sector_trend(names[0], limit)
return
if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()):
limit = 5
m = re.search(r'(\d+)', q)
if m: limit = int(m.group(1))
_print_top_inflow(limit)
return
if "连续" in q and "净流入" in q:
days = 3
m = re.search(r'(\d+)\s*天', q)
if m: days = int(m.group(1))
_print_consecutive_inflow(days)
return
if "情绪" in q or "mood" in q.lower():
limit = 10
m = re.search(r'(\d+)\s*次', q)
if m: limit = int(m.group(1))
_print_market_mood(limit)
return
if "概览" in q or "统计" in q or "stats" in q.lower():
_print_stats()
return
if q.upper().strip().startswith("SELECT"):
conn = get_conn()
try:
rows = conn.execute(q).fetchall()
if rows:
cols = [d[0] for d in conn.execute(q + " LIMIT 0").description]
print("\t".join(cols))
for r in rows:
print("\t".join(str(c) for c in r))
else:
print("(empty)")
except Exception as e:
print(f"SQL 错误: {e}")
finally:
conn.close()
return
print(f"未识别的查询: {q}\n")
print("支持的查询模式:")
print(" 「半导体」最近5次采集的涨跌幅")
print(" 今天资金净流入最多的5个板块")
print(" 最近3天连续净流入的板块")
print(" 市场情绪趋势(最近10次)")
print(" 数据库概览")
print(" SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python3 mofin_query.py \"查询语句\"")
print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"")
sys.exit(1)
route(sys.argv[1])