423 lines
13 KiB
Python
423 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""technical_analysis.py — 技术面分析模块 v2
|
||
|
||
基于多日价格数据计算支撑位/压力位:
|
||
1. 缓存每日 HLC 到 price_history.json
|
||
2. 使用 5 日最高/最低计算枢轴点
|
||
3. 结合振幅自动调整区间宽度
|
||
|
||
使用方式:
|
||
from technical_analysis import full_analysis
|
||
result = full_analysis("603259") # 自动识别A股/港股
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import urllib.request
|
||
from datetime import datetime, date
|
||
|
||
# 腾讯API字段索引
|
||
F = {
|
||
"name": 1, "code": 2, "price": 3, "close_yest": 4, "open": 5,
|
||
"volume": 6, "timestamp": 30, "change": 31, "change_pct": 32,
|
||
"high": 33, "low": 34, "amplitude": 43,
|
||
"turnover": 38, "pe": 39, "pb": 46,
|
||
"limit_up": 47, "limit_down": 48,
|
||
"avg_price": 51, "inner_vol": 52, "outer_vol": 53,
|
||
}
|
||
|
||
HISTORY_PATH = "/home/hmo/web-dashboard/data/price_history.json"
|
||
HISTORY_DAYS = 60 # 使用最近 N 天的 HLC 数据
|
||
|
||
|
||
def _load_history():
|
||
"""读取价格历史缓存"""
|
||
try:
|
||
return json.load(open(HISTORY_PATH))
|
||
except (FileNotFoundError, json.JSONDecodeError):
|
||
return {}
|
||
|
||
|
||
def _save_history(h):
|
||
json.dump(h, open(HISTORY_PATH, "w"), ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _market_prefix(code):
|
||
"""根据代码确定腾讯API前缀"""
|
||
if code.startswith("sh") or code.startswith("sz") or code.startswith("hk"):
|
||
code = code[2:] if code[2:].isdigit() else code
|
||
raw = str(code).split("_")[0]
|
||
if len(raw) == 5 and raw.isdigit():
|
||
return "hk"
|
||
if raw.startswith("6") or raw.startswith("5"):
|
||
return "sh"
|
||
return "sz"
|
||
|
||
|
||
def get_quote(code):
|
||
"""获取行情数据。DB 优先(price_monitor 维护),腾讯 API fallback"""
|
||
# DB 优先
|
||
try:
|
||
from mofin_db import get_price_from_db
|
||
p, chg = get_price_from_db(code)
|
||
if p: return {"code": code, "price": p, "change_pct": chg or 0}
|
||
except: pass
|
||
# Fallback: 腾讯 API
|
||
import time
|
||
_cache = get_quote.__dict__.get("_cache", {})
|
||
now = time.time()
|
||
cached = _cache.get(code)
|
||
if cached and (now - cached["ts"]) < 60:
|
||
return cached["data"]
|
||
|
||
raw = str(code).split("_")[0]
|
||
prefix = _market_prefix(code)
|
||
url = f"http://qt.gtimg.cn/q={prefix}{raw}"
|
||
try:
|
||
r = urllib.request.urlopen(url, timeout=5)
|
||
fields = r.read().decode("gbk").split('"')[1].split("~")
|
||
except Exception as e:
|
||
return {"code": code, "error": str(e)}
|
||
|
||
def get(i):
|
||
try:
|
||
return float(fields[i]) if fields[i].strip() else None
|
||
except (IndexError, ValueError):
|
||
return None
|
||
|
||
today_str = date.today().isoformat()
|
||
q = {
|
||
"code": raw,
|
||
"market": prefix,
|
||
"name": fields[F["name"]] if len(fields) > F["name"] else code,
|
||
"price": get(3),
|
||
"close_yest": get(4),
|
||
"open": get(5),
|
||
"high": get(33),
|
||
"low": get(34),
|
||
"volume": get(6),
|
||
"amount": get(37),
|
||
"change": get(31),
|
||
"change_pct": get(32),
|
||
"amplitude": get(43),
|
||
"turnover_rate": get(38),
|
||
"pe": get(39),
|
||
"pb": get(46),
|
||
"limit_up": get(47),
|
||
"limit_down": get(48),
|
||
"avg_price": get(51),
|
||
"inner_vol": get(52),
|
||
"outer_vol": get(53),
|
||
"timestamp": fields[F["timestamp"]] if len(fields) > F["timestamp"] else "",
|
||
"_date": today_str,
|
||
}
|
||
|
||
# 写入价格历史缓存(每日一次)
|
||
h = get(33) # high
|
||
l = get(34) # low
|
||
c = get(3) # price / close
|
||
if h and l and c:
|
||
history = _load_history()
|
||
if raw not in history:
|
||
history[raw] = []
|
||
days = history[raw]
|
||
# 如果今天已有记录,更新(盘中数据更精确)
|
||
if days and len(days) > 0 and days[-1].get("date") == today_str:
|
||
days[-1]["high"] = max(days[-1]["high"], h)
|
||
days[-1]["low"] = min(days[-1]["low"], l)
|
||
days[-1]["close"] = c # 盘中用最新价,收盘后是收盘价
|
||
else:
|
||
days.append({"date": today_str, "high": h, "low": l, "close": c})
|
||
# 只保留最近 HISTORY_DAYS 天
|
||
history[raw] = days[-HISTORY_DAYS:]
|
||
_save_history(history)
|
||
|
||
# 写入60秒缓存
|
||
get_quote.__dict__["_cache"] = {**get_quote.__dict__.get("_cache", {}), code: {"ts": now, "data": q}}
|
||
|
||
return q
|
||
|
||
|
||
def calc_support_resistance(q):
|
||
"""计算技术支撑位和压力位 — 多日枢轴点算法
|
||
|
||
使用多个数据源确定有效区间:
|
||
1. 当日波幅(H-L)
|
||
2. 最近 N 日的最高/最低(从 price_history.json 读取)
|
||
3. 价格基数的百分比(对大市值低波动股票有效)
|
||
"""
|
||
h = q.get("high")
|
||
l = q.get("low")
|
||
c = q.get("price")
|
||
yc = q.get("close_yest")
|
||
amplitude = q.get("amplitude") # 当日振幅%
|
||
code = q.get("code", "")
|
||
|
||
if not all([h, l, c]):
|
||
return {"error": "数据不足"}
|
||
|
||
# 多日最高/最低(从历史缓存读取)
|
||
history = _load_history()
|
||
hist_days = history.get(code, [])
|
||
multi_high = max(d["high"] for d in hist_days) if hist_days else h
|
||
multi_low = min(d["low"] for d in hist_days) if hist_days else l
|
||
|
||
# 有效区间 = max(当日波幅, 多日波幅, 价格×5%)
|
||
daily_range = h - l
|
||
multi_range = multi_high - multi_low
|
||
min_range = c * 0.05 # 5%价格基数
|
||
|
||
effective_range = max(daily_range, multi_range, min_range)
|
||
|
||
# 如果股价接近多日高点(>80%分位),说明在上升趋势中,扩大区间
|
||
trend_position = (c - multi_low) / (multi_high - multi_low) if multi_high > multi_low else 0.5
|
||
if trend_position > 0.8:
|
||
# 高位运行,扩大有效区间到价格的8%确保合理空间
|
||
effective_range = max(effective_range, c * 0.08)
|
||
elif trend_position < 0.2:
|
||
# 低位运行,同样扩大
|
||
effective_range = max(effective_range, c * 0.08)
|
||
|
||
# 如果振幅数据可用且振幅较小(<3%),进一步扩大区间确保有效性
|
||
if amplitude and amplitude > 0 and amplitude < 3:
|
||
# 低波动股票用 振幅×3 作为最小范围
|
||
amp_based = c * amplitude / 100 * 3
|
||
effective_range = max(effective_range, amp_based)
|
||
|
||
# 枢轴点 (Pivot Point)
|
||
pp = (h + l + c) / 3
|
||
|
||
# 支撑位
|
||
s1 = 2 * pp - h # 弱支撑
|
||
s2 = pp - effective_range # 强支撑
|
||
|
||
# 压力位
|
||
r1 = 2 * pp - l # 弱压力
|
||
r2 = pp + effective_range # 强压力
|
||
|
||
# 参考昨收调整
|
||
if yc:
|
||
if yc < s1:
|
||
s1 = yc
|
||
if yc > r1:
|
||
r1 = yc
|
||
|
||
# A股涨停/跌停价作为极端边界
|
||
limit_up = q.get("limit_up")
|
||
limit_down = q.get("limit_down")
|
||
market = q.get("market", "hk")
|
||
if market != "hk" and limit_up and limit_down:
|
||
# 注意:当现价逼近涨停/跌停时,limit不再是有效边界
|
||
# 用有效区间判断:如果自然计算的r2/s2在合理范围内不截断
|
||
natural_r2 = r2
|
||
natural_s2 = s2
|
||
# 涨停限制只对距离现价超过2%的强压位生效
|
||
if limit_up < r2 and (limit_up - c) / c < 0.02:
|
||
# 涨停价离现价<2%,说明可能封板,不截断
|
||
pass # 使用自然计算的r2
|
||
elif limit_up < r2:
|
||
r2 = limit_up
|
||
if limit_down > s2 and (c - limit_down) / c < 0.02:
|
||
pass # 接近跌停,不截断
|
||
elif limit_down > s2:
|
||
s2 = limit_down
|
||
|
||
return {
|
||
"strong_support": round(s2, 2),
|
||
"weak_support": round(s1, 2),
|
||
"pivot": round(pp, 2),
|
||
"weak_resist": round(r1, 2),
|
||
"strong_resist": round(r2, 2),
|
||
"today_high": h,
|
||
"today_low": l,
|
||
"multi_high": multi_high,
|
||
"multi_low": multi_low,
|
||
"effective_range": round(effective_range, 2),
|
||
}
|
||
|
||
|
||
def analyze_candlestick(q):
|
||
"""判断K线形态"""
|
||
o = q.get("open")
|
||
c = q.get("price")
|
||
h = q.get("high")
|
||
l = q.get("low")
|
||
yc = q.get("close_yest")
|
||
|
||
if not all([o, c, h, l]):
|
||
return {"pattern": "unknown", "sentiment": "neutral"}
|
||
|
||
if c >= o:
|
||
body = c - o
|
||
upper = h - c
|
||
lower = o - l
|
||
is_green = True
|
||
else:
|
||
body = o - c
|
||
upper = h - o
|
||
lower = c - l
|
||
is_green = False
|
||
|
||
total_range = h - l
|
||
if total_range == 0:
|
||
return {"pattern": "平盘", "sentiment": "neutral"}
|
||
|
||
body_pct = body / total_range * 100
|
||
upper_pct = upper / total_range * 100
|
||
lower_pct = lower / total_range * 100
|
||
|
||
if body_pct < 5:
|
||
if upper_pct > 60:
|
||
pattern = "倒T线/射击之星"
|
||
sentiment = "bearish"
|
||
elif lower_pct > 60:
|
||
pattern = "锤子线/T字线"
|
||
sentiment = "bullish"
|
||
else:
|
||
pattern = "十字星"
|
||
sentiment = "neutral"
|
||
elif body_pct < 30:
|
||
if upper_pct > 40 and lower_pct > 40:
|
||
pattern = "长影星线"
|
||
sentiment = "neutral"
|
||
elif upper_pct > 40:
|
||
pattern = "倒T线/射击之星"
|
||
sentiment = "bearish" if is_green else "bearish"
|
||
elif lower_pct > 40:
|
||
pattern = "锤子线/T字线"
|
||
sentiment = "bullish" if is_green else "bullish"
|
||
else:
|
||
pattern = "小阳线" if is_green else "小阴线"
|
||
sentiment = "bullish" if is_green else "bearish"
|
||
else:
|
||
if upper_pct > 30:
|
||
pattern = "带上影阳线" if is_green else "带上影阴线"
|
||
sentiment = "neutral" if is_green else "bearish"
|
||
elif lower_pct > 30:
|
||
pattern = "带下影阳线" if is_green else "带下影阴线"
|
||
sentiment = "bullish" if is_green else "neutral"
|
||
else:
|
||
pattern = "光头光脚阳线" if is_green else "光头光脚阴线"
|
||
sentiment = "bullish" if is_green else "bearish"
|
||
|
||
gap_up = ""
|
||
gap_down = ""
|
||
if yc:
|
||
if o > yc * 1.01:
|
||
gap_up = "跳空高开"
|
||
if not is_green:
|
||
sentiment = "neutral"
|
||
elif o < yc * 0.99:
|
||
gap_down = "跳空低开"
|
||
if is_green:
|
||
sentiment = "neutral"
|
||
|
||
return {
|
||
"pattern": pattern,
|
||
"sentiment": sentiment,
|
||
"body_pct": round(body_pct, 1),
|
||
"upper_shadow_pct": round(upper_pct, 1),
|
||
"lower_shadow_pct": round(lower_pct, 1),
|
||
"is_green": is_green,
|
||
"gap": gap_up or gap_down or "无跳空",
|
||
}
|
||
|
||
|
||
def analyze_volume(q):
|
||
"""量价分析"""
|
||
outer = q.get("outer_vol")
|
||
inner = q.get("inner_vol")
|
||
turnover = q.get("turnover_rate")
|
||
|
||
result = {}
|
||
if outer and inner and (outer + inner) > 0:
|
||
ratio = outer / (outer + inner)
|
||
result["buy_sell_ratio"] = round(ratio, 2)
|
||
if ratio > 0.55:
|
||
result["volume_signal"] = "主动买盘占优"
|
||
elif ratio < 0.45:
|
||
result["volume_signal"] = "主动卖盘占优"
|
||
else:
|
||
result["volume_signal"] = "买卖均衡"
|
||
else:
|
||
result["volume_signal"] = "数据不足"
|
||
|
||
if turnover:
|
||
result["turnover_rate"] = turnover
|
||
|
||
return result
|
||
|
||
|
||
def full_analysis(code):
|
||
"""完整技术分析(带30秒缓存,避免分钟级波动)"""
|
||
import time
|
||
_cache = full_analysis.__dict__.get("_cache", {})
|
||
now = time.time()
|
||
cached = _cache.get(code)
|
||
if cached and (now - cached["ts"]) < 30:
|
||
return cached["data"]
|
||
|
||
q = get_quote(code)
|
||
if not q or "error" in q:
|
||
return q
|
||
|
||
sr = calc_support_resistance(q)
|
||
candle = analyze_candlestick(q)
|
||
vol = analyze_volume(q)
|
||
|
||
# 多周期+均线分析(整合 multi_timeframe)
|
||
mtf = {}
|
||
try:
|
||
from multi_timeframe import full_multi_tf_analysis as _mtf
|
||
mtf_raw = _mtf(code)
|
||
if mtf_raw and 'daily' in mtf_raw:
|
||
d = mtf_raw['daily']
|
||
mtf = {
|
||
'mas': d.get('mas', {}),
|
||
'multi_tf_sr': d.get('support_resistance', {}),
|
||
'trend': d.get('trend', {}),
|
||
}
|
||
# 周线弱压/弱撑作为中周期参考
|
||
if 'weekly' in mtf_raw:
|
||
w = mtf_raw['weekly']
|
||
ws = w.get('support_resistance', {})
|
||
mtf['weekly_sr'] = {
|
||
'weak_resist': ws.get('weak_resist'),
|
||
'weak_support': ws.get('weak_support'),
|
||
}
|
||
except Exception:
|
||
pass # non-critical, graceful degradation
|
||
|
||
result = {
|
||
"quote": {
|
||
"name": q["name"],
|
||
"price": q["price"],
|
||
"change_pct": q["change_pct"],
|
||
"open": q["open"],
|
||
"high": q["high"],
|
||
"low": q["low"],
|
||
"close_yest": q["close_yest"],
|
||
"volume": q["volume"],
|
||
"amplitude": q["amplitude"],
|
||
},
|
||
"support_resistance": sr,
|
||
"candlestick": candle,
|
||
"volume": vol,
|
||
"multi_tf": mtf,
|
||
"analyzed_at": datetime.now().strftime("%H:%M"),
|
||
}
|
||
|
||
# 写入缓存
|
||
_cache[code] = {"ts": now, "data": result}
|
||
full_analysis.__dict__["_cache"] = _cache
|
||
return result
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
codes = sys.argv[1:] or ["603259", "002594", "00700"]
|
||
for c in codes:
|
||
r = full_analysis(c)
|
||
print(json.dumps(r, ensure_ascii=False, indent=2))
|
||
print()
|