Files
MoFin/multi_timeframe.py
T
知微 44eef95718 fix: 指数K线数据获取
fetch_kline 修复两个bug:
1. _market_prefix 不认识 sh/sz/hk 开头指数代码
2. 指数code自带前缀,API key不要重复拼接
3. fqkline端点同样支持指数,不需要mkline
4. refresh_mtf_cache 加入6大指数自动缓存

上证指数(4163) MA5=4110 MA10=4053 MA20=4070 周线横盘 月线震荡上升
2026-06-22 23:53:31 +08:00

641 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""multi_timeframe.py — 多周期技术分析模块
从腾讯API获取日/周/月K线数据,计算:
- 多周期支撑压力位(日线/周线/月线)
- 移动均线(MA5/10/20/60
- 趋势方向判断(上升/下降/震荡)
- 综合策略调整建议
集成到 strategy_lifecycle.py 中使用。
"""
import json
import os
import urllib.request
import urllib.error
from datetime import datetime, date, timedelta
from typing import Optional
DATA_DIR = "/home/hmo/web-dashboard/data"
HISTORY_PATH = os.path.join(DATA_DIR, "price_history.json")
MTF_CACHE_PATH = os.path.join(DATA_DIR, "multi_tf_cache.json") # 多周期缓存独立存储
# 腾讯API K线端点
KLINE_URL = "http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={market}{code},{period},,,{count},qfq"
# 腾讯实时行情端点(用于市场前缀判断)
QUOTE_URL = "http://qt.gtimg.cn/q={market}{code}"
def _write_klines_to_db(code: str, daily: list, weekly: list, monthly: list, fundamentals: dict = None):
"""K线数据双写 SQLite(失败不影响缓存写入)"""
try:
from mofin_db import get_conn, init_all_tables, write_klines
conn = get_conn()
init_all_tables(conn)
# 从 stock_profiles.json 获取名称
name = code
try:
import json
profiles_path = os.path.join(DATA_DIR, "stock_profiles.json")
if os.path.exists(profiles_path):
with open(profiles_path, encoding="utf-8") as f:
profiles = json.load(f)
for p in profiles.get("profiles", []):
if p.get("code") == code:
name = p.get("name", code)
break
except Exception:
pass
write_klines(conn, code, name, daily, weekly, monthly, fundamentals)
conn.close()
except Exception:
pass # SQLite 写入失败不影响主流程
def _market_prefix(code: str) -> str:
"""根据代码确定市场前缀"""
raw = str(code).split("_")[0]
# 指数代码:sh/sz/hk开头
if raw.startswith("sh"):
return "sh"
if raw.startswith("sz"):
return "sz"
if raw.startswith("hk"):
return "hk"
if len(raw) == 5 and raw.isdigit():
return "hk"
if raw.startswith("6") or raw.startswith("5"):
return "sh"
return "sz"
def _user_agent() -> dict:
return {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
}
# 多周期缓存TTL(秒):日K线1小时,周/月K线1天
_KLINE_CACHE_TTL = {"day": 3600, "week": 86400, "month": 86400}
# 模块级缓存:避免每次fetch_kline都重新读/写大文件
_MTF_CACHE_DATA = None # {code: {daily:[], weekly:[], monthly:[], updated_at: float, fundamentals:{}}}
_MTF_CACHE_MTIME = 0 # 文件最后修改时间
def _load_mtf_cache():
"""加载多周期缓存(带模块级缓存,避免频繁读盘)"""
global _MTF_CACHE_DATA, _MTF_CACHE_MTIME
import time
try:
current_mtime = os.path.getmtime(MTF_CACHE_PATH)
if _MTF_CACHE_DATA is not None and current_mtime == _MTF_CACHE_MTIME:
return _MTF_CACHE_DATA
with open(MTF_CACHE_PATH) as f:
_MTF_CACHE_DATA = json.load(f)
_MTF_CACHE_MTIME = current_mtime
return _MTF_CACHE_DATA
except (FileNotFoundError, json.JSONDecodeError, OSError):
_MTF_CACHE_DATA = {}
_MTF_CACHE_MTIME = 0
return {}
def _save_mtf_cache():
"""将模块级缓存写回磁盘"""
global _MTF_CACHE_DATA, _MTF_CACHE_MTIME
if _MTF_CACHE_DATA is None:
return
try:
os.makedirs(os.path.dirname(MTF_CACHE_PATH), exist_ok=True)
with open(MTF_CACHE_PATH, "w") as f:
json.dump(_MTF_CACHE_DATA, f, ensure_ascii=False, indent=2)
import time
_MTF_CACHE_MTIME = os.path.getmtime(MTF_CACHE_PATH) if os.path.exists(MTF_CACHE_PATH) else time.time()
except Exception:
pass
def fetch_kline(code: str, period: str = "day", count: int = 120) -> list:
"""从腾讯API获取K线数据,优先使用本地缓存
Args:
code: 股票代码 (如 "300548")
period: "day" / "week" / "month"
count: 需要多少条
Returns:
list of dict: [{"date":str, "open":float, "close":float,
"high":float, "low":float, "volume":float}, ...]
"""
import time
now = time.time()
# 优先检查本地缓存(模块级,避免重复读盘)
# 注意:缓存中存储的key是'daily'/'weekly'/'monthly',参数period是'day'/'week'/'month'
_PERIOD_MAP = {"day": "daily", "week": "weekly", "month": "monthly"}
cache_data = _load_mtf_cache()
cached = cache_data.get(code, {})
cache_key = _PERIOD_MAP.get(period, period)
cached_klines = cached.get(cache_key, cached.get(period, []))
updated_at = cached.get("updated_at", 0)
if cached_klines and updated_at and (now - updated_at) < _KLINE_CACHE_TTL.get(period, 3600):
return cached_klines
market = _market_prefix(code)
is_index = any(code.startswith(p) for p in ["sh", "sz", "hk"])
# 指数代码已经自带前缀,API直接用code;普通股票需要加market前缀
api_code = code if is_index else f"{market}{code}"
url = f"http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={api_code},{period},,,{count},qfq"
try:
req = urllib.request.Request(url, headers=_user_agent())
with urllib.request.urlopen(req, timeout=10) as resp:
raw = json.loads(resp.read().decode("utf-8"))
except Exception as e:
return {"error": str(e), "code": code, "period": period}
if not isinstance(raw, dict):
return {"error": f"API returned {type(raw).__name__}", "raw": str(raw)[:200]}
api_data = raw.get("data", {})
if not isinstance(api_data, dict):
return {"error": f"data field is {type(api_data).__name__}", "raw": str(api_data)[:200]}
# 指数代码已经自带前缀(sh000001/sz399001),直接用
# 普通股票代码需要加market前缀(sh600036/sz300750
is_index = any(code.startswith(p) for p in ["sh", "sz", "hk"])
stock_key = code if is_index else f"{market}{code}"
stock_data = api_data.get(stock_key, {})
# 腾讯API的K线字段名: qfqday, qfqweek, qfqmonth
period_key = f"qfq{period}"
klines = stock_data.get(period_key, [])
if not klines:
# 尝试其他字段名
for k in stock_data:
if isinstance(stock_data[k], list) and len(stock_data[k]) > 0:
if isinstance(stock_data[k][0], list) and len(stock_data[k][0]) >= 6:
klines = stock_data[k]
break
result = []
for k in klines:
if len(k) >= 6:
try:
result.append({
"date": str(k[0]),
"open": float(k[1]),
"close": float(k[2]),
"high": float(k[3]),
"low": float(k[4]),
"volume": float(k[5]),
})
except (ValueError, IndexError):
continue
return result
def calc_moving_averages(klines: list, windows: list = [5, 10, 20, 60]) -> dict:
"""计算移动均线
Args:
klines: K线数据(按时间正序或倒序均可,自动处理)
windows: 均线周期列表
Returns:
dict: {ma5: float|None, ma10: float|None, ...}
"""
if not klines:
return {f"ma{w}": None for w in windows}
# 确保按时间正序(旧的在前)
closes = [k["close"] for k in klines]
# 检查是否倒序(最新的在前)
if len(closes) >= 2 and closes[0] > closes[-1]:
closes = list(reversed(closes))
result = {}
for w in windows:
if len(closes) >= w:
result[f"ma{w}"] = round(sum(closes[-w:]) / w, 2)
else:
result[f"ma{w}"] = None
return result
def calc_multi_tf_support_resistance(klines: list, lookback: int = 0) -> dict:
"""基于K线数据计算多周期支撑压力位
使用近期高点和低点作为关键位:
- 强阻力 = 近期最高(或倒数第二高)
- 弱阻力 = 近期中枢上沿
- 弱支撑 = 近期中枢下沿
- 强支撑 = 近期最低(或倒数第二低)
Args:
klines: K线数据
lookback: 取最近多少条(0=全部)
Returns:
dict: {strong_resist, weak_resist, weak_support, strong_support,
high_52w, low_52w, range_pct}
"""
if not klines or len(klines) < 3:
return {}
# 取最近N条(日线看近期,周线/月线看全部)
if lookback <= 0:
lookback = min(len(klines), 20) # 日线默认20天
n = min(len(klines), lookback)
recent = klines[-n:]
# 全量数据(用于52周高低)
all_highs = [k["high"] for k in klines]
all_lows = [k["low"] for k in klines]
highs = [k["high"] for k in recent]
lows = [k["low"] for k in recent]
max_h = max(highs)
min_l = min(lows)
mid = (max_h + min_l) / 2
# 找第二高和第二低作为更稳健的边界
sorted_h = sorted(set(highs), reverse=True)
sorted_l = sorted(set(lows))
strong_resist = sorted_h[0] if sorted_h else max_h
strong_support = sorted_l[0] if sorted_l else min_l
weak_resist = sorted_h[1] if len(sorted_h) > 1 else (max_h + mid) / 2
weak_support = sorted_l[1] if len(sorted_l) > 1 else (min_l + mid) / 2
# 最近20日的振幅比例(判断波动率)
if len(closes := [k["close"] for k in recent]) >= 2:
recent_range = (max_h - min_l) / min_l * 100 if min_l > 0 else 0
else:
recent_range = 0
return {
"strong_resist": round(strong_resist, 2),
"weak_resist": round(weak_resist, 2),
"weak_support": round(weak_support, 2),
"strong_support": round(strong_support, 2),
"high_52w": round(max(all_highs), 2),
"low_52w": round(min(all_lows), 2),
"range_pct": round(recent_range, 1),
}
def assess_trend(klines: list) -> dict:
"""判断趋势方向
Args:
klines: K线数据
Returns:
dict: {trend (up/down/sideways), strength (0~1),
description, ma_trend}
"""
if not klines or len(klines) < 10:
return {"trend": "unknown", "strength": 0, "description": "数据不足"}
closes = [k["close"] for k in klines]
# 确保正序
if len(closes) >= 2 and closes[0] > closes[-1]:
closes = list(reversed(closes))
n = len(closes)
ma20 = sum(closes[-20:]) / 20 if n >= 20 else sum(closes) / n
ma60 = sum(closes[-60:]) / 60 if n >= 60 else None
current = closes[-1]
# 均线多头/空头排列判断
ma5 = sum(closes[-5:]) / 5 if n >= 5 else None
ma10 = sum(closes[-10:]) / 10 if n >= 10 else None
# 趋势判断
up_count = sum(1 for i in range(1, len(closes)) if closes[i] > closes[i-1])
up_ratio = up_count / (len(closes) - 1)
# 价格相对均线位置
above_ma20 = current > ma20 if ma20 else True
if up_ratio > 0.6 and above_ma20:
if ma60 and current > ma60 * 1.2:
trend = "strong_up"
strength = min(1.0, up_ratio + 0.2)
desc = "强势上升"
else:
trend = "up"
strength = up_ratio
desc = "震荡上升"
elif up_ratio < 0.4 and not above_ma20:
if ma60 and current < ma60 * 0.8:
trend = "strong_down"
strength = min(1.0, (1 - up_ratio) + 0.2)
desc = "强势下跌"
else:
trend = "down"
strength = 1 - up_ratio
desc = "震荡下跌"
else:
trend = "sideways"
strength = 0.3
desc = "横盘震荡"
# 均线排列
ma_trend = "unknown"
if ma5 and ma10 and ma20:
if ma5 > ma10 > ma20:
ma_trend = "多头排列"
elif ma5 < ma10 < ma20:
ma_trend = "空头排列"
else:
ma_trend = "粘合/交叉"
return {
"trend": trend,
"strength": round(strength, 2),
"description": desc,
"ma_trend": ma_trend,
"ma5": round(ma5, 2) if ma5 else None,
"ma10": round(ma10, 2) if ma10 else None,
"ma20": round(ma20, 2),
"ma60": round(ma60, 2) if ma60 else None,
"current_above_ma20": current > ma20 if ma20 else None,
}
def full_multi_tf_analysis(code: str) -> dict:
"""完整多周期分析入口
同时获取日/周/月K线,计算:
- 各周期支撑压力位
- 均线系统
- 趋势方向
- 综合策略建议
Args:
code: 股票代码 (如 "300548")
Returns:
dict: 完整分析结果
"""
# 获取三个周期的数据
daily = fetch_kline(code, "day", 120)
weekly = fetch_kline(code, "week", 24)
monthly = fetch_kline(code, "month", 12)
# 如果API失败,检查是否有本地缓存
if isinstance(daily, dict) and "error" in daily:
daily = _load_local_history(code, "daily")
if isinstance(weekly, dict) and "error" in weekly:
weekly = _load_local_history(code, "weekly")
if isinstance(monthly, dict) and "error" in monthly:
monthly = _load_local_history(code, "monthly")
result = {
"code": code,
"analyzed_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
}
# 日线分析
if daily and not (isinstance(daily, dict) and "error" in daily):
result["daily"] = {
"count": len(daily),
"latest": daily[-1] if daily else None,
"support_resistance": calc_multi_tf_support_resistance(daily, lookback=20),
"mas": calc_moving_averages(daily, [5, 10, 20, 60]),
"trend": assess_trend(daily),
}
# 周线分析
if weekly and not (isinstance(weekly, dict) and "error" in weekly):
result["weekly"] = {
"count": len(weekly),
"latest": weekly[-1] if weekly else None,
"support_resistance": calc_multi_tf_support_resistance(weekly, lookback=12),
"mas": calc_moving_averages(weekly, [5, 10]),
"trend": assess_trend(weekly),
}
# 月线分析
if monthly and not (isinstance(monthly, dict) and "error" in monthly):
result["monthly"] = {
"count": len(monthly),
"latest": monthly[-1] if monthly else None,
"support_resistance": calc_multi_tf_support_resistance(monthly, lookback=6),
"mas": calc_moving_averages(monthly, [5]),
"trend": assess_trend(monthly),
}
# 综合策略建议
result["strategy_adjustment"] = _generate_strategy_adjustment(result)
# 写入本地缓存(供离线使用)
_save_local_history(code, daily, weekly, monthly)
return result
def flush_mtf_cache():
"""将模块级缓存显式刷回磁盘(供批量处理后调用)"""
_save_mtf_cache()
def _generate_strategy_adjustment(analysis: dict) -> dict:
"""基于多周期分析生成策略调整建议"""
adj = {
"stop_loss_reference": None,
"take_profit_reference": None,
"trend_alignment": "unknown",
"multi_tf_summary": {},
"cautions": [],
}
daily_trend = analysis.get("daily", {}).get("trend", {})
weekly_trend = analysis.get("weekly", {}).get("trend", {})
monthly_trend = analysis.get("monthly", {}).get("trend", {})
# 均线数据
daily_mas = analysis.get("daily", {}).get("mas", {})
daily_sr = analysis.get("daily", {}).get("support_resistance", {})
weekly_sr = analysis.get("weekly", {}).get("support_resistance", {})
monthly_sr = analysis.get("monthly", {}).get("support_resistance", {})
current = analysis.get("daily", {}).get("latest", {}).get("close", 0)
# 多周期趋势一致性
up_tfs, down_tfs = 0, 0
tf_details = []
for tf_name, tf_data in [("daily", daily_trend), ("weekly", weekly_trend),
("monthly", monthly_trend)]:
t = tf_data.get("trend", "unknown")
desc = tf_data.get("description", "")
ma_t = tf_data.get("ma_trend", "")
tf_details.append(f"{tf_name}:{desc}({ma_t})")
if "up" in t or "strong_up" in t:
up_tfs += 1
elif "down" in t or "strong_down" in t:
down_tfs += 1
adj["multi_tf_summary"] = {
"daily_trend": daily_trend.get("description", "未知"),
"weekly_trend": weekly_trend.get("description", "未知"),
"monthly_trend": monthly_trend.get("description", "未知"),
"daily_ma_trend": daily_trend.get("ma_trend", "未知"),
}
if up_tfs >= 2:
adj["trend_alignment"] = "多周期看多"
elif down_tfs >= 2:
adj["trend_alignment"] = "多周期看空"
elif up_tfs >= 1 and down_tfs >= 1:
adj["trend_alignment"] = "多周期分化"
else:
adj["trend_alignment"] = "震荡/无明显方向"
if not current:
return adj
# ===== 参考止损位(三级递进)=====
# 第一级:MA20(短线交易的生命线)
ma20 = daily_mas.get("ma20")
# 第二级:日线弱支撑(近20天次低点)
daily_ws = daily_sr.get("weak_support")
# 第三级:日线强支撑 / MA60
ma60 = daily_mas.get("ma60")
daily_ss = daily_sr.get("strong_support")
stop_candidates = []
if ma20:
stop_candidates.append(("MA20", ma20, abs(current - ma20) / current * 100))
if daily_ws:
stop_candidates.append(("日弱支撑", daily_ws, abs(current - daily_ws) / current * 100))
if ma60:
stop_candidates.append(("MA60", ma60, abs(current - ma60) / current * 100))
if daily_ss:
stop_candidates.append(("日强支撑", daily_ss, abs(current - daily_ss) / current * 100))
if stop_candidates:
# 选一个合理的止损参考:MA20优先(如果距现价不太近),否则选日弱支撑
best_stop = None
for name, level, dist in stop_candidates:
if level < current: # 止损必须在现价下方
if 2 <= dist <= 15: # 距现价2~15%之间比较合理
best_stop = {"source": name, "level": level,
"distance_pct": round(dist, 2)}
break
if not best_stop:
# 没有2~15%内的,选最近的一个
below = [(n, l, d) for n, l, d in stop_candidates if l < current]
if below:
nearest = min(below, key=lambda x: x[2])
best_stop = {"source": nearest[0], "level": nearest[1],
"distance_pct": round(nearest[2], 2)}
if best_stop:
adj["stop_loss_reference"] = best_stop
# ===== 参考止盈位 =====
take_candidates = []
# 日线阻力
for name, level in [("日弱阻", daily_sr.get("weak_resist")),
("日强阻", daily_sr.get("strong_resist")),
("周强阻", weekly_sr.get("strong_resist")),
("月强阻", monthly_sr.get("strong_resist"))]:
if level and level > current:
dist = (level - current) / current * 100
take_candidates.append((name, level, dist))
if take_candidates:
# 选距现价5~30%内的最高阻力位
best_take = None
for name, level, dist in sorted(take_candidates, key=lambda x: x[1], reverse=True):
if 3 <= dist <= 40:
best_take = {"source": name, "level": level,
"distance_pct": round(dist, 2)}
break
if not best_take:
farthest = max(take_candidates, key=lambda x: x[2])
best_take = {"source": farthest[0], "level": farthest[1],
"distance_pct": round(farthest[2], 2)}
if best_take:
adj["take_profit_reference"] = best_take
# ===== 风险提示 =====
if ma20 and current < ma20:
adj["cautions"].append(f"价格{current}<MA20{ma20},短线转弱")
if ma60 and current < ma60 * 1.05:
if current < ma60:
adj["cautions"].append(f"价格{current}<MA60{ma60},中期趋势转弱")
else:
adj["cautions"].append(f"价格接近MA60{ma60},中期支撑面临考验")
return adj
def _load_local_history(code: str, period: str) -> list:
"""从本地多周期缓存读取历史数据,不修改 price_history.json"""
try:
with open(MTF_CACHE_PATH) as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
stock = data.get(code, {})
return stock.get(period, [])
def _save_local_history(code: str, daily: list, weekly: list, monthly: list):
"""将多周期数据写入模块级缓存(含时间戳),不直接写磁盘"""
import time
global _MTF_CACHE_DATA
cache_data = _load_mtf_cache()
stock = cache_data.get(code, {})
if daily and not (isinstance(daily, dict) and "error" in daily):
stock["daily"] = daily
if weekly and not (isinstance(weekly, dict) and "error" in weekly):
stock["weekly"] = weekly
if monthly and not (isinstance(monthly, dict) and "error" in monthly):
stock["monthly"] = monthly
stock["updated_at"] = time.time() # 缓存时间戳
cache_data[code] = stock
_MTF_CACHE_DATA = cache_data # 更新模块级缓存
# ── SQLite 双写 ──
_write_klines_to_db(code, daily, weekly, monthly, stock.get("fundamentals"))
def batch_update_all(codes: list):
"""批量更新多只股票的多周期数据"""
results = {}
for code in codes:
try:
r = full_multi_tf_analysis(code)
results[code] = {
"status": "ok",
"periods": [k for k in ["daily", "weekly", "monthly"]
if k in r]
}
except Exception as e:
results[code] = {"status": "error", "error": str(e)}
return results
if __name__ == "__main__":
import sys
codes = sys.argv[1:] or ["300548", "600110"]
for code in codes:
r = full_multi_tf_analysis(code)
print(json.dumps(r, ensure_ascii=False, indent=2))
print("-" * 60)