Files
MoFin/stock_profile.py
T

493 lines
16 KiB
Python

#!/usr/bin/env python3
"""stock_profile.py — 个股综合画像系统
将宏观、行业、基本面、技术面四维数据整合为一只股票的完整画像。
输出:分类(短炒/中短线/中长线/深套)、综合评分、操作建议基调。
"""
import json
import os
import urllib.request
from datetime import datetime
from typing import Optional
DATA_DIR = "/home/hmo/web-dashboard/data"
MTF_CACHE_PATH = os.path.join(DATA_DIR, "multi_tf_cache.json")
MACRO_PATH = os.path.join(DATA_DIR, "macro_context.json")
PORTFOLIO_PATH = os.path.join(DATA_DIR, "portfolio.json")
# 腾讯API字段索引(quote批量接口)
F = {
"name": 1, "code": 2, "price": 3, "close_yest": 4, "open": 5,
"volume": 6, "outer_vol": 7, "inner_vol": 8,
"timestamp": 30, "change": 31, "change_pct": 32,
"high": 33, "low": 34,
"turnover": 37, "turnover_rate": 38, "pe": 39,
"high_limit": 41, "low_limit": 42, "amplitude": 43,
"market_cap_流通": 44, "market_cap_总": 45, "pb": 46,
"ep": 47, "es": 48, "eps": 49,
"avg_price": 51,
"sector_tag": 60, "sector": 61,
"high_52w": 67, "low_52w": 68,
}
# 港股字段偏移不同
F_HK = {
"name": 1, "code": 2, "price": 3, "close_yest": 4,
"change": 31, "change_pct": 32,
"high": 33, "low": 34, "high_limit": 48, "low_limit": 49,
"pe": 57, "pb": 58, "eps": 72, "market_cap_总": 45,
"turnover": 37,
}
def get_quote(code: str) -> dict:
"""获取腾讯API实时行情+基本面"""
raw = str(code).split("_")[0]
if len(raw) == 5 and raw.isdigit():
prefix = "hk"
fields = F_HK
elif raw.startswith("6") or raw.startswith("5"):
prefix = "sh"
fields = F
else:
prefix = "sz"
fields = F
# DB 优先
try:
from mofin_db import get_price_from_db
p, chg = get_price_from_db(raw)
if p: return {"price": p, "name": name, "code": raw, "change_pct": chg or 0}
except: pass
# Fallback: 腾讯
url = f"http://qt.gtimg.cn/q={prefix}{raw}"
try:
req = urllib.request.Request(url, headers={
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
})
with urllib.request.urlopen(req, timeout=5) as resp:
raw_text = resp.read().decode("gbk")
fields_raw = raw_text.split('"')[1].split("~")
except Exception as e:
return {"code": code, "error": str(e)}
def get(idx):
try:
v = fields_raw[idx].strip()
return float(v) if v else None
except (IndexError, ValueError):
return None
def get_str(idx):
try:
return fields_raw[idx].strip()
except IndexError:
return ""
is_hk = prefix == "hk"
result = {
"code": raw,
"name": get_str(fields["name"]),
"price": get(fields["price"]),
"change_pct": get(fields["change_pct"]),
"high": get(fields["high"]),
"low": get(fields["low"]),
"pe": get(fields["pe"]),
"pb": get(fields["pb"]),
"eps": get(fields["eps"]),
}
if is_hk:
result["market_cap"] = get(fields["market_cap_总"])
result["high_52w"] = get(fields["high_limit"])
result["low_52w"] = get(fields["low_limit"])
else:
result["market_cap"] = get(fields["market_cap_总"])
result["market_cap_流通"] = get(fields["market_cap_流通"])
result["high_52w"] = get(fields["high_52w"])
result["low_52w"] = get(fields["low_52w"])
result["turnover_rate"] = get(fields["turnover_rate"])
result["amplitude"] = get(fields["amplitude"])
result["sector"] = get_str(fields["sector"])
result["outer_vol"] = get(fields["outer_vol"])
result["inner_vol"] = get(fields["inner_vol"])
return result
def load_mtf_cache() -> dict:
try:
with open(MTF_CACHE_PATH) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def load_macro() -> dict:
"""加载宏观上下文,优先DB"""
try:
import sqlite3
conn = sqlite3.connect(os.path.join(DATA_DIR, "mofin.db"))
row = conn.execute(
"SELECT indices, structure, key_sectors FROM macro_context_log "
"WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1"
).fetchone()
conn.close()
if row:
return {"indices": json.loads(row[0] or "{}"),
"structure": json.loads(row[1] or "{}"),
"key_sectors": json.loads(row[2] or "[]")}
except:
pass
try:
with open(MACRO_PATH) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def get_fundamental_rating(f: dict) -> dict:
"""基本面评分"""
pe = f.get("pe")
pb = f.get("pb")
eps = f.get("eps")
mcap = f.get("market_cap")
high_52w = f.get("high_52w")
low_52w = f.get("low_52w")
price = f.get("price")
chg = f.get("change_pct")
score = 50 # 基准分
signals = []
details = {}
# PE评估
if pe and pe > 0:
details["pe"] = round(pe, 2)
if pe < 15:
score += 15
signals.append("PE低估值")
elif pe < 30:
score += 8
signals.append("PE合理")
elif pe < 60:
score += 0
signals.append("PE偏高")
else:
score -= 10
signals.append("PE>60高估")
elif pe and pe < 0:
details["pe"] = round(pe, 2)
score -= 5
signals.append("PE为负(亏损)")
# PB评估
if pb and pb > 0:
details["pb"] = round(pb, 2)
if pb < 1.5:
score += 10
signals.append("PB低")
elif pb < 3:
score += 5
signals.append("PB合理")
elif pb < 8:
score += 0
else:
score -= 5
signals.append("PB>8偏高")
elif pb and pb < 0:
score -= 5
# EPS评估
if eps:
details["eps"] = round(eps, 2)
if eps > 2:
score += 10
signals.append("EPS优秀>2")
elif eps > 0.5:
score += 5
signals.append("EPS良好")
elif eps > 0:
score += 2
else:
score -= 5
signals.append("EPS为负")
# 52周位置
if high_52w and low_52w and price and high_52w > low_52w:
position = (price - low_52w) / (high_52w - low_52w)
details["52w_position"] = round(position, 2)
if position < 0.2:
score += 10
signals.append("近52周低位")
elif position < 0.4:
score += 5
signals.append("52周偏低")
elif position > 0.8:
score -= 5
signals.append("近52周高位")
elif position > 0.95:
score -= 10
signals.append("52周顶部区域")
# 市值评估(大盘股加分)
if mcap and mcap > 0:
details["market_cap_亿"] = round(mcap, 0)
if mcap > 1000:
score += 5
signals.append("大盘股")
elif mcap > 100:
score += 0
else:
score -= 5
signals.append("小盘股")
return {
"score": max(0, min(100, score)),
"signals": signals,
"details": details,
}
def classify_stock(f: dict, mtf: dict, macro: dict) -> dict:
"""股票分类:短炒 / 中短线 / 中长线 / 深套持有"""
price = f.get("price", 0)
pe = f.get("pe") or 0
chg = f.get("change_pct", 0)
eps = f.get("eps") or 0
# 多周期趋势
mtf_adj = mtf.get("strategy_adjustment", {})
trend_align = mtf_adj.get("trend_alignment", "未知")
mtf_daily = mtf.get("daily", {})
mtf_trend = mtf_daily.get("trend", {})
mas = mtf_daily.get("mas", {})
ma20 = mas.get("ma20") or 0
ma60 = mas.get("ma60") or 0
# 基本面得分
fund_rating = get_fundamental_rating(f)
fund_score = fund_rating["score"]
# 短期涨幅判断(近20日)
mtf_sr = mtf_daily.get("support_resistance", {})
high_20d = mtf_sr.get("high_52w", price) # 近20日最高
low_20d = mtf_sr.get("low_52w", price) # 近20日最低
recent_volatility = ((high_20d - low_20d) / low_20d * 100) if low_20d > 0 else 0
# ---- 分类逻辑 ----
category = "中短线"
reason = []
position_suggestion = ""
time_horizon = ""
# 1. 深套检查
cost = 0
try:
pf = json.load(open(PORTFOLIO_PATH))
for h in pf.get("holdings", []):
if h.get("code") == f.get("code"):
cost = h.get("cost", 0) or 0
break
except Exception:
pass
if cost > 0 and price > 0:
profit_pct = (price - cost) / cost * 100
else:
profit_pct = 0
if profit_pct < -20:
category = "深套持有"
reason.append(f"浮亏{profit_pct:.0f}%")
position_suggestion = "不补不割,等趋势反转"
time_horizon = "长期"
return {
"category": category,
"reasons": reason,
"fundamental_score": fund_score,
"position_suggestion": position_suggestion,
"time_horizon": time_horizon,
"volatility_20d": round(recent_volatility, 1),
}
# 2. 短线爆发判断
# 特征:近20日振幅大(>30%)、涨幅大、PE可能极高或为负、换手率高
recent_chg = chg or 0
is_high_volatility = recent_volatility > 30
is_momentum = is_high_volatility and (pe > 100 or pe < 0)
is_turnover_high = f.get("turnover_rate", 0) or 0 > 10 if f.get("turnover_rate") else False
if is_momentum or (is_high_volatility and is_turnover_high):
category = "短炒"
if is_momentum:
reason.append("高波动+高PE题材驱动")
if is_turnover_high:
reason.append(f"换手率{f.get('turnover_rate',0):.1f}%活跃")
position_suggestion = "小仓位快进快出,止损严格"
time_horizon = "数日~2周"
# 3. 中长线判断
# 特征:基本面好(PE合理<30、EPS>1、大盘股)、多周期看多、行业向好
elif (fund_score >= 65 and pe < 30 and eps > 0.5) or \
(trend_align == "多周期看多" and fund_score >= 55 and ma20 > 0 and price > ma20):
category = "中长线"
if fund_score >= 65:
reason.append(f"基本面良好({fund_score}分)")
if trend_align == "多周期看多":
reason.append("多周期共振看多")
if pe and pe < 20:
reason.append(f"PE{pe:.0f}低估")
position_suggestion = "正常仓位配置,趋势不破不走"
time_horizon = "数月~1年"
# 4. 中短线(默认)
else:
category = "中短线"
if fund_score >= 50:
reason.append("基本面中等")
else:
reason.append(f"基本面偏弱({fund_score}分)")
if trend_align in ("震荡/无明显方向", "多周期分化"):
reason.append("方向不明")
position_suggestion = "中等仓位,技术面操作为主"
time_horizon = "2周~3月"
return {
"category": category,
"reasons": reason,
"fundamental_score": fund_score,
"position_suggestion": position_suggestion,
"time_horizon": time_horizon,
"volatility_20d": round(recent_volatility, 1),
}
def full_profile(code: str) -> dict:
"""完整个股画像"""
# 获取实时行情+基本面
quote = get_quote(code)
if "error" in quote:
return {"code": code, "error": quote["error"]}
# 多周期技术面
from multi_timeframe import full_multi_tf_analysis
mtf = full_multi_tf_analysis(code)
# 宏观环境
macro = load_macro()
# 基本面评分
fund_rating = get_fundamental_rating(quote)
# 股票分类
classification = classify_stock(quote, mtf, macro)
# 综合评分(四维加权)
# 技术面得分: 从multi_timeframe的趋势判断中提取
tech_score = 50
mtf_daily = mtf.get("daily", {})
mtf_trend = mtf_daily.get("trend", {})
if mtf_trend.get("trend") == "up":
tech_score = 70
elif mtf_trend.get("trend") == "strong_up":
tech_score = 85
elif mtf_trend.get("trend") == "down":
tech_score = 30
elif mtf_trend.get("trend") == "strong_down":
tech_score = 15
elif mtf_trend.get("trend") == "sideways":
tech_score = 50
# 行业得分(从宏观读取该行业表现)
sector_score = 50
macro_structure = macro.get("structure", {})
sector_map = {
"芯片": ["688981", "00981", "300548"],
"信息技术": ["00700", "300124"],
"新能源电池": ["300750", "300035", "600110"],
"机器人": ["300124"],
"周期": ["601899", "600739"],
"蓝筹": ["600036", "02318", "00700"],
}
# 找该股票所属行业
for sector_name, sector_codes in sector_map.items():
if code in sector_codes or any(code.startswith(c[:2]) for c in sector_codes):
sector_data = macro_structure.get(sector_name)
if sector_data:
chg_val = sector_data if isinstance(sector_data, (int, float)) else \
float(str(sector_data).replace("%", "").replace("+", "")) if sector_data else 0
if chg_val > 1:
sector_score = 70
elif chg_val < -1:
sector_score = 30
else:
sector_score = 50
break
# 宏观得分(整体市场情绪)
macro_score = 50
overall = macro_structure.get("overall", "neutral")
if overall == "strong_bullish":
macro_score = 80
elif overall == "bullish":
macro_score = 65
elif overall == "bearish":
macro_score = 35
elif overall == "strong_bearish":
macro_score = 20
# 综合评分 = 基本面30% + 技术面30% + 行业20% + 宏观20%
composite = round(
fund_rating["score"] * 0.30 +
tech_score * 0.30 +
sector_score * 0.20 +
macro_score * 0.20
)
return {
"code": code,
"name": quote.get("name", ""),
"price": quote.get("price"),
"change_pct": quote.get("change_pct"),
"fundamentals": {
"pe": quote.get("pe"),
"pb": quote.get("pb"),
"eps": quote.get("eps"),
"market_cap": quote.get("market_cap"),
"high_52w": quote.get("high_52w"),
"low_52w": quote.get("low_52w"),
"rating": fund_rating,
},
"classification": classification,
"multi_timeframe": {
"daily_ma_trend": mtf.get("daily", {}).get("trend", {}).get("ma_trend", ""),
"weekly_trend": mtf.get("weekly", {}).get("trend", {}).get("description", ""),
"monthly_trend": mtf.get("monthly", {}).get("trend", {}).get("description", ""),
"trend_alignment": mtf.get("strategy_adjustment", {}).get("trend_alignment", ""),
"ma20": mtf.get("daily", {}).get("mas", {}).get("ma20"),
"ma60": mtf.get("daily", {}).get("mas", {}).get("ma60"),
},
"scoring": {
"fundamental": fund_rating["score"],
"technical": tech_score,
"sector": sector_score,
"macro": macro_score,
"composite": composite,
},
"strategy_note": (
f"{classification['category']} | "
f"综合{composite}分(基本{fund_rating['score']}/技术{tech_score}/行业{sector_score}/宏观{macro_score}) | "
f"{classification['position_suggestion']}"
),
}
if __name__ == "__main__":
import sys
codes = sys.argv[1:] or ["300548", "600110", "600036"]
for code in codes:
p = full_profile(code)
print(json.dumps(p, ensure_ascii=False, indent=2))
print("-" * 60)