Files
MoFin/technical_analysis.py
T

423 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""technical_analysis.py — 技术面分析模块 v2
基于多日价格数据计算支撑位/压力位:
1. 缓存每日 HLC 到 price_history.json
2. 使用 5 日最高/最低计算枢轴点
3. 结合振幅自动调整区间宽度
使用方式:
from technical_analysis import full_analysis
result = full_analysis("603259") # 自动识别A股/港股
"""
import json
import os
import urllib.request
from datetime import datetime, date
# 腾讯API字段索引
F = {
"name": 1, "code": 2, "price": 3, "close_yest": 4, "open": 5,
"volume": 6, "timestamp": 30, "change": 31, "change_pct": 32,
"high": 33, "low": 34, "amplitude": 43,
"turnover": 38, "pe": 39, "pb": 46,
"limit_up": 47, "limit_down": 48,
"avg_price": 51, "inner_vol": 52, "outer_vol": 53,
}
HISTORY_PATH = "/home/hmo/web-dashboard/data/price_history.json"
HISTORY_DAYS = 60 # 使用最近 N 天的 HLC 数据
def _load_history():
"""读取价格历史缓存"""
try:
return json.load(open(HISTORY_PATH))
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _save_history(h):
json.dump(h, open(HISTORY_PATH, "w"), ensure_ascii=False, indent=2)
def _market_prefix(code):
"""根据代码确定腾讯API前缀"""
if code.startswith("sh") or code.startswith("sz") or code.startswith("hk"):
code = code[2:] if code[2:].isdigit() else code
raw = str(code).split("_")[0]
if len(raw) == 5 and raw.isdigit():
return "hk"
if raw.startswith("6") or raw.startswith("5"):
return "sh"
return "sz"
def get_quote(code):
"""获取行情数据。DB 优先(price_monitor 维护),腾讯 API fallback"""
# DB 优先
try:
from mofin_db import get_price_from_db
p, chg = get_price_from_db(code)
if p: return {"code": code, "price": p, "change_pct": chg or 0}
except: pass
# Fallback: 腾讯 API
import time
_cache = get_quote.__dict__.get("_cache", {})
now = time.time()
cached = _cache.get(code)
if cached and (now - cached["ts"]) < 60:
return cached["data"]
raw = str(code).split("_")[0]
prefix = _market_prefix(code)
url = f"http://qt.gtimg.cn/q={prefix}{raw}"
try:
r = urllib.request.urlopen(url, timeout=5)
fields = r.read().decode("gbk").split('"')[1].split("~")
except Exception as e:
return {"code": code, "error": str(e)}
def get(i):
try:
return float(fields[i]) if fields[i].strip() else None
except (IndexError, ValueError):
return None
today_str = date.today().isoformat()
q = {
"code": raw,
"market": prefix,
"name": fields[F["name"]] if len(fields) > F["name"] else code,
"price": get(3),
"close_yest": get(4),
"open": get(5),
"high": get(33),
"low": get(34),
"volume": get(6),
"amount": get(37),
"change": get(31),
"change_pct": get(32),
"amplitude": get(43),
"turnover_rate": get(38),
"pe": get(39),
"pb": get(46),
"limit_up": get(47),
"limit_down": get(48),
"avg_price": get(51),
"inner_vol": get(52),
"outer_vol": get(53),
"timestamp": fields[F["timestamp"]] if len(fields) > F["timestamp"] else "",
"_date": today_str,
}
# 写入价格历史缓存(每日一次)
h = get(33) # high
l = get(34) # low
c = get(3) # price / close
if h and l and c:
history = _load_history()
if raw not in history:
history[raw] = []
days = history[raw]
# 如果今天已有记录,更新(盘中数据更精确)
if days and len(days) > 0 and days[-1].get("date") == today_str:
days[-1]["high"] = max(days[-1]["high"], h)
days[-1]["low"] = min(days[-1]["low"], l)
days[-1]["close"] = c # 盘中用最新价,收盘后是收盘价
else:
days.append({"date": today_str, "high": h, "low": l, "close": c})
# 只保留最近 HISTORY_DAYS 天
history[raw] = days[-HISTORY_DAYS:]
_save_history(history)
# 写入60秒缓存
get_quote.__dict__["_cache"] = {**get_quote.__dict__.get("_cache", {}), code: {"ts": now, "data": q}}
return q
def calc_support_resistance(q):
"""计算技术支撑位和压力位 — 多日枢轴点算法
使用多个数据源确定有效区间:
1. 当日波幅(H-L
2. 最近 N 日的最高/最低(从 price_history.json 读取)
3. 价格基数的百分比(对大市值低波动股票有效)
"""
h = q.get("high")
l = q.get("low")
c = q.get("price")
yc = q.get("close_yest")
amplitude = q.get("amplitude") # 当日振幅%
code = q.get("code", "")
if not all([h, l, c]):
return {"error": "数据不足"}
# 多日最高/最低(从历史缓存读取)
history = _load_history()
hist_days = history.get(code, [])
multi_high = max(d["high"] for d in hist_days) if hist_days else h
multi_low = min(d["low"] for d in hist_days) if hist_days else l
# 有效区间 = max(当日波幅, 多日波幅, 价格×5%)
daily_range = h - l
multi_range = multi_high - multi_low
min_range = c * 0.05 # 5%价格基数
effective_range = max(daily_range, multi_range, min_range)
# 如果股价接近多日高点(>80%分位),说明在上升趋势中,扩大区间
trend_position = (c - multi_low) / (multi_high - multi_low) if multi_high > multi_low else 0.5
if trend_position > 0.8:
# 高位运行,扩大有效区间到价格的8%确保合理空间
effective_range = max(effective_range, c * 0.08)
elif trend_position < 0.2:
# 低位运行,同样扩大
effective_range = max(effective_range, c * 0.08)
# 如果振幅数据可用且振幅较小(<3%),进一步扩大区间确保有效性
if amplitude and amplitude > 0 and amplitude < 3:
# 低波动股票用 振幅×3 作为最小范围
amp_based = c * amplitude / 100 * 3
effective_range = max(effective_range, amp_based)
# 枢轴点 (Pivot Point)
pp = (h + l + c) / 3
# 支撑位
s1 = 2 * pp - h # 弱支撑
s2 = pp - effective_range # 强支撑
# 压力位
r1 = 2 * pp - l # 弱压力
r2 = pp + effective_range # 强压力
# 参考昨收调整
if yc:
if yc < s1:
s1 = yc
if yc > r1:
r1 = yc
# A股涨停/跌停价作为极端边界
limit_up = q.get("limit_up")
limit_down = q.get("limit_down")
market = q.get("market", "hk")
if market != "hk" and limit_up and limit_down:
# 注意:当现价逼近涨停/跌停时,limit不再是有效边界
# 用有效区间判断:如果自然计算的r2/s2在合理范围内不截断
natural_r2 = r2
natural_s2 = s2
# 涨停限制只对距离现价超过2%的强压位生效
if limit_up < r2 and (limit_up - c) / c < 0.02:
# 涨停价离现价<2%,说明可能封板,不截断
pass # 使用自然计算的r2
elif limit_up < r2:
r2 = limit_up
if limit_down > s2 and (c - limit_down) / c < 0.02:
pass # 接近跌停,不截断
elif limit_down > s2:
s2 = limit_down
return {
"strong_support": round(s2, 2),
"weak_support": round(s1, 2),
"pivot": round(pp, 2),
"weak_resist": round(r1, 2),
"strong_resist": round(r2, 2),
"today_high": h,
"today_low": l,
"multi_high": multi_high,
"multi_low": multi_low,
"effective_range": round(effective_range, 2),
}
def analyze_candlestick(q):
"""判断K线形态"""
o = q.get("open")
c = q.get("price")
h = q.get("high")
l = q.get("low")
yc = q.get("close_yest")
if not all([o, c, h, l]):
return {"pattern": "unknown", "sentiment": "neutral"}
if c >= o:
body = c - o
upper = h - c
lower = o - l
is_green = True
else:
body = o - c
upper = h - o
lower = c - l
is_green = False
total_range = h - l
if total_range == 0:
return {"pattern": "平盘", "sentiment": "neutral"}
body_pct = body / total_range * 100
upper_pct = upper / total_range * 100
lower_pct = lower / total_range * 100
if body_pct < 5:
if upper_pct > 60:
pattern = "倒T线/射击之星"
sentiment = "bearish"
elif lower_pct > 60:
pattern = "锤子线/T字线"
sentiment = "bullish"
else:
pattern = "十字星"
sentiment = "neutral"
elif body_pct < 30:
if upper_pct > 40 and lower_pct > 40:
pattern = "长影星线"
sentiment = "neutral"
elif upper_pct > 40:
pattern = "倒T线/射击之星"
sentiment = "bearish" if is_green else "bearish"
elif lower_pct > 40:
pattern = "锤子线/T字线"
sentiment = "bullish" if is_green else "bullish"
else:
pattern = "小阳线" if is_green else "小阴线"
sentiment = "bullish" if is_green else "bearish"
else:
if upper_pct > 30:
pattern = "带上影阳线" if is_green else "带上影阴线"
sentiment = "neutral" if is_green else "bearish"
elif lower_pct > 30:
pattern = "带下影阳线" if is_green else "带下影阴线"
sentiment = "bullish" if is_green else "neutral"
else:
pattern = "光头光脚阳线" if is_green else "光头光脚阴线"
sentiment = "bullish" if is_green else "bearish"
gap_up = ""
gap_down = ""
if yc:
if o > yc * 1.01:
gap_up = "跳空高开"
if not is_green:
sentiment = "neutral"
elif o < yc * 0.99:
gap_down = "跳空低开"
if is_green:
sentiment = "neutral"
return {
"pattern": pattern,
"sentiment": sentiment,
"body_pct": round(body_pct, 1),
"upper_shadow_pct": round(upper_pct, 1),
"lower_shadow_pct": round(lower_pct, 1),
"is_green": is_green,
"gap": gap_up or gap_down or "无跳空",
}
def analyze_volume(q):
"""量价分析"""
outer = q.get("outer_vol")
inner = q.get("inner_vol")
turnover = q.get("turnover_rate")
result = {}
if outer and inner and (outer + inner) > 0:
ratio = outer / (outer + inner)
result["buy_sell_ratio"] = round(ratio, 2)
if ratio > 0.55:
result["volume_signal"] = "主动买盘占优"
elif ratio < 0.45:
result["volume_signal"] = "主动卖盘占优"
else:
result["volume_signal"] = "买卖均衡"
else:
result["volume_signal"] = "数据不足"
if turnover:
result["turnover_rate"] = turnover
return result
def full_analysis(code):
"""完整技术分析(带30秒缓存,避免分钟级波动)"""
import time
_cache = full_analysis.__dict__.get("_cache", {})
now = time.time()
cached = _cache.get(code)
if cached and (now - cached["ts"]) < 30:
return cached["data"]
q = get_quote(code)
if not q or "error" in q:
return q
sr = calc_support_resistance(q)
candle = analyze_candlestick(q)
vol = analyze_volume(q)
# 多周期+均线分析(整合 multi_timeframe
mtf = {}
try:
from multi_timeframe import full_multi_tf_analysis as _mtf
mtf_raw = _mtf(code)
if mtf_raw and 'daily' in mtf_raw:
d = mtf_raw['daily']
mtf = {
'mas': d.get('mas', {}),
'multi_tf_sr': d.get('support_resistance', {}),
'trend': d.get('trend', {}),
}
# 周线弱压/弱撑作为中周期参考
if 'weekly' in mtf_raw:
w = mtf_raw['weekly']
ws = w.get('support_resistance', {})
mtf['weekly_sr'] = {
'weak_resist': ws.get('weak_resist'),
'weak_support': ws.get('weak_support'),
}
except Exception:
pass # non-critical, graceful degradation
result = {
"quote": {
"name": q["name"],
"price": q["price"],
"change_pct": q["change_pct"],
"open": q["open"],
"high": q["high"],
"low": q["low"],
"close_yest": q["close_yest"],
"volume": q["volume"],
"amplitude": q["amplitude"],
},
"support_resistance": sr,
"candlestick": candle,
"volume": vol,
"multi_tf": mtf,
"analyzed_at": datetime.now().strftime("%H:%M"),
}
# 写入缓存
_cache[code] = {"ts": now, "data": result}
full_analysis.__dict__["_cache"] = _cache
return result
if __name__ == "__main__":
import sys
codes = sys.argv[1:] or ["603259", "002594", "00700"]
for c in codes:
r = full_analysis(c)
print(json.dumps(r, ensure_ascii=False, indent=2))
print()