#!/usr/bin/env python3 """mofin_query.py — MoFin 数据库通用查询工具 用法: python3 mofin_query.py "半导体最近5次采集的涨跌幅" python3 mofin_query.py "今天资金净流入最多的5个板块" python3 mofin_query.py "最近3天连续净流入的板块" python3 mofin_query.py "市场情绪趋势(最近10次)" python3 mofin_query.py "数据库概览" """ import sys import re from mofin_db import (get_conn, query_sector_trend, query_top_inflow, query_consecutive_inflow, query_market_mood, query_db_stats) def _print_sector_trend(name: str, limit: int = 5): conn = get_conn() rows = query_sector_trend(conn, name, limit) conn.close() if not rows: print(f"未找到板块「{name}」的数据") return print(f"\n{'='*60}") print(f" {name} 板块 — 最近 {len(rows)} 次采集") print(f"{'='*60}") print(f"{'时间':<20} {'涨跌幅%':>8} {'净流入(亿)':>10} {'上涨':>6} {'下跌':>6} {'领涨股':>10}") print(f"{'-'*20} {'-'*8} {'-'*10} {'-'*6} {'-'*6} {'-'*10}") for r in reversed(rows): print(f"{r['timestamp']:<20} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} " f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}") def _print_top_inflow(limit: int = 5): conn = get_conn() rows = query_top_inflow(conn, limit) conn.close() if not rows: print("暂无数据") return print(f"\n{'='*60}") print(f" 资金净流入 Top {len(rows)}({rows[0]['timestamp']})") print(f"{'='*60}") print(f"{'板块':<12} {'涨跌幅%':>8} {'净流入(亿)':>10} {'领涨股':>10}") print(f"{'-'*12} {'-'*8} {'-'*10} {'-'*10}") for r in rows: print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}") def _print_consecutive_inflow(days: int = 3): conn = get_conn() rows = query_consecutive_inflow(conn, days) conn.close() if not rows: print(f"没有板块连续 {days} 次净流入") return print(f"\n{'='*60}") print(f" 连续 {days} 次净流入的板块") print(f"{'='*60}") print(f"{'板块':<12} {'次数':>4} {'均净流入(亿)':>12} {'均涨跌幅%':>10}") print(f"{'-'*12} {'-'*4} {'-'*12} {'-'*10}") for r in rows: print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}") def _print_market_mood(limit: int = 10): conn = get_conn() rows = query_market_mood(conn, limit) conn.close() if not rows: print("暂无数据") return print(f"\n{'='*60}") print(f" 市场情绪趋势 — 最近 {len(rows)} 次") print(f"{'='*60}") print(f"{'时间':<20} {'来源':>10} {'上涨占比%':>10} {'情绪':>10}") print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}") for r in reversed(rows): mood_emoji = {"bullish": "🟢", "neutral": "🟡", "bearish": "🔴"}.get(r['mood'], "⚪") print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}") def _print_stats(): conn = get_conn() stats = query_db_stats(conn) conn.close() print(f"\n{'='*40}") print(f" MoFin 数据库概览") print(f"{'='*40}") print(f" 采集次数: {stats['snapshots']}") print(f" 板块快照: {stats['sector_rows']}") print(f" 个股数量: {stats['stocks']}") print(f" 日K线数: {stats['daily_klines']}") print(f" 价格事件: {stats['price_events']}") ls = stats.get('latest_snapshot') if ls: print(f" 最新采集: {ls['timestamp']} ({ls['source']})") else: print(f" 最新采集: 暂无") def route(query: str): q = query.strip() if "最近" in q and "次" in q and ("涨跌" in q or "趋势" in q or "采集" in q): names = re.findall(r'["「]([^"」]+)["」]', q) if not names: for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]: if word in q: names = [word]; break if names: limit = 5 m = re.search(r'(\d+)\s*次', q) if m: limit = int(m.group(1)) _print_sector_trend(names[0], limit) return if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()): limit = 5 m = re.search(r'(\d+)', q) if m: limit = int(m.group(1)) _print_top_inflow(limit) return if "连续" in q and "净流入" in q: days = 3 m = re.search(r'(\d+)\s*天', q) if m: days = int(m.group(1)) _print_consecutive_inflow(days) return if "情绪" in q or "mood" in q.lower(): limit = 10 m = re.search(r'(\d+)\s*次', q) if m: limit = int(m.group(1)) _print_market_mood(limit) return if "概览" in q or "统计" in q or "stats" in q.lower(): _print_stats() return if q.upper().strip().startswith("SELECT"): conn = get_conn() try: rows = conn.execute(q).fetchall() if rows: cols = [d[0] for d in conn.execute(q + " LIMIT 0").description] print("\t".join(cols)) for r in rows: print("\t".join(str(c) for c in r)) else: print("(empty)") except Exception as e: print(f"SQL 错误: {e}") finally: conn.close() return print(f"未识别的查询: {q}\n") print("支持的查询模式:") print(" 「半导体」最近5次采集的涨跌幅") print(" 今天资金净流入最多的5个板块") print(" 最近3天连续净流入的板块") print(" 市场情绪趋势(最近10次)") print(" 数据库概览") print(" SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5") if __name__ == "__main__": if len(sys.argv) < 2: print("用法: python3 mofin_query.py \"查询语句\"") print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"") sys.exit(1) route(sys.argv[1])