Files
MoFin/market_insight.py
hmo 25f8c6ec67 refactor: 消费者切 SQLite 优先读取
切换策略: SQLite 优先 → 失败回退 JSON

price_events (100%覆盖):
- strategy_feedback.py: run() 优先 query_price_events()
- system_health_check.py: 优先 query_price_events() + query_price_events_by_date()

stock_sector_map (100%覆盖):
- strategy_lifecycle.py: load_stock_sector_map() 优先 stock_sectors 表

market.json (85%覆盖):
- strategy_lifecycle.py: load_market_context() 优先 query_latest_market()
- market_insight.py: generate() 优先 query_latest_market()

portfolio.json + watchlist.json (70%覆盖):
- strategy_lifecycle.py: regenerate_all() 优先 query_holdings() + query_watchlist()
- server.py: /api/portfolio, /api/watchlist, /api/overview, /api/market 优先 SQLite

所有改动保留 JSON 回退路径,SQLite 不可用时自动降级
2026-06-20 17:50:15 +08:00

201 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""market_insight.py — generate basic insights and potential-stock picks from market data.

Input: the latest market snapshot (SQLite is tried first, data/market.json is the fallback).
Output: updates the insights / potential_stocks fields in data/market.json.
Strategy:
1. Hot sectors vs. holdings match → related impact
2. Abnormal capital flows → watch signals
3. Market sentiment → daily assessment
4. Potential picks → look for holding-related stocks inside strong sectors
"""
import json
import sys
from datetime import datetime
from pathlib import Path
# Directory that holds the JSON data files (stock_profiles.json, portfolio.json,
# market.json); resolved relative to this module's own location.
DATA_DIR = Path(__file__).parent / "data"
# ── Holdings → industry mapping (extracted automatically from stock_profiles) ──
def load_holding_industry_map(data_dir=None):
    """Group stock profiles by their primary industry.

    Reads ``stock_profiles.json`` and ``portfolio.json`` from the data
    directory and buckets every profile that has a usable ``sector`` under
    its primary industry, i.e. the text before the first slash of the sector
    string (ASCII ``/`` or full-width ``\\uff0f``).

    Args:
        data_dir: Optional directory containing the two JSON files.
            Defaults to the module-level ``DATA_DIR``.

    Returns:
        A dict mapping primary industry name to a list of
        ``{"code": ..., "name": ..., "sector": ...}`` dicts. An empty dict is
        returned when either input file does not exist.
    """
    base_dir = DATA_DIR if data_dir is None else Path(data_dir)
    try:
        with open(base_dir / "stock_profiles.json", "r", encoding="utf-8") as f:
            profiles = json.load(f).get("profiles", [])
        with open(base_dir / "portfolio.json", "r", encoding="utf-8") as f:
            portfolio = json.load(f)
    except FileNotFoundError:
        return {}

    # code -> name lookup built from the portfolio; used below as a fallback
    # when a profile entry carries no name of its own.
    code_to_name = {
        item.get("code", ""): item.get("name", "")
        for item in portfolio.get("holdings") or []
    }

    # NOTE(review): every profile with a sector is included, not only the
    # codes present in the portfolio — confirm that this is intended.
    industry_holdings = {}
    for profile in profiles or []:
        code = profile.get("code", "")
        name = profile.get("name") or code_to_name.get(code, "")
        sector = profile.get("sector", "")
        # "待补全" is the placeholder for a sector that is not filled in yet.
        if not sector or sector == "待补全":
            continue
        # Primary industry = text before the first slash. Both slash variants
        # are handled; the full-width one is written as an escape so it cannot
        # be silently lost (splitting on an empty string raises ValueError).
        primary = sector
        for separator in ("/", "\uff0f"):
            primary = primary.split(separator)[0]
        primary = primary.strip()
        if primary:
            industry_holdings.setdefault(primary, []).append({
                "code": code,
                "name": name,
                "sector": sector,
            })
    return industry_holdings
def _or_zero(value):
    """Map a missing (None) numeric field to 0; leave any other value untouched."""
    return 0 if value is None else value


def _load_market_from_sqlite():
    """Return the latest market snapshot from SQLite, or None if unavailable.

    Any failure (module not importable, database error, snapshot without
    sectors) yields None so that the caller can fall back to market.json.
    """
    try:
        from mofin_db import get_conn, query_latest_market

        conn = get_conn()
        try:
            market = query_latest_market(conn)
        finally:
            # Release the connection even when the query itself fails.
            conn.close()
        if not market or not market.get("sectors"):
            return None
        # Field-name adaptation: the rest of this module reads "change".
        for key in ("sectors", "top_gainers", "top_losers"):
            for row in market.get(key) or []:
                row["change"] = row.get("change_pct", 0)
        return market
    except Exception:
        # Deliberately broad: SQLite is a best-effort source and every
        # failure must degrade to the JSON file instead of aborting.
        return None


def _related_holding_names(sector_name, industry_holdings):
    """List holding names whose industry overlaps sector_name (substring match either way)."""
    if not sector_name:
        # "" is a substring of every string; without this guard an unnamed
        # sector would match every industry.
        return []
    names = []
    for industry, holdings in industry_holdings.items():
        if industry in sector_name or sector_name in industry:
            names.extend(h["name"] for h in holdings)
    return names


def generate():
    """Build market insights and potential picks, then store them in market.json.

    The market snapshot is read from SQLite when possible and from
    ``DATA_DIR / "market.json"`` otherwise. The snapshot is extended with
    ``insights``, ``potential_stocks`` and ``insight_timestamp`` and written
    to ``DATA_DIR / "market.json"``; a one-line summary is printed.

    Raises:
        FileNotFoundError: SQLite is unavailable and market.json is missing.
        json.JSONDecodeError: the fallback market.json is not valid JSON.
    """
    # Defined up front: the output path is needed on both read paths, not
    # only when the JSON fallback is taken.
    market_path = DATA_DIR / "market.json"

    market = _load_market_from_sqlite()
    if market is None:
        with open(market_path, "r", encoding="utf-8") as f:
            market = json.load(f)
    # NOTE(review): on the SQLite path the whole SQLite snapshot is what gets
    # written to market.json below — confirm it carries every field that
    # other readers of market.json expect.

    sectors = market.get("sectors") or []
    top_gainers = market.get("top_gainers") or []
    top_losers = market.get("top_losers") or []
    mood = market.get("mood", "unknown")
    up_ratio = market.get("up_ratio", 0)

    industry_holdings = load_holding_industry_map()
    insights = []
    potentials = []

    # ── Insight 1: overall market sentiment ──
    mood_cn = {"bullish": "偏强", "neutral": "中性", "bearish": "偏弱", "unknown": "未知"}
    insights.append(
        f"市场情绪{mood_cn.get(mood, '未知')},上涨占比{up_ratio}%"
    )

    # ── Insight 2: leading sectors vs. impact on holdings ──
    gainer_insights = []
    for g in top_gainers[:3]:
        name = g.get("name", "")
        change = _or_zero(g.get("change", 0))
        matched = _related_holding_names(name, industry_holdings)
        if matched:
            gainer_insights.append(
                f"{name}+{change}%, 关联持仓{'/'.join(matched[:3])}受益"
            )
        else:
            gainer_insights.append(f"{name}+{change}%, 暂无持仓")
    if gainer_insights:
        insights.append("领涨板块: " + " | ".join(gainer_insights[:2]))

    # ── Insight 3: lagging sectors vs. risk to holdings ──
    loser_insights = []
    for g in top_losers[:3]:
        name = g.get("name", "")
        change = _or_zero(g.get("change", 0))
        matched = _related_holding_names(name, industry_holdings)
        if matched:
            loser_insights.append(
                f"{name}{change}%, {'/'.join(matched[:2])}需关注"
            )
        else:
            loser_insights.append(f"{name}{change}%")
    if loser_insights:
        insights.append("风险板块: " + " | ".join(loser_insights[:3]))

    # ── Insight 4: unusual capital flows (threshold: 50 either way) ──
    big_inflow = [s for s in sectors if _or_zero(s.get("net_inflow", 0)) > 50]
    big_outflow = [s for s in sectors if _or_zero(s.get("net_inflow", 0)) < -50]
    if big_inflow:
        top = max(big_inflow, key=lambda s: s["net_inflow"])
        insights.append(
            f"资金流入最大: {top.get('name', '')} {top['net_inflow']}亿"
        )
    if big_outflow:
        top = min(big_outflow, key=lambda s: s["net_inflow"])
        insights.append(
            f"资金流出最大: {top.get('name', '')} {top['net_inflow']}亿"
        )

    # ── Potential picks: lead stocks and related holdings in strong sectors ──
    for g in top_gainers[:5]:
        name = g.get("name", "")
        change = _or_zero(g.get("change", 0))
        if change < 2:
            continue  # only sectors with a gain of at least 2 are considered
        lead_stock = g.get("lead_stock", "")
        if lead_stock:
            potentials.append({
                "name": lead_stock,
                "reason": f"{name}领涨股, 板块+{change}%",
            })
        for holding_name in _related_holding_names(name, industry_holdings):
            potentials.append({
                "name": holding_name,
                "reason": f"所在行业{name}{change}%",
            })

    # De-duplicate by stock name, keeping first occurrences (at most 5).
    seen = set()
    unique_potentials = []
    for p in potentials:
        if p["name"] in seen:
            continue
        seen.add(p["name"])
        unique_potentials.append(p)
        if len(unique_potentials) >= 5:
            break
    potentials = unique_potentials

    # ── Write the result to market.json ──
    market["insights"] = insights
    market["potential_stocks"] = potentials
    market["insight_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
    with open(market_path, "w", encoding="utf-8") as f:
        json.dump(market, f, ensure_ascii=False, indent=2)
    print(f"生成{len(insights)}条洞察 + {len(potentials)}条潜力挖掘")
if __name__ == "__main__":
    # Script entry point: regenerate the insights when the file is run directly.
    generate()