Files
MoFin/mofin_query.py
T
hmo a293119a31 feat: 阶段1 — market_watch 双写 SQLite + 查询工具
- market_watch.py: 新增 init_db() 建表 + write_snapshot() 双写 SQLite
  - market_snapshots: 每次采集的元信息(时间、来源、涨跌比、情绪)
  - sector_snapshots: 每个板块的涨跌幅、资金流向、领涨股等
  - JSON 写入保留不变,SQLite 写入失败不影响 JSON 管道
- mofin_query.py: 通用查询工具
  - 板块趋势查询:「半导体最近5次采集的涨跌幅」
  - 资金流向排行:「净流入最多的5个板块」
  - 连续净流入检测:「最近3天连续净流入的板块」
  - 市场情绪趋势 + 数据库概览
  - 支持直接 SQL 查询
2026-06-20 12:51:02 +08:00

252 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""mofin_query.py — MoFin 数据库通用查询工具
用法:
python3 mofin_query.py "半导体最近5次采集的涨跌幅"
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
"""
import sqlite3
import sys
from pathlib import Path
DB_PATH = Path(__file__).parent / "data" / "mofin.db"
def get_conn():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
# ── 预定义查询 ──────────────────────────────────────
def query_sector_trend(name: str, limit: int = 5):
"""查询某板块最近N次采集的涨跌幅趋势"""
conn = get_conn()
rows = conn.execute("""
SELECT s.timestamp, ss.change_pct, ss.net_inflow,
ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = ?
ORDER BY s.timestamp DESC
LIMIT ?
""", (name, limit)).fetchall()
conn.close()
if not rows:
print(f"未找到板块「{name}」的数据")
return
print(f"\n{'='*60}")
print(f" {name} 板块 — 最近 {len(rows)} 次采集")
print(f"{'='*60}")
print(f"{'时间':<20} {'涨跌幅%':>8} {'净流入(亿)':>10} {'上涨':>6} {'下跌':>6} {'领涨股':>10}")
print(f"{'-'*20} {'-'*8} {'-'*10} {'-'*6} {'-'*6} {'-'*10}")
for r in reversed(rows):
print(f"{r['timestamp']:<20} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} "
f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}")
def query_top_inflow(limit: int = 5):
"""最新一次采集中资金净流入最多的板块"""
conn = get_conn()
rows = conn.execute("""
SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id = (SELECT MAX(id) FROM market_snapshots)
AND ss.net_inflow IS NOT NULL
ORDER BY ss.net_inflow DESC
LIMIT ?
""", (limit,)).fetchall()
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 资金净流入 Top {len(rows)}{rows[0]['timestamp']}")
print(f"{'='*60}")
print(f"{'板块':<12} {'涨跌幅%':>8} {'净流入(亿)':>10} {'领涨股':>10}")
print(f"{'-'*12} {'-'*8} {'-'*10} {'-'*10}")
for r in rows:
print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}")
def query_consecutive_inflow(days: int = 3):
"""最近N次采集中连续净流入的板块"""
conn = get_conn()
rows = conn.execute("""
SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow,
ROUND(AVG(change_pct), 2) as avg_change
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots)
AND net_inflow > 0
GROUP BY name
HAVING COUNT(*) >= ?
ORDER BY avg_inflow DESC
""", (days, days)).fetchall()
conn.close()
if not rows:
print(f"没有板块连续 {days} 次净流入")
return
print(f"\n{'='*60}")
print(f" 连续 {days} 次净流入的板块")
print(f"{'='*60}")
print(f"{'板块':<12} {'次数':>4} {'均净流入(亿)':>12} {'均涨跌幅%':>10}")
print(f"{'-'*12} {'-'*4} {'-'*12} {'-'*10}")
for r in rows:
print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}")
def query_market_mood(limit: int = 10):
"""市场情绪趋势"""
conn = get_conn()
rows = conn.execute("""
SELECT timestamp, source, up_ratio, mood
FROM market_snapshots
ORDER BY timestamp DESC
LIMIT ?
""", (limit,)).fetchall()
conn.close()
if not rows:
print("暂无数据")
return
print(f"\n{'='*60}")
print(f" 市场情绪趋势 — 最近 {len(rows)}")
print(f"{'='*60}")
print(f"{'时间':<20} {'来源':>10} {'上涨占比%':>10} {'情绪':>10}")
print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}")
for r in reversed(rows):
mood_emoji = {"bullish": "🟢", "neutral": "🟡", "bearish": "🔴"}.get(r['mood'], "")
print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}")
def query_stats():
"""数据库概览统计"""
conn = get_conn()
snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0]
sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0]
latest = conn.execute(
"SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1"
).fetchone()
conn.close()
print(f"\n{'='*40}")
print(f" MoFin 数据库概览")
print(f"{'='*40}")
print(f" 采集次数: {snap_count}")
print(f" 板块快照: {sector_count}")
if latest:
print(f" 最新采集: {latest['timestamp']} ({latest['source']})")
else:
print(f" 最新采集: 暂无")
# ── 智能路由 ──────────────────────────────────────
def route(query: str):
q = query.strip()
# 板块趋势
if "最近" in q and "" in q and ("涨跌" in q or "趋势" in q or "采集" in q):
# 提取板块名
import re
names = re.findall(r'["「]([^"」]+)["」]', q)
if not names:
# 尝试无引号匹配
for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]:
if word in q:
names = [word]
break
if names:
limit = 5
m = re.search(r'(\d+)\s*次', q)
if m:
limit = int(m.group(1))
query_sector_trend(names[0], limit)
return
# 资金净流入排行
if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()):
limit = 5
import re
m = re.search(r'(\d+)', q)
if m:
limit = int(m.group(1))
query_top_inflow(limit)
return
# 连续净流入
if "连续" in q and "净流入" in q:
days = 3
import re
m = re.search(r'(\d+)\s*天', q)
if m:
days = int(m.group(1))
query_consecutive_inflow(days)
return
# 市场情绪
if "情绪" in q or "mood" in q.lower():
limit = 10
import re
m = re.search(r'(\d+)\s*次', q)
if m:
limit = int(m.group(1))
query_market_mood(limit)
return
# 统计概览
if "概览" in q or "统计" in q or "stats" in q.lower():
query_stats()
return
# 直接 SQL(以 SELECT 开头)
if q.upper().strip().startswith("SELECT"):
conn = get_conn()
try:
rows = conn.execute(q).fetchall()
if rows:
cols = rows[0].keys()
print("\t".join(cols))
for r in rows:
print("\t".join(str(r[c]) for c in cols))
else:
print("(empty)")
except Exception as e:
print(f"SQL 错误: {e}")
finally:
conn.close()
return
# 未匹配
print(f"未识别的查询: {q}")
print()
print("支持的查询模式:")
print(" 「半导体」最近5次采集的涨跌幅")
print(" 今天资金净流入最多的5个板块")
print(" 最近3天连续净流入的板块")
print(" 市场情绪趋势(最近10次)")
print(" 数据库概览")
print(" SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python3 mofin_query.py \"查询语句\"")
print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"")
sys.exit(1)
route(sys.argv[1])