From a293119a3182f0fe3083729dca7cdc5db47bf259 Mon Sep 17 00:00:00 2001 From: hmo Date: Sat, 20 Jun 2026 12:50:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=98=B6=E6=AE=B51=20=E2=80=94=20marke?= =?UTF-8?q?t=5Fwatch=20=E5=8F=8C=E5=86=99=20SQLite=20+=20=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - market_watch.py: 新增 init_db() 建表 + write_snapshot() 双写 SQLite - market_snapshots: 每次采集的元信息(时间、来源、涨跌比、情绪) - sector_snapshots: 每个板块的涨跌幅、资金流向、领涨股等 - JSON 写入保留不变,SQLite 写入失败不影响 JSON 管道 - mofin_query.py: 通用查询工具 - 板块趋势查询:「半导体最近5次采集的涨跌幅」 - 资金流向排行:「净流入最多的5个板块」 - 连续净流入检测:「最近3天连续净流入的板块」 - 市场情绪趋势 + 数据库概览 - 支持直接 SQL 查询 --- market_watch.py | 99 +++++++++++++++++++ mofin_query.py | 251 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+) create mode 100644 mofin_query.py diff --git a/market_watch.py b/market_watch.py index 20d64be..c73053e 100644 --- a/market_watch.py +++ b/market_watch.py @@ -13,10 +13,102 @@ """ import json +import sqlite3 from datetime import datetime from pathlib import Path DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" + +# ── 数据库初始化 ────────────────────────────────────── + +def init_db(): + """创建 mofin.db 及所有表(幂等,已存在则跳过)""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS market_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'ths', + up_ratio REAL, + mood TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_snapshots_time ON market_snapshots(timestamp); + + CREATE TABLE IF NOT EXISTS sector_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), + name TEXT NOT NULL, + change_pct REAL, + up_count INTEGER, + down_count INTEGER, + net_inflow REAL, + lead_stock TEXT, + lead_stock_change REAL, + volume REAL, + turnover REAL + ); + CREATE INDEX IF NOT EXISTS idx_sector_name ON sector_snapshots(name); + CREATE INDEX IF NOT EXISTS idx_sector_snapshot ON sector_snapshots(snapshot_id); + CREATE INDEX IF NOT EXISTS idx_sector_name_time ON sector_snapshots(name, snapshot_id); + """) + conn.commit() + return conn + + +def write_snapshot(conn, market_data: dict): + """将一次采集结果双写 SQLite(JSON 写入由 main 负责)""" + try: + # 1. INSERT market_snapshots + cur = conn.execute( + """INSERT INTO market_snapshots (timestamp, source, up_ratio, mood) + VALUES (?, ?, ?, ?)""", + ( + market_data["timestamp"], + market_data.get("source", "unknown"), + market_data.get("up_ratio", 0), + market_data.get("mood", "unknown"), + ), + ) + snapshot_id = cur.lastrowid + + # 2. INSERT sector_snapshots(逐板块) + sectors = market_data.get("sectors", []) + rows = [] + for s in sectors: + rows.append(( + snapshot_id, + s.get("name", ""), + s.get("change", 0), + s.get("up_count"), + s.get("down_count"), + s.get("net_inflow"), + s.get("lead_stock"), + s.get("lead_stock_change"), + s.get("volume"), + s.get("turnover"), + )) + if rows: + conn.executemany( + """INSERT INTO sector_snapshots + (snapshot_id, name, change_pct, up_count, down_count, + net_inflow, lead_stock, lead_stock_change, volume, turnover) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + rows, + ) + conn.commit() + return snapshot_id, len(rows) + except Exception as e: + print(f"[DB] SQLite 写入失败(JSON 不受影响): {e}", flush=True) + try: + conn.rollback() + except Exception: + pass + return None, 0 # ── 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) ── @@ -159,6 +251,13 @@ def main(): with open(DATA_DIR / "market.json", "w", encoding="utf-8") as f: json.dump(market_data, f, ensure_ascii=False, indent=2) + # ── SQLite 双写 ── + conn = init_db() + sid, count = write_snapshot(conn, market_data) + if sid: + print(f"[DB] snapshot_id={sid}, sectors={count}", flush=True) + conn.close() + # 靜默:只寫文件,不輸出到stdout,避免cron推送 diff --git a/mofin_query.py b/mofin_query.py new file mode 100644 index 0000000..84462e8 --- /dev/null +++ b/mofin_query.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +"""mofin_query.py — MoFin 数据库通用查询工具 + +用法: + python3 mofin_query.py "半导体最近5次采集的涨跌幅" + python3 mofin_query.py "今天资金净流入最多的5个板块" + python3 mofin_query.py "最近3天连续净流入的板块" + python3 mofin_query.py "市场情绪趋势(最近10次)" +""" + +import sqlite3 +import sys +from pathlib import Path + +DB_PATH = Path(__file__).parent / "data" / "mofin.db" + + +def get_conn(): + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +# ── 预定义查询 ────────────────────────────────────── + +def query_sector_trend(name: str, limit: int = 5): + """查询某板块最近N次采集的涨跌幅趋势""" + conn = get_conn() + rows = conn.execute(""" + SELECT s.timestamp, ss.change_pct, ss.net_inflow, + ss.up_count, ss.down_count, ss.lead_stock, ss.lead_stock_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE ss.name = ? + ORDER BY s.timestamp DESC + LIMIT ? + """, (name, limit)).fetchall() + conn.close() + + if not rows: + print(f"未找到板块「{name}」的数据") + return + + print(f"\n{'='*60}") + print(f" {name} 板块 — 最近 {len(rows)} 次采集") + print(f"{'='*60}") + print(f"{'时间':<20} {'涨跌幅%':>8} {'净流入(亿)':>10} {'上涨':>6} {'下跌':>6} {'领涨股':>10}") + print(f"{'-'*20} {'-'*8} {'-'*10} {'-'*6} {'-'*6} {'-'*10}") + for r in reversed(rows): + print(f"{r['timestamp']:<20} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} " + f"{r['up_count'] or '-':>6} {r['down_count'] or '-':>6} {r['lead_stock'] or '-':>10}") + + +def query_top_inflow(limit: int = 5): + """最新一次采集中资金净流入最多的板块""" + conn = get_conn() + rows = conn.execute(""" + SELECT ss.name, ss.change_pct, ss.net_inflow, ss.lead_stock, s.timestamp + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id = (SELECT MAX(id) FROM market_snapshots) + AND ss.net_inflow IS NOT NULL + ORDER BY ss.net_inflow DESC + LIMIT ? + """, (limit,)).fetchall() + conn.close() + + if not rows: + print("暂无数据") + return + + print(f"\n{'='*60}") + print(f" 资金净流入 Top {len(rows)}({rows[0]['timestamp']})") + print(f"{'='*60}") + print(f"{'板块':<12} {'涨跌幅%':>8} {'净流入(亿)':>10} {'领涨股':>10}") + print(f"{'-'*12} {'-'*8} {'-'*10} {'-'*10}") + for r in rows: + print(f"{r['name']:<12} {r['change_pct']:>8.2f} {r['net_inflow']:>10.2f} {r['lead_stock'] or '-':>10}") + + +def query_consecutive_inflow(days: int = 3): + """最近N次采集中连续净流入的板块""" + conn = get_conn() + rows = conn.execute(""" + SELECT name, COUNT(*) as times, ROUND(AVG(net_inflow), 2) as avg_inflow, + ROUND(AVG(change_pct), 2) as avg_change + FROM sector_snapshots ss + JOIN market_snapshots s ON ss.snapshot_id = s.id + WHERE s.id > (SELECT MAX(id) - ? FROM market_snapshots) + AND net_inflow > 0 + GROUP BY name + HAVING COUNT(*) >= ? + ORDER BY avg_inflow DESC + """, (days, days)).fetchall() + conn.close() + + if not rows: + print(f"没有板块连续 {days} 次净流入") + return + + print(f"\n{'='*60}") + print(f" 连续 {days} 次净流入的板块") + print(f"{'='*60}") + print(f"{'板块':<12} {'次数':>4} {'均净流入(亿)':>12} {'均涨跌幅%':>10}") + print(f"{'-'*12} {'-'*4} {'-'*12} {'-'*10}") + for r in rows: + print(f"{r['name']:<12} {r['times']:>4} {r['avg_inflow']:>12.2f} {r['avg_change']:>10.2f}") + + +def query_market_mood(limit: int = 10): + """市场情绪趋势""" + conn = get_conn() + rows = conn.execute(""" + SELECT timestamp, source, up_ratio, mood + FROM market_snapshots + ORDER BY timestamp DESC + LIMIT ? + """, (limit,)).fetchall() + conn.close() + + if not rows: + print("暂无数据") + return + + print(f"\n{'='*60}") + print(f" 市场情绪趋势 — 最近 {len(rows)} 次") + print(f"{'='*60}") + print(f"{'时间':<20} {'来源':>10} {'上涨占比%':>10} {'情绪':>10}") + print(f"{'-'*20} {'-'*10} {'-'*10} {'-'*10}") + for r in reversed(rows): + mood_emoji = {"bullish": "🟢", "neutral": "🟡", "bearish": "🔴"}.get(r['mood'], "⚪") + print(f"{r['timestamp']:<20} {r['source']:>10} {r['up_ratio']:>10.1f} {mood_emoji} {r['mood']:>8}") + + +def query_stats(): + """数据库概览统计""" + conn = get_conn() + snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] + sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] + latest = conn.execute( + "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1" + ).fetchone() + conn.close() + + print(f"\n{'='*40}") + print(f" MoFin 数据库概览") + print(f"{'='*40}") + print(f" 采集次数: {snap_count}") + print(f" 板块快照: {sector_count}") + if latest: + print(f" 最新采集: {latest['timestamp']} ({latest['source']})") + else: + print(f" 最新采集: 暂无") + + +# ── 智能路由 ────────────────────────────────────── + +def route(query: str): + q = query.strip() + + # 板块趋势 + if "最近" in q and "次" in q and ("涨跌" in q or "趋势" in q or "采集" in q): + # 提取板块名 + import re + names = re.findall(r'["「]([^"」]+)["」]', q) + if not names: + # 尝试无引号匹配 + for word in ["半导体", "银行", "医药", "新能源", "白酒", "军工", "芯片", "房地产", "汽车"]: + if word in q: + names = [word] + break + if names: + limit = 5 + m = re.search(r'(\d+)\s*次', q) + if m: + limit = int(m.group(1)) + query_sector_trend(names[0], limit) + return + + # 资金净流入排行 + if "净流入" in q and ("最多" in q or "排行" in q or "top" in q.lower()): + limit = 5 + import re + m = re.search(r'(\d+)', q) + if m: + limit = int(m.group(1)) + query_top_inflow(limit) + return + + # 连续净流入 + if "连续" in q and "净流入" in q: + days = 3 + import re + m = re.search(r'(\d+)\s*天', q) + if m: + days = int(m.group(1)) + query_consecutive_inflow(days) + return + + # 市场情绪 + if "情绪" in q or "mood" in q.lower(): + limit = 10 + import re + m = re.search(r'(\d+)\s*次', q) + if m: + limit = int(m.group(1)) + query_market_mood(limit) + return + + # 统计概览 + if "概览" in q or "统计" in q or "stats" in q.lower(): + query_stats() + return + + # 直接 SQL(以 SELECT 开头) + if q.upper().strip().startswith("SELECT"): + conn = get_conn() + try: + rows = conn.execute(q).fetchall() + if rows: + cols = rows[0].keys() + print("\t".join(cols)) + for r in rows: + print("\t".join(str(r[c]) for c in cols)) + else: + print("(empty)") + except Exception as e: + print(f"SQL 错误: {e}") + finally: + conn.close() + return + + # 未匹配 + print(f"未识别的查询: {q}") + print() + print("支持的查询模式:") + print(" 「半导体」最近5次采集的涨跌幅") + print(" 今天资金净流入最多的5个板块") + print(" 最近3天连续净流入的板块") + print(" 市场情绪趋势(最近10次)") + print(" 数据库概览") + print(" SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5") + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("用法: python3 mofin_query.py \"查询语句\"") + print("示例: python3 mofin_query.py \"半导体最近5次采集的涨跌幅\"") + sys.exit(1) + + route(sys.argv[1])