#!/usr/bin/env python3 """multi_timeframe.py — 多周期技术分析模块 从腾讯API获取日/周/月K线数据,计算: - 多周期支撑压力位(日线/周线/月线) - 移动均线(MA5/10/20/60) - 趋势方向判断(上升/下降/震荡) - 综合策略调整建议 集成到 strategy_lifecycle.py 中使用。 """ import json import os import urllib.request import urllib.error from datetime import datetime, date, timedelta from typing import Optional DATA_DIR = "/home/hmo/web-dashboard/data" HISTORY_PATH = os.path.join(DATA_DIR, "price_history.json") MTF_CACHE_PATH = os.path.join(DATA_DIR, "multi_tf_cache.json") # 多周期缓存独立存储 # 腾讯API K线端点 KLINE_URL = "http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={market}{code},{period},,,{count},qfq" # 腾讯实时行情端点(用于市场前缀判断) QUOTE_URL = "http://qt.gtimg.cn/q={market}{code}" def _write_klines_to_db(code: str, daily: list, weekly: list, monthly: list, fundamentals: dict = None): """K线数据双写 SQLite(失败不影响缓存写入)""" try: from mofin_db import get_conn, init_all_tables, write_klines conn = get_conn() init_all_tables(conn) # 从 stock_profiles.json 获取名称 name = code try: import json profiles_path = os.path.join(DATA_DIR, "stock_profiles.json") if os.path.exists(profiles_path): with open(profiles_path, encoding="utf-8") as f: profiles = json.load(f) for p in profiles.get("profiles", []): if p.get("code") == code: name = p.get("name", code) break except Exception: pass write_klines(conn, code, name, daily, weekly, monthly, fundamentals) conn.close() except Exception: pass # SQLite 写入失败不影响主流程 def _market_prefix(code: str) -> str: """根据代码确定市场前缀""" raw = str(code).split("_")[0] # 指数代码:sh/sz/hk开头 if raw.startswith("sh"): return "sh" if raw.startswith("sz"): return "sz" if raw.startswith("hk"): return "hk" if len(raw) == 5 and raw.isdigit(): return "hk" if raw.startswith("6") or raw.startswith("5"): return "sh" return "sz" def _user_agent() -> dict: return { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" } # 多周期缓存TTL(秒):日K线1小时,周/月K线1天 _KLINE_CACHE_TTL = {"day": 3600, "week": 86400, "month": 86400} # 模块级缓存:避免每次fetch_kline都重新读/写大文件 _MTF_CACHE_DATA = None # {code: {daily:[], weekly:[], monthly:[], updated_at: float, fundamentals:{}}} _MTF_CACHE_MTIME = 0 # 文件最后修改时间 def _load_mtf_cache(): """加载多周期缓存(带模块级缓存,避免频繁读盘)""" global _MTF_CACHE_DATA, _MTF_CACHE_MTIME import time try: current_mtime = os.path.getmtime(MTF_CACHE_PATH) if _MTF_CACHE_DATA is not None and current_mtime == _MTF_CACHE_MTIME: return _MTF_CACHE_DATA with open(MTF_CACHE_PATH) as f: _MTF_CACHE_DATA = json.load(f) _MTF_CACHE_MTIME = current_mtime return _MTF_CACHE_DATA except (FileNotFoundError, json.JSONDecodeError, OSError): _MTF_CACHE_DATA = {} _MTF_CACHE_MTIME = 0 return {} def _save_mtf_cache(): """将模块级缓存写回磁盘""" global _MTF_CACHE_DATA, _MTF_CACHE_MTIME if _MTF_CACHE_DATA is None: return try: os.makedirs(os.path.dirname(MTF_CACHE_PATH), exist_ok=True) with open(MTF_CACHE_PATH, "w") as f: json.dump(_MTF_CACHE_DATA, f, ensure_ascii=False, indent=2) import time _MTF_CACHE_MTIME = os.path.getmtime(MTF_CACHE_PATH) if os.path.exists(MTF_CACHE_PATH) else time.time() except Exception: pass def fetch_kline(code: str, period: str = "day", count: int = 120) -> list: """从腾讯API获取K线数据,优先使用本地缓存 Args: code: 股票代码 (如 "300548") period: "day" / "week" / "month" count: 需要多少条 Returns: list of dict: [{"date":str, "open":float, "close":float, "high":float, "low":float, "volume":float}, ...] """ import time now = time.time() # 优先检查本地缓存(模块级,避免重复读盘) # 注意:缓存中存储的key是'daily'/'weekly'/'monthly',参数period是'day'/'week'/'month' _PERIOD_MAP = {"day": "daily", "week": "weekly", "month": "monthly"} cache_data = _load_mtf_cache() cached = cache_data.get(code, {}) cache_key = _PERIOD_MAP.get(period, period) cached_klines = cached.get(cache_key, cached.get(period, [])) updated_at = cached.get("updated_at", 0) if cached_klines and updated_at and (now - updated_at) < _KLINE_CACHE_TTL.get(period, 3600): return cached_klines market = _market_prefix(code) is_index = any(code.startswith(p) for p in ["sh", "sz", "hk"]) # 指数代码已经自带前缀,API直接用code;普通股票需要加market前缀 api_code = code if is_index else f"{market}{code}" url = f"http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={api_code},{period},,,{count},qfq" try: req = urllib.request.Request(url, headers=_user_agent()) with urllib.request.urlopen(req, timeout=10) as resp: raw = json.loads(resp.read().decode("utf-8")) except Exception as e: return {"error": str(e), "code": code, "period": period} if not isinstance(raw, dict): return {"error": f"API returned {type(raw).__name__}", "raw": str(raw)[:200]} api_data = raw.get("data", {}) if not isinstance(api_data, dict): return {"error": f"data field is {type(api_data).__name__}", "raw": str(api_data)[:200]} # 指数代码已经自带前缀(sh000001/sz399001),直接用 # 普通股票代码需要加market前缀(sh600036/sz300750) is_index = any(code.startswith(p) for p in ["sh", "sz", "hk"]) stock_key = code if is_index else f"{market}{code}" stock_data = api_data.get(stock_key, {}) # 腾讯API的K线字段名: qfqday, qfqweek, qfqmonth period_key = f"qfq{period}" klines = stock_data.get(period_key, []) if not klines: # 尝试其他字段名 for k in stock_data: if isinstance(stock_data[k], list) and len(stock_data[k]) > 0: if isinstance(stock_data[k][0], list) and len(stock_data[k][0]) >= 6: klines = stock_data[k] break result = [] for k in klines: if len(k) >= 6: try: result.append({ "date": str(k[0]), "open": float(k[1]), "close": float(k[2]), "high": float(k[3]), "low": float(k[4]), "volume": float(k[5]), }) except (ValueError, IndexError): continue return result def calc_moving_averages(klines: list, windows: list = [5, 10, 20, 60]) -> dict: """计算移动均线 Args: klines: K线数据(按时间正序或倒序均可,自动处理) windows: 均线周期列表 Returns: dict: {ma5: float|None, ma10: float|None, ...} """ if not klines: return {f"ma{w}": None for w in windows} # 确保按时间正序(旧的在前) closes = [k["close"] for k in klines] # 检查是否倒序(最新的在前) if len(closes) >= 2 and closes[0] > closes[-1]: closes = list(reversed(closes)) result = {} for w in windows: if len(closes) >= w: result[f"ma{w}"] = round(sum(closes[-w:]) / w, 2) else: result[f"ma{w}"] = None return result def calc_multi_tf_support_resistance(klines: list, lookback: int = 0) -> dict: """基于K线数据计算多周期支撑压力位 使用近期高点和低点作为关键位: - 强阻力 = 近期最高(或倒数第二高) - 弱阻力 = 近期中枢上沿 - 弱支撑 = 近期中枢下沿 - 强支撑 = 近期最低(或倒数第二低) Args: klines: K线数据 lookback: 取最近多少条(0=全部) Returns: dict: {strong_resist, weak_resist, weak_support, strong_support, high_52w, low_52w, range_pct} """ if not klines or len(klines) < 3: return {} # 取最近N条(日线看近期,周线/月线看全部) if lookback <= 0: lookback = min(len(klines), 20) # 日线默认20天 n = min(len(klines), lookback) recent = klines[-n:] # 全量数据(用于52周高低) all_highs = [k["high"] for k in klines] all_lows = [k["low"] for k in klines] highs = [k["high"] for k in recent] lows = [k["low"] for k in recent] max_h = max(highs) min_l = min(lows) mid = (max_h + min_l) / 2 # 找第二高和第二低作为更稳健的边界 sorted_h = sorted(set(highs), reverse=True) sorted_l = sorted(set(lows)) strong_resist = sorted_h[0] if sorted_h else max_h strong_support = sorted_l[0] if sorted_l else min_l weak_resist = sorted_h[1] if len(sorted_h) > 1 else (max_h + mid) / 2 weak_support = sorted_l[1] if len(sorted_l) > 1 else (min_l + mid) / 2 # 最近20日的振幅比例(判断波动率) if len(closes := [k["close"] for k in recent]) >= 2: recent_range = (max_h - min_l) / min_l * 100 if min_l > 0 else 0 else: recent_range = 0 return { "strong_resist": round(strong_resist, 2), "weak_resist": round(weak_resist, 2), "weak_support": round(weak_support, 2), "strong_support": round(strong_support, 2), "high_52w": round(max(all_highs), 2), "low_52w": round(min(all_lows), 2), "range_pct": round(recent_range, 1), } def assess_trend(klines: list) -> dict: """判断趋势方向 Args: klines: K线数据 Returns: dict: {trend (up/down/sideways), strength (0~1), description, ma_trend} """ if not klines or len(klines) < 10: return {"trend": "unknown", "strength": 0, "description": "数据不足"} closes = [k["close"] for k in klines] # 确保正序 if len(closes) >= 2 and closes[0] > closes[-1]: closes = list(reversed(closes)) n = len(closes) ma20 = sum(closes[-20:]) / 20 if n >= 20 else sum(closes) / n ma60 = sum(closes[-60:]) / 60 if n >= 60 else None current = closes[-1] # 均线多头/空头排列判断 ma5 = sum(closes[-5:]) / 5 if n >= 5 else None ma10 = sum(closes[-10:]) / 10 if n >= 10 else None # 趋势判断 up_count = sum(1 for i in range(1, len(closes)) if closes[i] > closes[i-1]) up_ratio = up_count / (len(closes) - 1) # 价格相对均线位置 above_ma20 = current > ma20 if ma20 else True if up_ratio > 0.6 and above_ma20: if ma60 and current > ma60 * 1.2: trend = "strong_up" strength = min(1.0, up_ratio + 0.2) desc = "强势上升" else: trend = "up" strength = up_ratio desc = "震荡上升" elif up_ratio < 0.4 and not above_ma20: if ma60 and current < ma60 * 0.8: trend = "strong_down" strength = min(1.0, (1 - up_ratio) + 0.2) desc = "强势下跌" else: trend = "down" strength = 1 - up_ratio desc = "震荡下跌" else: trend = "sideways" strength = 0.3 desc = "横盘震荡" # 均线排列 ma_trend = "unknown" if ma5 and ma10 and ma20: if ma5 > ma10 > ma20: ma_trend = "多头排列" elif ma5 < ma10 < ma20: ma_trend = "空头排列" else: ma_trend = "粘合/交叉" return { "trend": trend, "strength": round(strength, 2), "description": desc, "ma_trend": ma_trend, "ma5": round(ma5, 2) if ma5 else None, "ma10": round(ma10, 2) if ma10 else None, "ma20": round(ma20, 2), "ma60": round(ma60, 2) if ma60 else None, "current_above_ma20": current > ma20 if ma20 else None, } def full_multi_tf_analysis(code: str) -> dict: """完整多周期分析入口 同时获取日/周/月K线,计算: - 各周期支撑压力位 - 均线系统 - 趋势方向 - 综合策略建议 Args: code: 股票代码 (如 "300548") Returns: dict: 完整分析结果 """ # 获取三个周期的数据 daily = fetch_kline(code, "day", 120) weekly = fetch_kline(code, "week", 24) monthly = fetch_kline(code, "month", 12) # 如果API失败,检查是否有本地缓存 if isinstance(daily, dict) and "error" in daily: daily = _load_local_history(code, "daily") if isinstance(weekly, dict) and "error" in weekly: weekly = _load_local_history(code, "weekly") if isinstance(monthly, dict) and "error" in monthly: monthly = _load_local_history(code, "monthly") result = { "code": code, "analyzed_at": datetime.now().strftime("%Y-%m-%d %H:%M"), } # 日线分析 if daily and not (isinstance(daily, dict) and "error" in daily): result["daily"] = { "count": len(daily), "latest": daily[-1] if daily else None, "support_resistance": calc_multi_tf_support_resistance(daily, lookback=20), "mas": calc_moving_averages(daily, [5, 10, 20, 60]), "trend": assess_trend(daily), } # 周线分析 if weekly and not (isinstance(weekly, dict) and "error" in weekly): result["weekly"] = { "count": len(weekly), "latest": weekly[-1] if weekly else None, "support_resistance": calc_multi_tf_support_resistance(weekly, lookback=12), "mas": calc_moving_averages(weekly, [5, 10]), "trend": assess_trend(weekly), } # 月线分析 if monthly and not (isinstance(monthly, dict) and "error" in monthly): result["monthly"] = { "count": len(monthly), "latest": monthly[-1] if monthly else None, "support_resistance": calc_multi_tf_support_resistance(monthly, lookback=6), "mas": calc_moving_averages(monthly, [5]), "trend": assess_trend(monthly), } # 综合策略建议 result["strategy_adjustment"] = _generate_strategy_adjustment(result) # 写入本地缓存(供离线使用) _save_local_history(code, daily, weekly, monthly) return result def flush_mtf_cache(): """将模块级缓存显式刷回磁盘(供批量处理后调用)""" _save_mtf_cache() def _generate_strategy_adjustment(analysis: dict) -> dict: """基于多周期分析生成策略调整建议""" adj = { "stop_loss_reference": None, "take_profit_reference": None, "trend_alignment": "unknown", "multi_tf_summary": {}, "cautions": [], } daily_trend = analysis.get("daily", {}).get("trend", {}) weekly_trend = analysis.get("weekly", {}).get("trend", {}) monthly_trend = analysis.get("monthly", {}).get("trend", {}) # 均线数据 daily_mas = analysis.get("daily", {}).get("mas", {}) daily_sr = analysis.get("daily", {}).get("support_resistance", {}) weekly_sr = analysis.get("weekly", {}).get("support_resistance", {}) monthly_sr = analysis.get("monthly", {}).get("support_resistance", {}) current = analysis.get("daily", {}).get("latest", {}).get("close", 0) # 多周期趋势一致性 up_tfs, down_tfs = 0, 0 tf_details = [] for tf_name, tf_data in [("daily", daily_trend), ("weekly", weekly_trend), ("monthly", monthly_trend)]: t = tf_data.get("trend", "unknown") desc = tf_data.get("description", "") ma_t = tf_data.get("ma_trend", "") tf_details.append(f"{tf_name}:{desc}({ma_t})") if "up" in t or "strong_up" in t: up_tfs += 1 elif "down" in t or "strong_down" in t: down_tfs += 1 adj["multi_tf_summary"] = { "daily_trend": daily_trend.get("description", "未知"), "weekly_trend": weekly_trend.get("description", "未知"), "monthly_trend": monthly_trend.get("description", "未知"), "daily_ma_trend": daily_trend.get("ma_trend", "未知"), } if up_tfs >= 2: adj["trend_alignment"] = "多周期看多" elif down_tfs >= 2: adj["trend_alignment"] = "多周期看空" elif up_tfs >= 1 and down_tfs >= 1: adj["trend_alignment"] = "多周期分化" else: adj["trend_alignment"] = "震荡/无明显方向" if not current: return adj # ===== 参考止损位(三级递进)===== # 第一级:MA20(短线交易的生命线) ma20 = daily_mas.get("ma20") # 第二级:日线弱支撑(近20天次低点) daily_ws = daily_sr.get("weak_support") # 第三级:日线强支撑 / MA60 ma60 = daily_mas.get("ma60") daily_ss = daily_sr.get("strong_support") stop_candidates = [] if ma20: stop_candidates.append(("MA20", ma20, abs(current - ma20) / current * 100)) if daily_ws: stop_candidates.append(("日弱支撑", daily_ws, abs(current - daily_ws) / current * 100)) if ma60: stop_candidates.append(("MA60", ma60, abs(current - ma60) / current * 100)) if daily_ss: stop_candidates.append(("日强支撑", daily_ss, abs(current - daily_ss) / current * 100)) if stop_candidates: # 选一个合理的止损参考:MA20优先(如果距现价不太近),否则选日弱支撑 best_stop = None for name, level, dist in stop_candidates: if level < current: # 止损必须在现价下方 if 2 <= dist <= 15: # 距现价2~15%之间比较合理 best_stop = {"source": name, "level": level, "distance_pct": round(dist, 2)} break if not best_stop: # 没有2~15%内的,选最近的一个 below = [(n, l, d) for n, l, d in stop_candidates if l < current] if below: nearest = min(below, key=lambda x: x[2]) best_stop = {"source": nearest[0], "level": nearest[1], "distance_pct": round(nearest[2], 2)} if best_stop: adj["stop_loss_reference"] = best_stop # ===== 参考止盈位 ===== take_candidates = [] # 日线阻力 for name, level in [("日弱阻", daily_sr.get("weak_resist")), ("日强阻", daily_sr.get("strong_resist")), ("周强阻", weekly_sr.get("strong_resist")), ("月强阻", monthly_sr.get("strong_resist"))]: if level and level > current: dist = (level - current) / current * 100 take_candidates.append((name, level, dist)) if take_candidates: # 选距现价5~30%内的最高阻力位 best_take = None for name, level, dist in sorted(take_candidates, key=lambda x: x[1], reverse=True): if 3 <= dist <= 40: best_take = {"source": name, "level": level, "distance_pct": round(dist, 2)} break if not best_take: farthest = max(take_candidates, key=lambda x: x[2]) best_take = {"source": farthest[0], "level": farthest[1], "distance_pct": round(farthest[2], 2)} if best_take: adj["take_profit_reference"] = best_take # ===== 风险提示 ===== if ma20 and current < ma20: adj["cautions"].append(f"价格{current} list: """从本地多周期缓存读取历史数据,不修改 price_history.json""" try: with open(MTF_CACHE_PATH) as f: data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] stock = data.get(code, {}) return stock.get(period, []) def _save_local_history(code: str, daily: list, weekly: list, monthly: list): """将多周期数据写入模块级缓存(含时间戳),不直接写磁盘""" import time global _MTF_CACHE_DATA cache_data = _load_mtf_cache() stock = cache_data.get(code, {}) if daily and not (isinstance(daily, dict) and "error" in daily): stock["daily"] = daily if weekly and not (isinstance(weekly, dict) and "error" in weekly): stock["weekly"] = weekly if monthly and not (isinstance(monthly, dict) and "error" in monthly): stock["monthly"] = monthly stock["updated_at"] = time.time() # 缓存时间戳 cache_data[code] = stock _MTF_CACHE_DATA = cache_data # 更新模块级缓存 # ── SQLite 双写 ── _write_klines_to_db(code, daily, weekly, monthly, stock.get("fundamentals")) def batch_update_all(codes: list): """批量更新多只股票的多周期数据""" results = {} for code in codes: try: r = full_multi_tf_analysis(code) results[code] = { "status": "ok", "periods": [k for k in ["daily", "weekly", "monthly"] if k in r] } except Exception as e: results[code] = {"status": "error", "error": str(e)} return results if __name__ == "__main__": import sys codes = sys.argv[1:] or ["300548", "600110"] for code in codes: r = full_multi_tf_analysis(code) print(json.dumps(r, ensure_ascii=False, indent=2)) print("-" * 60)