#!/usr/bin/env python3 """technical_analysis.py — 技术面分析模块 v2 基于多日价格数据计算支撑位/压力位: 1. 缓存每日 HLC 到 price_history.json 2. 使用 5 日最高/最低计算枢轴点 3. 结合振幅自动调整区间宽度 使用方式: from technical_analysis import full_analysis result = full_analysis("603259") # 自动识别A股/港股 """ import json import os import urllib.request from datetime import datetime, date # 腾讯API字段索引 F = { "name": 1, "code": 2, "price": 3, "close_yest": 4, "open": 5, "volume": 6, "timestamp": 30, "change": 31, "change_pct": 32, "high": 33, "low": 34, "amplitude": 43, "turnover": 38, "pe": 39, "pb": 46, "limit_up": 47, "limit_down": 48, "avg_price": 51, "inner_vol": 52, "outer_vol": 53, } HISTORY_PATH = "/home/hmo/web-dashboard/data/price_history.json" HISTORY_DAYS = 60 # 使用最近 N 天的 HLC 数据 def _load_history(): """读取价格历史缓存""" try: return json.load(open(HISTORY_PATH)) except (FileNotFoundError, json.JSONDecodeError): return {} def _save_history(h): json.dump(h, open(HISTORY_PATH, "w"), ensure_ascii=False, indent=2) def _market_prefix(code): """根据代码确定腾讯API前缀""" if code.startswith("sh") or code.startswith("sz") or code.startswith("hk"): code = code[2:] if code[2:].isdigit() else code raw = str(code).split("_")[0] if len(raw) == 5 and raw.isdigit(): return "hk" if raw.startswith("6") or raw.startswith("5"): return "sh" return "sz" def get_quote(code): """获取腾讯API行情数据(带60秒缓存)""" import time _cache = get_quote.__dict__.get("_cache", {}) now = time.time() cached = _cache.get(code) if cached and (now - cached["ts"]) < 60: return cached["data"] raw = str(code).split("_")[0] prefix = _market_prefix(code) url = f"http://qt.gtimg.cn/q={prefix}{raw}" try: r = urllib.request.urlopen(url, timeout=5) fields = r.read().decode("gbk").split('"')[1].split("~") except Exception as e: return {"code": code, "error": str(e)} def get(i): try: return float(fields[i]) if fields[i].strip() else None except (IndexError, ValueError): return None today_str = date.today().isoformat() q = { "code": raw, "market": prefix, "name": fields[F["name"]] if len(fields) > F["name"] else code, "price": get(3), "close_yest": get(4), "open": get(5), "high": get(33), "low": get(34), "volume": get(6), "amount": get(37), "change": get(31), "change_pct": get(32), "amplitude": get(43), "turnover_rate": get(38), "pe": get(39), "pb": get(46), "limit_up": get(47), "limit_down": get(48), "avg_price": get(51), "inner_vol": get(52), "outer_vol": get(53), "timestamp": fields[F["timestamp"]] if len(fields) > F["timestamp"] else "", "_date": today_str, } # 写入价格历史缓存(每日一次) h = get(33) # high l = get(34) # low c = get(3) # price / close if h and l and c: history = _load_history() if raw not in history: history[raw] = [] days = history[raw] # 如果今天已有记录,更新(盘中数据更精确) if days and len(days) > 0 and days[-1].get("date") == today_str: days[-1]["high"] = max(days[-1]["high"], h) days[-1]["low"] = min(days[-1]["low"], l) days[-1]["close"] = c # 盘中用最新价,收盘后是收盘价 else: days.append({"date": today_str, "high": h, "low": l, "close": c}) # 只保留最近 HISTORY_DAYS 天 history[raw] = days[-HISTORY_DAYS:] _save_history(history) # 写入60秒缓存 get_quote.__dict__["_cache"] = {**get_quote.__dict__.get("_cache", {}), code: {"ts": now, "data": q}} return q def calc_support_resistance(q): """计算技术支撑位和压力位 — 多日枢轴点算法 使用多个数据源确定有效区间: 1. 当日波幅(H-L) 2. 最近 N 日的最高/最低(从 price_history.json 读取) 3. 价格基数的百分比(对大市值低波动股票有效) """ h = q.get("high") l = q.get("low") c = q.get("price") yc = q.get("close_yest") amplitude = q.get("amplitude") # 当日振幅% code = q.get("code", "") if not all([h, l, c]): return {"error": "数据不足"} # 多日最高/最低(从历史缓存读取) history = _load_history() hist_days = history.get(code, []) multi_high = max(d["high"] for d in hist_days) if hist_days else h multi_low = min(d["low"] for d in hist_days) if hist_days else l # 有效区间 = max(当日波幅, 多日波幅, 价格×5%) daily_range = h - l multi_range = multi_high - multi_low min_range = c * 0.05 # 5%价格基数 effective_range = max(daily_range, multi_range, min_range) # 如果股价接近多日高点(>80%分位),说明在上升趋势中,扩大区间 trend_position = (c - multi_low) / (multi_high - multi_low) if multi_high > multi_low else 0.5 if trend_position > 0.8: # 高位运行,扩大有效区间到价格的8%确保合理空间 effective_range = max(effective_range, c * 0.08) elif trend_position < 0.2: # 低位运行,同样扩大 effective_range = max(effective_range, c * 0.08) # 如果振幅数据可用且振幅较小(<3%),进一步扩大区间确保有效性 if amplitude and amplitude > 0 and amplitude < 3: # 低波动股票用 振幅×3 作为最小范围 amp_based = c * amplitude / 100 * 3 effective_range = max(effective_range, amp_based) # 枢轴点 (Pivot Point) pp = (h + l + c) / 3 # 支撑位 s1 = 2 * pp - h # 弱支撑 s2 = pp - effective_range # 强支撑 # 压力位 r1 = 2 * pp - l # 弱压力 r2 = pp + effective_range # 强压力 # 参考昨收调整 if yc: if yc < s1: s1 = yc if yc > r1: r1 = yc # A股涨停/跌停价作为极端边界 limit_up = q.get("limit_up") limit_down = q.get("limit_down") market = q.get("market", "hk") if market != "hk" and limit_up and limit_down: # 注意:当现价逼近涨停/跌停时,limit不再是有效边界 # 用有效区间判断:如果自然计算的r2/s2在合理范围内不截断 natural_r2 = r2 natural_s2 = s2 # 涨停限制只对距离现价超过2%的强压位生效 if limit_up < r2 and (limit_up - c) / c < 0.02: # 涨停价离现价<2%,说明可能封板,不截断 pass # 使用自然计算的r2 elif limit_up < r2: r2 = limit_up if limit_down > s2 and (c - limit_down) / c < 0.02: pass # 接近跌停,不截断 elif limit_down > s2: s2 = limit_down return { "strong_support": round(s2, 2), "weak_support": round(s1, 2), "pivot": round(pp, 2), "weak_resist": round(r1, 2), "strong_resist": round(r2, 2), "today_high": h, "today_low": l, "multi_high": multi_high, "multi_low": multi_low, "effective_range": round(effective_range, 2), } def analyze_candlestick(q): """判断K线形态""" o = q.get("open") c = q.get("price") h = q.get("high") l = q.get("low") yc = q.get("close_yest") if not all([o, c, h, l]): return {"pattern": "unknown", "sentiment": "neutral"} if c >= o: body = c - o upper = h - c lower = o - l is_green = True else: body = o - c upper = h - o lower = c - l is_green = False total_range = h - l if total_range == 0: return {"pattern": "平盘", "sentiment": "neutral"} body_pct = body / total_range * 100 upper_pct = upper / total_range * 100 lower_pct = lower / total_range * 100 if body_pct < 5: if upper_pct > 60: pattern = "倒T线/射击之星" sentiment = "bearish" elif lower_pct > 60: pattern = "锤子线/T字线" sentiment = "bullish" else: pattern = "十字星" sentiment = "neutral" elif body_pct < 30: if upper_pct > 40 and lower_pct > 40: pattern = "长影星线" sentiment = "neutral" elif upper_pct > 40: pattern = "倒T线/射击之星" sentiment = "bearish" if is_green else "bearish" elif lower_pct > 40: pattern = "锤子线/T字线" sentiment = "bullish" if is_green else "bullish" else: pattern = "小阳线" if is_green else "小阴线" sentiment = "bullish" if is_green else "bearish" else: if upper_pct > 30: pattern = "带上影阳线" if is_green else "带上影阴线" sentiment = "neutral" if is_green else "bearish" elif lower_pct > 30: pattern = "带下影阳线" if is_green else "带下影阴线" sentiment = "bullish" if is_green else "neutral" else: pattern = "光头光脚阳线" if is_green else "光头光脚阴线" sentiment = "bullish" if is_green else "bearish" gap_up = "" gap_down = "" if yc: if o > yc * 1.01: gap_up = "跳空高开" if not is_green: sentiment = "neutral" elif o < yc * 0.99: gap_down = "跳空低开" if is_green: sentiment = "neutral" return { "pattern": pattern, "sentiment": sentiment, "body_pct": round(body_pct, 1), "upper_shadow_pct": round(upper_pct, 1), "lower_shadow_pct": round(lower_pct, 1), "is_green": is_green, "gap": gap_up or gap_down or "无跳空", } def analyze_volume(q): """量价分析""" outer = q.get("outer_vol") inner = q.get("inner_vol") turnover = q.get("turnover_rate") result = {} if outer and inner and (outer + inner) > 0: ratio = outer / (outer + inner) result["buy_sell_ratio"] = round(ratio, 2) if ratio > 0.55: result["volume_signal"] = "主动买盘占优" elif ratio < 0.45: result["volume_signal"] = "主动卖盘占优" else: result["volume_signal"] = "买卖均衡" else: result["volume_signal"] = "数据不足" if turnover: result["turnover_rate"] = turnover return result def full_analysis(code): """完整技术分析(带30秒缓存,避免分钟级波动)""" import time _cache = full_analysis.__dict__.get("_cache", {}) now = time.time() cached = _cache.get(code) if cached and (now - cached["ts"]) < 30: return cached["data"] q = get_quote(code) if not q or "error" in q: return q sr = calc_support_resistance(q) candle = analyze_candlestick(q) vol = analyze_volume(q) # 多周期+均线分析(整合 multi_timeframe) mtf = {} try: from multi_timeframe import full_multi_tf_analysis as _mtf mtf_raw = _mtf(code) if mtf_raw and 'daily' in mtf_raw: d = mtf_raw['daily'] mtf = { 'mas': d.get('mas', {}), 'multi_tf_sr': d.get('support_resistance', {}), 'trend': d.get('trend', {}), } # 周线弱压/弱撑作为中周期参考 if 'weekly' in mtf_raw: w = mtf_raw['weekly'] ws = w.get('support_resistance', {}) mtf['weekly_sr'] = { 'weak_resist': ws.get('weak_resist'), 'weak_support': ws.get('weak_support'), } except Exception: pass # non-critical, graceful degradation result = { "quote": { "name": q["name"], "price": q["price"], "change_pct": q["change_pct"], "open": q["open"], "high": q["high"], "low": q["low"], "close_yest": q["close_yest"], "volume": q["volume"], "amplitude": q["amplitude"], }, "support_resistance": sr, "candlestick": candle, "volume": vol, "multi_tf": mtf, "analyzed_at": datetime.now().strftime("%H:%M"), } # 写入缓存 _cache[code] = {"ts": now, "data": result} full_analysis.__dict__["_cache"] = _cache return result if __name__ == "__main__": import sys codes = sys.argv[1:] or ["603259", "002594", "00700"] for c in codes: r = full_analysis(c) print(json.dumps(r, ensure_ascii=False, indent=2)) print()