diff --git a/scripts/branch_scanner.py b/scripts/branch_scanner.py index 247e50d..eef80ca 100644 --- a/scripts/branch_scanner.py +++ b/scripts/branch_scanner.py @@ -23,6 +23,10 @@ SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json" def get_price(code): + # DB 优先 + try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0 + except: pass + # Fallback: 腾讯 mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz" url = f"http://qt.gtimg.cn/q={mkt}{code}" req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) diff --git a/scripts/capital_flow_collector.py b/scripts/capital_flow_collector.py index 7c8295d..18efb91 100644 --- a/scripts/capital_flow_collector.py +++ b/scripts/capital_flow_collector.py @@ -6,14 +6,39 @@ API: push2his.eastmoney.com 个股资金流日线 """ -import json, os, sys, time +import json, os, sys, time, urllib.request from datetime import datetime -from urllib.request import urlopen +from urllib.request import urlopen, Request +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Semaphore DATA_DIR = "/home/hmo/web-dashboard/data" DECISIONS_PATH = f"{DATA_DIR}/decisions.json" CACHE_PATH = f"{DATA_DIR}/capital_flow_cache.json" +UA = "Mozilla/5.0" +# 限速器:最多5个并发,每请求后强制间隔0.3s +RATE_LIMIT = Semaphore(5) +MIN_INTERVAL = 0.3 +_last_req = 0 + +def _rate_limited_request(url): + """带速率限制的HTTP GET,用Semaphore控制并发数""" + global _last_req + with RATE_LIMIT: + elapsed = time.time() - _last_req + if elapsed < MIN_INTERVAL: + time.sleep(MIN_INTERVAL - elapsed) + proxy_handler = urllib.request.ProxyHandler({}) + opener = urllib.request.build_opener(proxy_handler) + req = Request(url, headers={"User-Agent": UA, "Referer": "https://data.eastmoney.com/"}) + try: + resp = opener.open(req, timeout=8) + _last_req = time.time() + return json.loads(resp.read().decode("utf-8")) + except Exception: + return None + # eastmoney secid: 1=上海 0=深圳 def secid(code): code = str(code).strip() @@ -22,30 +47,28 @@ def secid(code): return f"0.{code}" def fetch_flow(code, days=5): - """拉取个股近N日资金流""" + """拉取个股近N日资金流(带限速+代理绕过)""" sid = secid(code) url = f"http://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&lmt={days}" - try: - resp = urlopen(url, timeout=5) - data = json.loads(resp.read().decode("utf-8")) - klines = data.get("data", {}).get("klines", []) - if not klines: - return None - result = [] - for k in klines: - p = k.split(",") - if len(p) >= 7: - result.append({ - "date": p[0], - "main_net": float(p[1]), # 主力净流入(元) - "super_large": float(p[2]), # 超大单净流入(元) - "large": float(p[3]), # 大单净流入(元) - "medium": float(p[4]), # 中单净流入(元) - "small": float(p[5]), # 小单净流入(元) - }) - return result - except Exception as e: + data = _rate_limited_request(url) + if not data: return None + klines = data.get("data", {}).get("klines", []) + if not klines: + return None + result = [] + for k in klines: + p = k.split(",") + if len(p) >= 7: + result.append({ + "date": p[0], + "main_net": float(p[1]), # 主力净流入(元) + "super_large": float(p[2]), # 超大单净流入(元) + "large": float(p[3]), # 大单净流入(元) + "medium": float(p[4]), # 中单净流入(元) + "small": float(p[5]), # 小单净流入(元) + }) + return result def fetch_flow_intraday(code): """拉取当日分时资金流(用于盘中判断)""" @@ -119,27 +142,40 @@ def main(): codes.add(c) except: pass - + all_flows = {} - - for code in sorted(codes): + + # 并行抓取:ThreadPoolExecutor + 内置限速器(Semaphore 5 + 0.3s间隔) + code_list = sorted(codes) + if not code_list: + print("[capital_flow] 无代码需要采集") + return + + def fetch_one(code): flow = fetch_flow(code, days=5) if flow: analysis = analyze_flow(flow) - all_flows[code] = { + return (code, { "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), "flow": flow, "analysis": analysis, - } - time.sleep(0.3) # API限流 - + }) + return (code, None) + + with ThreadPoolExecutor(max_workers=5) as pool: + futures = {pool.submit(fetch_one, c): c for c in code_list} + for f in as_completed(futures): + code, result = f.result() + if result: + all_flows[code] = result + # 写缓存 cache = { "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), "stocks": all_flows, } json.dump(cache, open(CACHE_PATH, "w"), indent=2, ensure_ascii=False) - print(f"[capital_flow] {len(all_flows)}只更新完成") + print(f"[capital_flow] {len(all_flows)}/{len(code_list)}只更新完成") if __name__ == "__main__": main() diff --git a/scripts/divergence_detector.py b/scripts/divergence_detector.py index fef8d3c..66fd6c1 100644 --- a/scripts/divergence_detector.py +++ b/scripts/divergence_detector.py @@ -41,7 +41,7 @@ DIVERGENCE_MODERATE = 3.0 # >3% → moderate信号 STREAK_DAYS = 3 # 连涨/连跌3天 → 信号 def fetch_indices(): - """获取所有指数实时数据""" + """获取所有指数实时数据(指数无 DB 缓存,腾讯 API 是唯一源)""" symbols = list(INDEX_CODES.values()) url = f"http://qt.gtimg.cn/q={','.join(symbols)}" try: diff --git a/scripts/hardcode_scanner.py b/scripts/hardcode_scanner.py index 4170e05..65863d1 100644 --- a/scripts/hardcode_scanner.py +++ b/scripts/hardcode_scanner.py @@ -41,7 +41,7 @@ SUSPICIOUS_NUMBERS = [ (r'1手\s*[:=]\s*\d{3,}', '可能的每手股数硬编码'), (r'[><=]\s*0\.[0-9]+', '可能的百分比阈值硬编码'), (r'仓位\s*[:=]\s*\d{3,}', '可能的仓位金额硬编码'), - (r'['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]', '可能的文件路径硬编码(应使用环境变量或配置)'), + (r"['\"](?!http|~|\./|\.\./)/home/[^'\"]+['\"]", '可能的文件路径硬编码(应使用环境变量或配置)'), # 扩展点 — meta_growth 在此追加新规则 ] diff --git a/scripts/macro_context_collector.py b/scripts/macro_context_collector.py index f8b7882..ab84ae6 100644 --- a/scripts/macro_context_collector.py +++ b/scripts/macro_context_collector.py @@ -29,34 +29,35 @@ STATE_PATH = DATA_DIR / "macro_risk_state.json" # HIGH: 任何一条匹配 → 立即 HIGH 预警 HIGH_PATTERNS = [ # 全球巨头+核心产业 - r"苹果.*(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)", - r"openai.*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)", - r"英伟达|nvidia.*(?:跌|调查|制裁|推迟|禁令)", - r"台积电.*(?:跌|推迟|取消|地震|火灾|禁)", + r"苹果[^。]*(?:涨价|降价|推迟|取消|禁|制裁|调查|召回|大跌|暴跌)", + r"openai[^。]*(?:推迟|取消|风险|调查|起诉|倒闭|ipo)", + r"(?:英伟达|nvidia)[^。]*(?:跌|调查|制裁|推迟|禁令)", + r"台积电[^。]*(?:跌|推迟|取消|地震|火灾|禁)", r"特斯拉.*(?:暴跌|召回|调查|破产|禁)", # 美联储/央行意外 r"美联储.*(?:意外|紧急|缩表|风暴|警告|超预期|加息\s*50|降息\s*50|紧急\s*(?:会议|声明))", r"美联储.*(?:利率|决议).*(?:超预期|意外|紧急)", r"fed.*(?:emergency|unexpected|surprise|hike|cut)", - # 指数暴跌 - r"指数.*(?:跌幅|暴跌|熔断|闪崩|重挫)", + # 指数暴跌(需 ≥2% 跌幅或使用更强范围词) + r"指数[^。]*?(?:暴跌|熔断|闪崩|重挫)", + r"指数[^。]*?(?:跌幅|下跌)[^。]*?[2-9]%", r"(?:暴跌|重挫|熔断).*[5-9]%", r"熔断|闪崩", # 地缘+贸易 r"关税.*(?:升级|新|报复|制裁)", r"制裁.*(?:新|升级|全面)", - r"战争|开战|入侵|核|导弹.*发射", + r"战争|开战|入侵|核(?:威胁|武器|弹头|试验|攻击|冲突|导弹|战争|潜艇|问题|危机|设施)|导弹[^。]*(?:发射|袭击|攻击)", # 系统性能源 r"原油.*(?:跌破|暴跌|崩盘|断供)", r"石油.*(?:禁运|制裁|断供)", r"能源危机|粮食危机", # 系统金融 r"银行.*(?:倒闭|挤兑|破产|接管|危机)", - r"金融危机|债务危机|违约潮|系统性", + r"金融危机(?:风险|爆发|蔓延|冲击|预警|警示|逼近|担忧|席卷|升级|恐慌|进入|出现|形成|即将|来袭|警报|当前|新一轮|全面|全球性)|债务危机|违约潮|系统性(?:风险|危机)", # AI/科技板块重挫 r"半导体.*(?:暴跌|熔断|崩盘|跌幅)", r"科技股.*(?:暴跌|熔断|崩盘|重挫)", - r"费城半导体|sox.*(?:跌|崩)", + r"(?:费城半导体|sox)[^。]*?(?:跌|崩)", ] # MEDIUM: 累计匹配2条以上 → MEDIUM 预警 @@ -75,6 +76,39 @@ MEDIUM_PATTERNS = [ r"黑天鹅|灰犀牛", ] +# ── 模式完整性校验(防 .pyc 缓存/版本回退) ── +# 直接检查关键特征字符串是否存在于模式中 +# 原理: 旧版有 standalone |核| , 新版有 核(?:威胁 +# 旧版有 银行.*倒闭|挤兑|破产 , 新版有 银行.*(?:倒闭|挤兑|破产 +_PATTERN_CHECKS = { + 8: ["暴跌|熔断|闪崩|重挫"], # index pattern must use strong crash words, not "跌幅" + 9: ["[2-9]%"], # 指数+跌幅 requires ≥2% + 14: ["核(?:威胁", "核威胁|武器|弹头"], # must NOT have standalone 核 + 18: ["倒闭|挤兑|破产"], # bank pattern must have crisis keywords + 19: ["金融危机(?:风险", "危机|债务危机"], # must NOT have standalone 金融危机 +} +_KNOWN_BAD_SIGS = { + # Known stale .pyc signature fragments that indicate wrong version + "指数.*跌幅": "旧版用 .* 跨句匹配且无 ≥2% 阈值", + "|核|": "旧版有独立单字核", + "英伟达|nvidia.*跌": "旧版 alternation 分组错误", + "导弹.*发射": "旧版只匹配发射不匹配袭击", + "|金融危机|": "旧版 standalone 金融危机匹配历史参照", +} +for idx, required_list in _PATTERN_CHECKS.items(): + if not any(req in HIGH_PATTERNS[idx] for req in required_list): + print(f"[MACRO-安全] ⚠️ HIGH_PATTERNS[{idx}] 签名不匹配!") + print(f"[MACRO-安全] 当前: {HIGH_PATTERNS[idx][:100]}") + print(f"[MACRO-安全] 预期应包含: {required_list[0]}") + print(f"[MACRO-安全] 可能原因: .pyc 缓存过期 / 回退到旧版本") + +# 额外扫描:检查是否有已知的旧版签名残留 +_all_patterns_text = "\n".join(HIGH_PATTERNS) +for bad_sig, reason in _KNOWN_BAD_SIGS.items(): + if bad_sig in _all_patterns_text: + print(f"[MACRO-安全] ⚠️ 检测到旧版模式签名 '{bad_sig}' ({reason})") + print(f"[MACRO-安全] .pyc 缓存可能未刷新,当前 HIGH_PATTERNS 可能仍为旧版本") + def ensure_tables(conn): conn.execute(""" CREATE TABLE IF NOT EXISTS macro_raw_news ( diff --git a/scripts/macro_signal_consumer.py b/scripts/macro_signal_consumer.py index 719e6ea..f13a585 100644 --- a/scripts/macro_signal_consumer.py +++ b/scripts/macro_signal_consumer.py @@ -11,7 +11,7 @@ no_agent 模式:有HIGH风险→输出风险摘要 | 无→静默 ↓ 如果 HIGH → 推送到 Dad """ -import sqlite3, json, os +import sqlite3, json, os, sys, time from pathlib import Path from datetime import datetime @@ -20,8 +20,25 @@ DATA = BASE / "data" DB_PATH = DATA / "mofin.db" STATE_PATH = DATA / "macro_risk_state.json" +def db_update(conn, sql, params, max_retries=3): + """幂等DB更新,遇到锁自动重试""" + for attempt in range(max_retries): + try: + conn.execute(sql, params) + conn.commit() + return True + except sqlite3.OperationalError as e: + if "locked" in str(e).lower(): + if attempt < max_retries - 1: + time.sleep(1) + continue + raise + print(f"[MACRO-CONSUMER] DB更新失败(持续锁): {sql}", file=sys.stderr) + return False + def main(): - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=10) + conn.execute("PRAGMA busy_timeout=5000") conn.row_factory = sqlite3.Row # 读取未处理的 macro_watch 信号 @@ -43,14 +60,28 @@ def main(): conn.close() return # SILENT - # 聚合风险等级 + # 聚合风险等级(考虑修正覆盖信号) levels = {"宏观-WATCH_HIGH": "high", "宏观-WATCH_MEDIUM": "medium", "宏观-WATCH_INFO": "info"} highest = "info" all_summaries = [] + def _effective_level(sentiment, summary): + """修正覆盖信号取修正后的级别,不按原始sentiment算""" + if "修正覆盖" in (summary or ""): + s = (summary or "").lower() + # 明确说零风险/正面/利好 → info + if any(kw in s for kw in ["零风险", "正面利好", "正面进展", "非风险", "利好"]): + return "info" + # 明确说MEDIUM → medium + if "medium" in s or "中风险" in s: + return "medium" + # 修正覆盖HIGH → 默认降为medium(不保留原始HIGH) + return "medium" + return levels.get(sentiment, "info") + for r in rows: sentiment = r["overall_sentiment"] - lv = levels.get(sentiment, "info") + lv = _effective_level(sentiment, r["summary"]) if lv == "high": highest = "high" elif lv == "medium" and highest != "high": @@ -72,10 +103,9 @@ def main(): } STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2)) - # 标记为已处理 + # 标记为已处理(含重试) for r in rows: - conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (r["id"],)) - conn.commit() + db_update(conn, "UPDATE signal_news SET processed=1 WHERE id=?", (r["id"],)) conn.close() # no_agent 输出(有 HIGH 才主动出声) diff --git a/scripts/mofin_collect.py b/scripts/mofin_collect.py new file mode 100644 index 0000000..a8d1aff --- /dev/null +++ b/scripts/mofin_collect.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +"""mofin_collect.py — MoFin 数据采集链 + +每轮盯盘 cron 前运行,顺序执行: + 1. market_watch — 拉90个行业板块数据(9:30前跳过,市场未开) + 2. trend_detector — 检测17种信号(依赖板块数据,同跳) + 3. mofin_news — 搜新闻+小果分析 + 4. stock_quote — 所有持仓最新行情(CRITICAL: LLM唯一价格源) +""" + +import subprocess, sys, time +from pathlib import Path +from datetime import datetime + +BASE = Path(__file__).parent.parent if "hermes" in str(Path(__file__).resolve()) else Path(__file__).parent + +now = datetime.now() +market_open = (now.hour >= 9 and now.minute >= 30) or now.hour >= 10 + +# 步骤1-3: 行业/新闻数据 +SCRIPTS = [] +if market_open: + SCRIPTS.append(("market_watch.py", 60)) + SCRIPTS.append(("trend_detector.py", 60)) +else: + print(f"[{now.strftime('%H:%M')}] 市场未开盘(9:30),跳过板块采集", flush=True) + +SCRIPTS.append(("mofin_news.py", 50)) + +for script, timeout in SCRIPTS: + path = BASE / script + if not path.exists(): + path = Path("/home/hmo/MoFin") / script + print(f"--- {script} ---", flush=True) + start = time.time() + try: + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, text=True, timeout=timeout + ) + elapsed = time.time() - start + if result.returncode == 0: + print(f"OK ({elapsed:.0f}s)", flush=True) + if result.stdout.strip(): + for line in result.stdout.strip().split("\n")[-3:]: + print(f" {line}", flush=True) + else: + print(f"FAIL ({elapsed:.0f}s): {result.stderr[:200]}", flush=True) + except subprocess.TimeoutExpired: + print(f"TIMEOUT ({timeout}s)", flush=True) + except Exception as e: + print(f"ERROR: {e}", flush=True) + +# ── 步骤4: 个股行情注入(唯一权威价格源)── +# 所有持仓最新行情,注入到 LLM context +# LLM 禁止自行调用原始API解析价格 +PRICE_SCRIPT = BASE / "stock_quote.py" +if not PRICE_SCRIPT.exists(): + PRICE_SCRIPT = Path("/home/hmo/MoFin/scripts/stock_quote.py") +if PRICE_SCRIPT.exists(): + print("--- stock_quote.py ---", flush=True) + try: + result = subprocess.run( + [sys.executable, str(PRICE_SCRIPT), "--all-holdings"], + capture_output=True, text=True, timeout=30 + ) + if result.returncode == 0 and result.stdout.strip(): + lines = [l for l in result.stdout.strip().split("\n") if l.strip()] + print(f"OK ({len(lines)}只持仓)", flush=True) + for line in lines[:50]: + print(f" {line}", flush=True) + else: + print(f"WARN: stock_quote stderr={result.stderr[:100]}", flush=True) + except Exception as e: + print(f"WARN: stock_quote skipped ({e})", flush=True) +else: + print("WARN: stock_quote.py not found", flush=True) + +print("采集链完成", flush=True) diff --git a/scripts/price_monitor.py b/scripts/price_monitor.py new file mode 100644 index 0000000..4a81ffe --- /dev/null +++ b/scripts/price_monitor.py @@ -0,0 +1,788 @@ +#!/usr/bin/env python3 +"""price_monitor.py — 高频价格监控脚本(批量版) +规则:进入区间报一次,离开区间报一次,中间不重复。 +每次运行时一次性刷新所有持仓+自选股的实时价。 +""" +import json +import urllib.request +import os +import sys +import time +from datetime import datetime + +# ── MoFin unified model ────────────────────────────────────────────── +sys.path.insert(0, "/home/hmo/MoFin") +from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct +from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_price_event, write_watchlist_stock + +DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" +PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" +WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" +BREACH_PATH = "/home/hmo/.hermes/zone_breach.json" +STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json") +EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json" + +# 策略重评依赖(技术面驱动,非机械百分比) +sys.path.insert(0, "/home/hmo/web-dashboard") +try: + from strategy_lifecycle import reassess_strategy + HAS_REASSESS = True +except ImportError: + HAS_REASSESS = False + +try: + HK_RATE = get_hk_rate() +except Exception: + HK_RATE = 0.87 # ultimate fallback + +# 分支系统与情景检测 +try: + sys.path.insert(0, '/home/hmo/MoFin') + from strategy_tree import detect_scenario, evaluate_branches + HAS_TREE = True +except Exception: + HAS_TREE = False + def detect_scenario(): return {} + def evaluate_branches(*a, **kw): return [] + +# 情景缓存(每次run_once刷新) +_SCENARIO_CACHE = {} +_BRANCH_CACHE = {} # code -> branches list + +UA = "Mozilla/5.0" + +# ── 批量拉取价格 ────────────────────────────────────────────────────────── + +def fetch_all_prices(codes): + """腾讯批量行情API:仅用于A股(沪市/深市) + A股:sh600110 / sz000001 + 港股已迁移至 fetch_hk_eastmoney()(东方财富实时行情) + 返回 {code: (price, change, change_pct)} + """ + if not codes: + return {} + + # 只处理A股(6位代码),港股走东方财富 + a_codes = [c for c in codes if len(str(c).strip()) == 6] + if not a_codes: + return {} + + symbols = [] + code_map = {} + for code in a_codes: + code_s = str(code).strip() + if code_s.startswith(('5', '6', '9')): + sym = f"sh{code_s}" + else: + sym = f"sz{code_s}" + symbols.append(sym) + code_map[sym] = code_s + + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + try: + req = urllib.request.Request(url, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=10) as r: + text = r.read().decode("gbk") + except Exception as e: + print(f"⚠️ 腾讯A股拉取失败: {e}", file=sys.stderr) + return {} + + results = {} + for line in text.strip().split("\n"): + line = line.strip() + if not line or "=" not in line: + continue + try: + raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw_value.split("~") + if len(fields) < 6: + continue + sym = line.split("=", 1)[0].strip().lstrip("v_") + orig_code = code_map.get(sym) + if not orig_code: + continue + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + change = price - prev_close if prev_close > 0 else 0 + change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" + results[orig_code] = (price, change, change_pct) + except (ValueError, IndexError): + continue + + return results + + +# ── 港股实时行情(新浪财经批量版,实时,无延迟)───────────────────────────── + +def fetch_hk_sina_batch(codes): + """新浪财经港股批量实时行情 — 一次HTTP请求获取全部港股。 + + 新浪港股API(hq.sinajs.cn)支持批量查询,返回实时数据。 + 对比东财逐股查询(0.2s间隔×17只=3.4s),新浪1次请求搞定。 + + API: https://hq.sinajs.cn/list=hk00700,hk09988 + 格式: hq_str_hk00700="TENCENT,腾讯控股,当前价,昨收,开盘,最高,最低,涨跌额,涨跌幅,..." + + 返回 {code: (price, change, change_pct)} + """ + if not codes: + return {} + + hk_codes = [str(c).strip() for c in codes if len(str(c).strip()) <= 5] + if not hk_codes: + return {} + + symbols = [f"hk{c}" for c in hk_codes] + url = f"https://hq.sinajs.cn/list={','.join(symbols)}" + + try: + # 新浪要求有 Referer,且需绕过系统代理(某些环境下东财/新浪走代理会断连) + proxy_handler = urllib.request.ProxyHandler({}) + opener = urllib.request.build_opener(proxy_handler) + req = urllib.request.Request(url, headers={ + "User-Agent": "Mozilla/5.0", + "Referer": "https://finance.sina.com.cn", + }) + with opener.open(req, timeout=10) as r: + text = r.read().decode("gbk") + except Exception as e: + print(f"⚠️ 新浪港股批量拉取失败: {e}", file=sys.stderr) + return {} + + results = {} + for line in text.strip().split("\n"): + line = line.strip() + if "=" not in line: + continue + try: + code = line.split("=", 1)[0].replace("hq_str_hk", "").replace("var ", "").strip() + raw = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw.split(",") + if len(fields) < 9: + continue + price = float(fields[2]) if fields[2] else 0 + prev_close = float(fields[3]) if fields[3] else 0 + change_amt = float(fields[7]) if fields[7] else 0 + change_pct = fields[8] if fields[8] else "0" + # 新浪 field[2] 可能非实时最新价,用 prev_close + change 计算更准确 + if prev_close > 0 and abs(change_amt) > 0: + price = round(prev_close + change_amt, 2) + change = round(change_amt, 2) + if price > 0: + results[code] = (price, change, change_pct) + except (ValueError, IndexError): + continue + + return results + + +# ── 港股备用通道(东方财富逐股 + 腾讯15min延迟)─────────────────────────── + +def fetch_hk_eastmoney_fallback(codes): + """东方财富港股实时行情(备用通道),逐股查询、间隔1秒避免限流。 + + FTP 说明:港股限流严重,不适合主通道,降级为备用。 + 建议用上面的 fetch_hk_sina_batch() 做主通道。 + + 返回 {code: (price, change, change_pct)} + Fallback: 仍失败时回退到腾讯 qt.gtimg.cn(15分钟延迟) + """ + if not codes: + return {} + + hk_codes = [str(c).strip() for c in codes if len(str(c).strip()) <= 5] + if not hk_codes: + return {} + + results = {} + + # 东方财富逐股查询,1秒间隔避免限流 + for code in hk_codes: + try: + url = (f"https://push2.eastmoney.com/api/qt/stock/get" + f"?secid=116.{code}" + f"&fields=f43,f170,f60,f57,f58" + f"&fltt=2") + proxy_handler = urllib.request.ProxyHandler({}) + opener = urllib.request.build_opener(proxy_handler) + req = urllib.request.Request(url, headers={ + "User-Agent": UA, + "Referer": "https://quote.eastmoney.com/", + }) + with opener.open(req, timeout=5) as r: + resp = json.loads(r.read().decode("utf-8")) + + if resp.get("rc") != 0: + continue + item = resp.get("data", {}) + if not item: + continue + price = float(item.get("f43", 0)) if item.get("f43") else 0 + prev_close = float(item.get("f60", 0)) if item.get("f60") else 0 + change = round(price - prev_close, 2) if prev_close > 0 else 0 + change_pct = str(item.get("f170", "0")) + if price > 0: + results[code] = (price, change, change_pct) + time.sleep(1.0) # 1秒间隔,大幅降低限流概率 + except Exception as e: + print(f" [东财备用 {code}] {e}", file=sys.stderr) + continue + + # Fallback: 腾讯 qt.gtimg.cn(15分钟延迟) + missing = [c for c in hk_codes if c not in results] + if missing: + try: + fallback = _fetch_hk_tencent_fallback(missing) + results.update(fallback) + except Exception: + pass + + return results + + +def _fetch_hk_tencent_fallback(codes): + """腾讯港股行情(15分钟延迟,仅作 fallback)""" + symbols = [f"hk{c}" for c in codes] + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + req = urllib.request.Request(url, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=10) as r: + text = r.read().decode("gbk") + + code_map = {f"hk{c}": c for c in codes} + results = {} + for line in text.strip().split("\n"): + if "=" not in line: + continue + try: + raw = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw.split("~") + if len(fields) < 6: + continue + sym = line.split("=", 1)[0].strip().lstrip("v_") + orig = code_map.get(sym) + if not orig: + continue + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + change = price - prev_close if prev_close > 0 else 0 + change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" + results[orig] = (price, change, change_pct) + except (ValueError, IndexError): + continue + return results + + +def refresh_data_prices(): + """一次性刷新portfolio.json和watchlist.json的所有实时价""" + all_codes = set() + + # 收集所有需要拉取的代码 + try: + pf = json.load(open(PORTFOLIO_PATH)) + for s in pf.get('holdings', []): + all_codes.add(s['code']) + except: + pf = {"holdings": []} + + try: + wl = json.load(open(WATCHLIST_PATH)) + for s in wl.get('stocks', []): + all_codes.add(s['code']) + except: + wl = {"stocks": []} + + if not all_codes: + return 0 + + # 分批拉取:A股走腾讯(实时) + 港股走新浪批量(实时,无限流) + all_list = list(all_codes) + prices = fetch_all_prices(all_list) # A股(腾讯,实时) + hk_prices = fetch_hk_sina_batch(all_list) # 港股(新浪批量,实时) + # 新浪未覆盖的走备用通道(东财逐股→腾讯15min延迟) + hk_codes_missing = [c for c in all_list if len(str(c).strip()) <= 5 and c not in hk_prices] + if hk_codes_missing: + fallback = fetch_hk_eastmoney_fallback(hk_codes_missing) + hk_prices.update(fallback) + prices.update(hk_prices) + updated = 0 + + # 保存全量实时价快照(供报告管道消费,确保分析用最新数据) + try: + live = {"updated_at": datetime.now().isoformat(), "prices": {}} + for code in all_codes: + if code in prices: + p, c, chg = prices[code] + live["prices"][code] = {"price": p, "change_pct": chg} + json.dump(live, open("/home/hmo/web-dashboard/data/live_prices.json", "w"), indent=2) + except Exception: + pass + + # 更新portfolio(只在价格变化时写入,避免触发文件变更通知) + changed = False + for s in pf.get('holdings', []): + if s['code'] in prices: + price, _, change_pct = prices[s['code']] + if price > 0: + # 港股:API返回HKD,需转RMB + if is_hk_stock(s['code']): + price = round(price * HK_RATE, 2) + old = s.get('price', 0) + if abs(old - price) > 0.001: + s['price'] = round(price, 2) + s['change_pct'] = float(change_pct) if change_pct else 0 + updated += 1 + changed = True + if changed: + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + pf['total_mv'] = calc_total_mv(pf.get('holdings', [])) + pf['total_assets'] = calc_total_assets(pf) + pf['position_pct'] = calc_position_pct(pf) + # DB 写入(替代 json.dump,强制币种约束) + try: + conn = get_conn() + write_holdings_batch(conn, pf['holdings']) + write_portfolio_summary(conn, pf) + conn.close() + except Exception as e: + print(f" [DB写入失败] {e}", flush=True) + # 保留 JSON 副本作为冷备 + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + elif pf.get('updated_at'): + try: + last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M') + if (datetime.now() - last_ts).total_seconds() > 600: + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + except: + pass + + # 更新watchlist(只在价格变化时写入) + changed = False + for s in wl.get('stocks', []): + if s['code'] in prices: + price, _, change_pct = prices[s['code']] + if price > 0: + # 港股:API返回HKD,需转RMB + if is_hk_stock(s['code']): + price = round(price * HK_RATE, 2) + old = s.get('price', 0) + if abs(old - price) > 0.001: + s['price'] = round(price, 2) + s['change_pct'] = float(change_pct) if change_pct else 0 + updated += 1 + changed = True + if changed: + wl['updated_at'] = datetime.now().isoformat() + # DB 写入(替代 json.dump) + try: + conn = get_conn() + for s in wl.get('stocks', []): + s['currency'] = 'CNY' # 自选股价格统一CNY + write_watchlist_stock(conn, s) + conn.close() + except Exception as e: + print(f" [DB watchlist写入失败] {e}", flush=True) + # 保留 JSON 冷备 + json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2) + + # --- 汇总值重算(使用 mo_models 唯一公式)--- + try: + live_market_value = calc_total_mv(pf.get('holdings', [])) + old_mv = pf.get('total_mv', 0) + + if abs(old_mv - live_market_value) > 0.01: + pf['total_mv'] = round(live_market_value, 2) + + pf['total_assets'] = calc_total_assets(pf) + if pf['total_assets'] > 0: + pf['position_pct'] = calc_position_pct(pf) + pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M') + # DB 写入 + try: + conn = get_conn() + write_portfolio_summary(conn, pf) + conn.close() + except Exception as e: + print(f" [DB汇总写入失败] {e}", flush=True) + # JSON 冷备 + json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) + except Exception as e: + print(f" [汇总重算失败] {e}", flush=True) + # --- 结束汇总重算 --- + + return updated + + +# ── 分支系统辅助函数 ────────────────────────────────────────────────────── + +def _branch_alert_suffix(code, price, shares=0, cost=0): + """返回分支信息后缀:「 | 情景→动作」""" + if not HAS_TREE or not _SCENARIO_CACHE.get('id'): + return "" + try: + sc_id = _SCENARIO_CACHE['id'] + results = evaluate_branches(code, sc_id, price, shares, cost) + for r in results: + if r.get('applicable'): + _record_branch_trigger(code, r.get('branch_id',''), price) + branch_action = r.get('action_type', r.get('action', 'hold')) + return f" | {sc_id}→{branch_action}" + except Exception: + pass + return "" + + +def _record_branch_trigger(code, branch_id, price): + """记录分支触发事件(自成长:trigger_count+1)""" + try: + raw = json.load(open(DECISIONS_PATH)) + for d in raw.get('decisions', []): + if d.get('code') == code and d.get('strategy_tree',{}).get('branches'): + for b in d['strategy_tree']['branches']: + if b['id'] == branch_id: + b.setdefault('trigger_count', 0) + b['trigger_count'] += 1 + b['last_trigger_price'] = round(price, 2) + b['last_triggered'] = datetime.now().isoformat() + break + json.dump(raw, open(DECISIONS_PATH, 'w'), ensure_ascii=False, indent=2) + except Exception: + pass + + +# ── 区间偏离检测 ────────────────────────────────────────────────────────── + +def load_state(): + try: + with open(STATE_PATH) as f: + return json.load(f) + except: + return {} + +def save_state(state): + os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True) + with open(STATE_PATH, 'w') as f: + json.dump(state, f, ensure_ascii=False, indent=2) + +def load_breaches(): + try: + with open(BREACH_PATH) as f: + return json.load(f) + except: + return {} + +def save_breaches(data): + os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True) + with open(BREACH_PATH, 'w') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def load_events(): + try: + with open(EVENTS_PATH) as f: + return json.load(f) + except: + return {"events": []} + + +def save_events(events): + os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True) + with open(EVENTS_PATH, 'w') as f: + json.dump(events, f, ensure_ascii=False, indent=2) + + +def record_event(code, name, event_type, price, trigger_value, event_label=""): + """记录一次价格触发事件到 price_events.json + SQLite""" + events = load_events() + now = datetime.now().isoformat() + events["events"].append({ + "code": code, + "name": name, + "event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone + "price": round(price, 2), + "trigger_value": trigger_value, + "event_label": event_label, + "timestamp": now, + "date": datetime.now().strftime("%Y-%m-%d"), + }) + # 保留最近10000条 + events["events"] = events["events"][-10000:] + save_events(events) + + # ── SQLite 双写 ── + try: + from mofin_db import get_conn, init_all_tables, write_price_event + conn = get_conn() + init_all_tables(conn) + write_price_event(conn, code, name, event_type, price, trigger_value, event_label) + conn.close() + except Exception: + pass # SQLite 写入失败不影响主流程 + + +def get_trigger_zones(d): + """返回该decision所有可监控的区间列表,从顶层字段读取""" + zones = [] + is_holding = d.get('shares', 0) > 0 + # 买入区间(自选和持仓都监控) + el = d.get("entry_low", 0) + eh = d.get("entry_high", 0) + if el and eh and float(el) > 0 and float(eh) > 0: + try: + zones.append(("entry_zone", "买入区间", float(el), float(eh))) + except: + pass + # 止损+止盈(只有持仓才监控,自选无意义) + if is_holding: + sl = d.get("stop_loss", 0) + if sl and float(sl) > 0: + try: + zones.append(("stop_loss", "止损", 0, float(sl))) + except: + pass + tp = d.get("take_profit", 0) + if tp and float(tp) > 0: + try: + zones.append(("take_profit_zone", "止盈区间", 0, float(tp))) + except: + pass + return zones + + +def run_once(round_label=""): + """执行一轮完整的监控流程""" + global _SCENARIO_CACHE, _BRANCH_CACHE + label = f" [{round_label}]" if round_label else "" + start = time.time() + + # 刷新情景与分支缓存(每轮更新) + _SCENARIO_CACHE = detect_scenario() if HAS_TREE else {} + _BRANCH_CACHE = {} + try: + raw = json.load(open(DECISIONS_PATH)) + for d in raw.get('decisions', []): + tree = d.get('strategy_tree', {}) + if tree and tree.get('branches'): + _BRANCH_CACHE[d['code']] = tree['branches'] + except Exception: + pass + + # === 第一步:一次性刷新所有价格 === + refreshed = refresh_data_prices() + + # === 第二步:检查触发条件 === + try: + with open(DECISIONS_PATH) as f: + dec = json.load(f) + except: + print(f"❌{label} 无法读取decisions.json", file=sys.stderr) + return + + active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")] + state = load_state() + outputs = [] + state_updated = False + + # 收集所有需要检查的代码 + check_codes = set() + for d in active: + if get_trigger_zones(d): + check_codes.add(d["code"]) + + # 批量拉取这些股票的价格 + prices = fetch_all_prices(list(check_codes)) + + for d in active: + code = d["code"] + + zones = get_trigger_zones(d) + if not zones: + continue + + price_info = prices.get(code) + if not price_info: + continue + price, _, _ = price_info + if price == 0: + continue + + name = d.get("name", code) + if code not in state: + state[code] = {} + + for key, label, lo, hi in zones: + in_zone = lo <= price <= hi + prev_in_zone = state[code].get(key, None) + + if in_zone and prev_in_zone != True: + if key == "stop_loss": + branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) + outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}!{branch_sfx}") + record_event(code, name, "stop_loss", price, str(hi)) + else: + extra = "" + if "_price" in key: + batch_shares = d.get(key.replace("_price", "_shares"), "") + action = d.get(key.replace("_price", "_action"), "") + if batch_shares: + extra = f" {action}{batch_shares}股" if action else f" {batch_shares}股" + elif key in ("take_profit_zone",): + act = d.get("take_profit_action", "") + if act: + extra = f"({act})" + branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0)) + outputs.append(f"⚡ {name}({code}) {price} → 进入{label}{lo}~{hi}{extra}{branch_sfx}") + record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label) + state[code][key] = True + state_updated = True + + elif not in_zone and prev_in_zone == True: + if key != "stop_loss": + outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}") + state[code][key] = False + state_updated = True + + # === 第三步:买入区偏离检测 + 自动重评 === + reassesed_codes = [] + for d in active: + code = d["code"] + name = d.get("name", code) + price_info = prices.get(code) + if not price_info: + continue + price, _, _ = price_info + if price == 0: + continue + + # 从 decisions.json 中读取 analysis 的买入区 + entry_low = d.get("entry_low", 0) + entry_high = d.get("entry_high", 0) + if not entry_low or not entry_high: + continue + + in_buy_zone = entry_low <= price <= entry_high + prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None) + + # 状态变化时才触发:True→False离区 或 False→True进区 + # [2026-07-01 fix] prev_in_buy_zone is None(新加自选首次检测) + # 也要触发——否则新自选全程不走重评,timing_signal卡在初始值 + if in_buy_zone and (prev_in_buy_zone == False or prev_in_buy_zone is None): + # 进入买入区 → 触发技术面重评,更新止损/止盈/信号 + outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评") + do_reassess = True + elif not in_buy_zone and prev_in_buy_zone == True: + # 离开买入区 → 立即重评,更新止损/止盈/区间 + outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评") + do_reassess = True + else: + do_reassess = False + + if do_reassess and HAS_REASSESS: + try: + cost = d.get("cost", 0) or 0 + shares = d.get("shares", 0) or 0 + profit_pct = (price - cost) / cost * 100 if cost else 0 + is_deep_loss = profit_pct < -20 + sentiment = "neutral" + if d.get("tech_snapshot"): + if "bearish" in d["tech_snapshot"]: + sentiment = "bearish" + elif "bullish" in d["tech_snapshot"]: + sentiment = "bullish" + + # 调用技术面驱动重评(非机械百分比) + result = reassess_strategy( + code, name, price, cost, shares, + current_action=d.get("action", ""), + volume_signal="中性", sentiment=sentiment, + ) + outputs.append(f" 📊 新策略: 损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}") + reassesed_codes.append(code) + except Exception as e: + outputs.append(f" ⚠️ 重评失败: {e}") + + # 更新买入区状态 + if "__buy_zone" not in state.get(code, {}): + if code not in state: + state[code] = {} + state[code]["__buy_zone"] = in_buy_zone + state_updated = True + + # 如果有重评过的股票,更新 decisions.json + if reassesed_codes and HAS_REASSESS: + try: + # 重新 regenerate_all 只针对受影响的股票效率太低 + # 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析) + from strategy_lifecycle import regenerate_all + r = regenerate_all(stdout=False) + outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功") + outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}") + except Exception as e: + outputs.append(f" ⚠️ 全量重评失败: {e}") + + # === 3.5 资金流异常检测(2026-06-27 新增)=== + try: + cf = json.load(open("/home/hmo/web-dashboard/data/capital_flow_cache.json")) + # 检查所有 active decision 中的资金流异常 + for d in active: + code = d["code"] + stock_cf = cf.get("stocks", {}).get(code, {}) + analysis = stock_cf.get("analysis", {}) + alerts = analysis.get("alerts", []) + if alerts: + name = d.get("name", code) + for a in alerts: + outputs.append(f" 💰 {name}({code}) {a}") + except Exception: + pass + + # === 第四步:情景变化检测 + 输出 → 直接推XMPP === + now_str = datetime.now().strftime("%H:%M:%S") + elapsed = time.time() - start + + # 情景变化检测(跨轮对比) + if HAS_TREE and _SCENARIO_CACHE.get('id'): + prev_scenario = state.get('_system', {}).get('last_scenario', '') + curr_scenario = _SCENARIO_CACHE['id'] + if prev_scenario and curr_scenario != prev_scenario: + combo = _SCENARIO_CACHE.get('combo_action', '') + outputs.insert(0, f"🌀 情景切换: {prev_scenario}→{curr_scenario} | {combo}") + if outputs: + state.setdefault('_system', {})['last_scenario'] = curr_scenario + state_updated = True + elif not prev_scenario: + state.setdefault('_system', {})['last_scenario'] = curr_scenario + state_updated = True + + if outputs: + # 简短一行一个触发 + for o in outputs: + print(o) + # 推送XMPP(只推关键事件:止损跌破+情景切换+资金流异动,不推买入区进出/重评等操作细节) + critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))] + if critical: + try: + body = "\n".join([f"{now_str}"] + critical) + payload = json.dumps({ + "to": "hmo@yoin.fun", "body": body, "type": "chat", + }).encode("utf-8") + req = urllib.request.Request( + "http://127.0.0.1:5805/", data=payload, + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=5) + except Exception: + pass + # else: SILENT — 无触发,无输出,不推 + + if state_updated: + save_state(state) + + +def main(): + """每cron触发跑一轮""" + run_once() + + +if __name__ == "__main__": + main() diff --git a/scripts/stock_quote.py b/scripts/stock_quote.py new file mode 100644 index 0000000..fa96a62 --- /dev/null +++ b/scripts/stock_quote.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python3 +""" +stock_quote.py — 统一股票行情查询工具(唯一权威价格源) + +用法: + python3 stock_quote.py 688411 # 单个A股 + python3 stock_quote.py 688411 01211 300750 # 批量 + python3 stock_quote.py --all-holdings # 所有持仓 + +输出:每只股票一行JSON,格式统一、无歧义。 +LLM 禁止直接解析新浪/腾讯原始CSV,只能读本脚本输出。 + +数据来源(按优先级降序): + A股: 东财push2 → 新浪hq → 腾讯qt + 港股: 东财并行限速(5 workers) → 新浪批量 → 腾讯15min延迟(兜底) + +验证规则: + - price 必须在 [low, high] 范围内 + - change_pct 必须与 (price - prev_close) / prev_close 一致(±0.1%容差) + - 任一验证失败 → 该数据源降级,尝试下个源 +""" + +import json, sys, re, time, urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Semaphore +from pathlib import Path +from datetime import datetime + +UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" +DATA_DIR = Path("/home/hmo/MoFin/data") + +# ── 工具 ── + +def _http_get(url, headers=None, timeout=10): + """带代理绕过的HTTP GET""" + req = urllib.request.Request(url, headers=headers or {"User-Agent": UA}) + proxy_handler = urllib.request.ProxyHandler({}) + opener = urllib.request.build_opener(proxy_handler) + try: + with opener.open(req, timeout=timeout) as r: + return r.read().decode("utf-8", errors="replace") + except Exception: + return None + + +def _detect_market(code): + """自动识别A股/港股""" + code = str(code).strip() + if code.startswith("0") or code.startswith("3") or len(code) == 6: + return "ashare" + if len(code) <= 5: + return "hk" + return "unknown" + + +def _sym(code, market): + """转成API可用的symbol""" + if market == "ashare": + pre = "sh" if code.startswith(("5", "6", "9")) else "sz" + return pre + code, code + return code, code # HK codes used directly + + +# ── 数据源:东方财富(A股+港股通用) ── + +def _em_ashare(code): + """东财A股API""" + url = f"https://push2.eastmoney.com/api/qt/stock/get?secid=1.{code}&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2" + raw = _http_get(url, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"}) + if not raw: + return None + try: + resp = json.loads(raw) + if resp.get("rc") != 0: + return None + d = resp.get("data") + if not d: + return None + price = d.get("f43") + if price is None or float(price) <= 0: + return None + return { + "code": code, + "name": None, # 东财不返名称 + "price": float(price), + "change_pct": float(d.get("f170", 0)) if d.get("f170") is not None else None, + "high": float(d.get("f44", 0)) if d.get("f44") else None, + "low": float(d.get("f45", 0)) if d.get("f45") else None, + "open": float(d.get("f46", 0)) if d.get("f46") else None, + "prev_close": float(d.get("f60", 0)) if d.get("f60") else None, + "volume": d.get("f47"), + "amount": d.get("f50"), + "source": "eastmoney", + } + except (json.JSONDecodeError, ValueError, TypeError): + return None + + +def _em_hk(code): + """东财港股API(单股)""" + url = f"https://push2.eastmoney.com/api/qt/stock/get?secid=116.{code}&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2" + raw = _http_get(url, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"}) + if not raw: + return None + try: + resp = json.loads(raw) + if resp.get("rc") != 0: + return None + d = resp.get("data") + if not d: + return None + price = d.get("f43") + if price is None or float(price) <= 0: + return None + return { + "code": code, + "name": None, + "price": float(price), + "change_pct": float(d.get("f170", 0)) if d.get("f170") is not None else None, + "high": float(d.get("f44", 0)) if d.get("f44") else None, + "low": float(d.get("f45", 0)) if d.get("f45") else None, + "open": float(d.get("f46", 0)) if d.get("f46") else None, + "prev_close": float(d.get("f60", 0)) if d.get("f60") else None, + "volume": d.get("f47"), + "amount": d.get("f50"), + "source": "eastmoney", + } + except (json.JSONDecodeError, ValueError, TypeError): + return None + + +# ── 数据源:新浪(A股) ── + +def _sina_ashare(code): + """新浪A股API(批量)""" + pre = "sh" if code.startswith(("5", "6", "9")) else "sz" + url = f"https://hq.sinajs.cn/list={pre}{code}" + raw = _http_get(url, headers={ + "User-Agent": UA, + "Referer": "https://finance.sina.com.cn", + }) + if not raw: + return None + try: + fields = raw.split("\"")[1].split(",") + # 新浪格式: name,open,prev_close,current,high,low,buy,sell,volume,amount + if len(fields) < 8: + return None + name = fields[0].strip() + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[2]) if fields[2] else 0 + if price <= 0: + return None + change_pct = round((price - prev_close) / prev_close * 100, 2) if prev_close > 0 else None + return { + "code": code, + "name": name, + "price": price, + "change_pct": change_pct, + "high": float(fields[4]) if fields[4] else None, + "low": float(fields[5]) if fields[5] else None, + "open": float(fields[1]) if fields[1] else None, + "prev_close": prev_close, + "volume": int(fields[8]) if len(fields) > 8 and fields[8] else None, + "amount": float(fields[9]) if len(fields) > 9 and fields[9] else None, + "source": "sina", + } + except (ValueError, IndexError): + return None + + +# ── 数据源:腾讯(A股兜底) ── + +def _tencent_ashare(code): + """腾讯A股API(批量,15min延迟兜底)""" + pre = "sh" if code.startswith(("5", "6", "9")) else "sz" + url = f"https://qt.gtimg.cn/q={pre}{code}" + raw = _http_get(url, headers={"User-Agent": UA}) + if not raw: + return None + try: + fields = raw.split("~") + # Tencent格式: ~分隔 + if len(fields) < 10: + return None + name = fields[1] + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + if price <= 0: + return None + change_pct = round((price - prev_close) / prev_close * 100, 2) if prev_close > 0 else None + return { + "code": code, + "name": name, + "price": price, + "change_pct": change_pct, + "high": float(fields[33]) if len(fields) > 33 and fields[33] else None, + "low": float(fields[34]) if len(fields) > 34 and fields[34] else None, + "open": float(fields[5]) if fields[5] else None, + "prev_close": prev_close, + "volume": int(fields[6]) if fields[6] else None, + "amount": float(fields[37]) if len(fields) > 37 and fields[37] else None, + "source": "tencent", + } + except (ValueError, IndexError): + return None + + +# ── 数据验证 ── + +def _validate(q): + """验证行情数据自洽性。返回 (is_valid, reason)""" + if q is None: + return False, "no_data" + if q["price"] <= 0: + return False, "price_zero" + # price 必须在 [low, high] 范围内(如果low/high存在) + if q.get("high") and q.get("low"): + if q["price"] < q["low"] or q["price"] > q["high"]: + return False, f"price_out_of_range: {q['price']} not in [{q['low']},{q['high']}]" + # change_pct一致性(如果prev_close存在) + if q.get("prev_close") and q["prev_close"] > 0 and q.get("change_pct") is not None: + expected = round((q["price"] - q["prev_close"]) / q["prev_close"] * 100, 2) + if abs(expected - q["change_pct"]) > 0.5: + return False, f"change_pct_mismatch: reported={q['change_pct']} expected={expected}" + return True, "ok" + + +# ── 主查询逻辑 ── + +def get_quote(code): + """ + 获取单只股票行情,按优先级尝试多个数据源。 + 返回统一dict,失败返回None。 + """ + market = _detect_market(code) + result = None + + if market == "ashare": + # A股: 东财 → 新浪 → 腾讯 + for fetcher in [_em_ashare, _sina_ashare, _tencent_ashare]: + q = fetcher(code) + valid, reason = _validate(q) + if valid: + q["market"] = "A股" + if q.get("name") is None: + q["name"] = _get_name_from_cache(code) + return q + if q is not None: + result = q # keep last attempt + + elif market == "hk": + for fetcher in [_em_hk, _sina_hk, _tencent_hk]: + q = fetcher(code) + valid, reason = _validate(q) + if valid: + q["market"] = "港股" + q["code"] = code + if q.get("name") is None: + q["name"] = _get_name_from_cache(code) + return q + if q is not None: + result = q + + return result + + +def get_quotes_batch(codes, max_workers=5): + """ + 批量获取,并行执行。 + 返回 {code: quote_dict or None} + """ + if max_workers == 1: + return {c: get_quote(c) for c in codes} + results = {} + with ThreadPoolExecutor(max_workers=max_workers) as ex: + fut_map = {ex.submit(get_quote, c): c for c in codes} + for fut in as_completed(fut_map): + c = fut_map[fut] + try: + results[c] = fut.result() + except Exception: + results[c] = None + return results + + +# ── 名称缓存(从portfolio.json/decisions.json补充) ── + +def _get_name_from_cache(code): + """从本地数据文件补充股票名称(东财API不返回名称时用)""" + try: + pf = DATA_DIR / "portfolio.json" + if pf.exists(): + d = json.loads(pf.read_text()) + for h in d.get("holdings", []): + if str(h.get("code", "")) == str(code): + return h.get("name", "") + except Exception: + pass + try: + wl = DATA_DIR / "watchlist.json" + if wl.exists(): + d = json.loads(wl.read_text()) + for item in d if isinstance(d, list) else d.get("stocks", []): + if str(item.get("code", "")) == str(code): + return item.get("name", "") + except Exception: + pass + return "" + + +# ── 港股备用数据源 ── + +def _sina_hk(code): + """新浪港股API(批量友好,单股也支持)""" + url = f"https://hq.sinajs.cn/list=hk{code}" + raw = _http_get(url, headers={ + "User-Agent": UA, + "Referer": "https://finance.sina.com.cn", + }) + if not raw: + return None + try: + fields = raw.split("\"")[1].split(",") + if len(fields) < 9: + return None + name = fields[1] + price = float(fields[2]) if fields[2] else 0 + prev_close = float(fields[3]) if fields[3] else 0 + if price <= 0: + return None + change_amt = float(fields[7]) if fields[7] else 0 + # 更可靠的price计算 + if prev_close > 0 and abs(change_amt) > 0: + price = round(prev_close + change_amt, 2) + change_pct = float(fields[8]) if fields[8] else 0 + return { + "code": code, + "name": name, + "price": price, + "change_pct": change_pct, + "high": float(fields[5]) if fields[5] else None, + "low": float(fields[6]) if fields[6] else None, + "open": float(fields[4]) if fields[4] else None, + "prev_close": prev_close, + "volume": None, + "amount": None, + "source": "sina", + } + except (ValueError, IndexError): + return None + + +def _tencent_hk(code): + """腾讯港股(15min延迟,兜底)""" + url = f"https://qt.gtimg.cn/q=hk{code}" + raw = _http_get(url, headers={"User-Agent": UA}) + if not raw: + return None + try: + fields = raw.split("~") + if len(fields) < 10: + return None + name = fields[1] + price = float(fields[3]) if fields[3] else 0 + prev_close = float(fields[4]) if fields[4] else 0 + if price <= 0: + return None + change_pct = float(fields[7]) if len(fields) > 7 and fields[7] else 0 + return { + "code": code, + "name": name, + "price": price, + "change_pct": change_pct, + "high": float(fields[33]) if len(fields) > 33 and fields[33] else None, + "low": float(fields[34]) if len(fields) > 34 and fields[34] else None, + "open": float(fields[5]) if fields[5] else None, + "prev_close": prev_close, + "volume": None, + "amount": None, + "source": "tencent", + } + except (ValueError, IndexError): + return None + + +# ── 从portfolio.json读取所有持仓代码 ── + +def get_holding_codes(): + """从portfolio.json提取所有持仓代码""" + try: + pf = DATA_DIR / "portfolio.json" + d = json.loads(pf.read_text()) + return [h["code"] for h in d.get("holdings", []) if h.get("code")] + except Exception: + return [] + + +# ── CLI入口 ── + +def main(): + if len(sys.argv) < 2: + print("用法: python3 stock_quote.py [code2 ...]", file=sys.stderr) + print(" python3 stock_quote.py --all-holdings", file=sys.stderr) + sys.exit(1) + + codes = [] + if sys.argv[1] == "--all-holdings": + codes = get_holding_codes() + if not codes: + print(json.dumps({"error": "无法读取持仓列表", "timestamp": datetime.now().isoformat()})) + sys.exit(1) + else: + codes = [c.strip() for c in sys.argv[1:] if c.strip()] + + results = get_quotes_batch(codes) + timestamp = datetime.now().isoformat() + + # 输出:每行一个JSON(方便批量处理) + for code in codes: + q = results.get(code) + if q: + q["fetched_at"] = timestamp + print(json.dumps(q, ensure_ascii=False, default=str)) + else: + print(json.dumps({ + "code": code, + "error": "无法获取行情", + "fetched_at": timestamp, + }, ensure_ascii=False)) + + +if __name__ == "__main__": + main() diff --git a/scripts/strategy_lifecycle.py b/scripts/strategy_lifecycle.py new file mode 100644 index 0000000..c03aee4 --- /dev/null +++ b/scripts/strategy_lifecycle.py @@ -0,0 +1,2073 @@ +#!/usr/bin/env python3 +"""策略生命周期管理系统 — 技术面驱动版本 v2 + +核心原则: +1. 止损放在合理的技术位,不拍数字 +2. 新买入推荐:止损=弱支撑(约3%跌幅),止盈=强压力,盈亏比≥2:1 +3. 已持仓:止损=强支撑(约5-8%跌幅),目标=强压力 +4. 买入区间:弱支撑~弱压力之间 +5. 买入时机:量价齐跌不买,缩量至支撑买,量价齐升追买 +""" + +import json +import urllib.request +import os +import sys +import re +from datetime import datetime +import technical_analysis as ta +import multi_timeframe as mtf + + +def is_hk_stock(code): + """判断是否港股(港股代码5位,A股6位带前导零)""" + return len(str(code)) <= 5 + + +def calc_atr(code, period=14): + """从腾讯API K线数据计算ATR(period),返回ATR值或None""" + try: + url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param=hk{code},day,,,60,qfq" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + resp = urllib.request.urlopen(req, timeout=5).read().decode('utf-8') + data = json.loads(resp) + bars = data.get('data', {}).get(f'hk{code}', {}).get('day', []) + if len(bars) < period + 1: + return None + trs = [] + for i in range(1, min(len(bars), period + 1)): + try: + high = float(bars[i][2]) + low = float(bars[i][3]) + prev_close = float(bars[i-1][4]) if len(bars[i-1]) > 4 else float(bars[i-1][3]) + tr = max(high - low, abs(high - prev_close), abs(low - prev_close)) + trs.append(tr) + except (ValueError, IndexError): + continue + if not trs: + return None + return round(sum(trs) / len(trs), 2) + except Exception: + return None + +# 提示词版本追踪 +try: + from prompt_manager.tracking import record_strategy_generation + HAS_PROMPT_TRACKING = True +except ImportError: + HAS_PROMPT_TRACKING = False + +PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" +WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" + +def safe_json_load(path, default=None): + """安全加载 JSON,遇到坏数据自动修复""" + if not os.path.exists(path): + return default if default is not None else {} + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except json.JSONDecodeError: + # 尝试修复:替换字符串内未转义的换行符,去多余括号 + with open(path, "r", encoding="utf-8") as f: + raw = f.read() + fixed = raw + + # 修复1: 字符串内未转义的换行 -> \\n + result = [] + in_str = False + for ch in fixed: + if ch == '"': + in_str = not in_str + result.append(ch) + elif in_str and ch in '\n\r': + result.append('\\n') + else: + result.append(ch) + fixed = ''.join(result) + + # 修复2: 去掉多余的尾部括号 + fixed = fixed.rstrip('}') + # 补回正确的闭合 + if not fixed.endswith('}'): + fixed += '}' + + try: + return json.loads(fixed) + except json.JSONDecodeError as e: + print(f"[WARN] watchlist.json 自动修复失败: {e}", file=sys.stderr) + return default if default is not None else {} +KNOWLEDGE_LOG = "/home/hmo/Obsidian/knowledge/finance/analyst-knowledge-log.md" +MACRO_CONTEXT_PATH = "/home/hmo/web-dashboard/data/macro_context.json" +MARKET_CONTEXT_PATH = "/home/hmo/web-dashboard/data/market.json" +STOCK_SECTOR_MAP_PATH = "/home/hmo/web-dashboard/data/stock_sector_map.json" + + +def load_stock_sector_map(): + """读取个股归属行业映射 + + stock_sector_map.json 格式: {code: [sector1, sector2, ...]} + 跳过 _note, _created_at 等元数据键。 + """ + # 优先从 SQLite 读取 + try: + from mofin_db import get_conn, query_sector_stocks + conn = get_conn() + # 从 stock_sectors 表反向构建 code→[sectors] 映射 + rows = conn.execute("SELECT code, sector_name FROM stock_sectors ORDER BY code").fetchall() + conn.close() + code_to_sectors = {} + for code, sector in rows: + if code not in code_to_sectors: + code_to_sectors[code] = [] + code_to_sectors[code].append(sector) + return code_to_sectors + except Exception: + pass + try: + with open(STOCK_SECTOR_MAP_PATH) as f: + data = json.load(f) + code_to_sectors = {} + for key, value in data.items(): + if key.startswith("_"): + continue + if isinstance(value, list): + code_to_sectors[key] = value + return code_to_sectors + except Exception: + return {} + + +def load_market_context(): + """读取市场上下文,优先 SQLite,回退 market.json""" + # 优先从 SQLite 读取 + try: + from mofin_db import get_conn, query_latest_market + conn = get_conn() + market = query_latest_market(conn) + conn.close() + if market and market.get("sectors"): + sector_perf = {} + for s in market["sectors"]: + name = s.get("name", "") + if name: + sector_perf[name] = { + "change": s.get("change_pct", 0), + "up_count": s.get("up_count", 0), + "down_count": s.get("down_count", 0), + "net_inflow": s.get("net_inflow", 0), + "lead_stock": s.get("lead_stock", ""), + "lead_stock_change": s.get("lead_stock_change", 0), + } + return { + "sector_perf": sector_perf, + "breadth": market.get("up_ratio", 50), + "mood": market.get("mood", "neutral"), + "top_gainers": {g["name"]: g["change_pct"] for g in market.get("top_gainers", [])}, + "top_losers": {g["name"]: g["change_pct"] for g in market.get("top_losers", [])}, + "total_sectors": len(market["sectors"]), + "market_timestamp": market.get("timestamp", ""), + } + except Exception: + pass + try: + with open(MARKET_CONTEXT_PATH) as f: + market = json.load(f) + sectors = market.get("sectors", []) + sector_perf = {} + for s in sectors: + name = s.get("name", "") + if name: + sector_perf[name] = { + "change": s.get("change", 0), + "up_count": s.get("up_count", 0), + "down_count": s.get("down_count", 0), + "net_inflow": s.get("net_inflow", 0), + "lead_stock": s.get("lead_stock", ""), + "lead_stock_change": s.get("lead_stock_change", 0), + } + top_gainers = {s.get("name", ""): s.get("change", 0) + for s in market.get("top_gainers", [])} + top_losers = {s.get("name", ""): s.get("change", 0) + for s in market.get("top_losers", [])} + return { + "sector_perf": sector_perf, + "breadth": market.get("up_ratio", 50), + "mood": market.get("mood", "neutral"), + "top_gainers": top_gainers, + "top_losers": top_losers, + "total_sectors": market.get("total_sectors", 0), + "market_timestamp": market.get("timestamp", ""), + } + except Exception: + return { + "sector_perf": {}, + "breadth": 50, + "mood": "neutral", + "top_gainers": {}, + "top_losers": {}, + "total_sectors": 0, + "market_timestamp": "", + } + + +def compute_sector_adjustment(code, market_ctx, stock_sector_map): + """根据个股所属行业的市场表现+小果情感,返回调整系数 + + 返回 dict: + stop_bias: 止损调整系数(<1.0收紧, >1.0放宽) + target_bias: 止盈调整系数 + note: 行业背景一句话 + sector_name: 匹配到的行业名称 + sector_change: 行业涨跌幅 + """ + # 默认无调整 + adj = {"stop_bias": 1.0, "target_bias": 1.0, "note": "", + "sector_name": "", "sector_change": 0} + + sectors_for_code = stock_sector_map.get(code, []) + if not sectors_for_code: + return adj + + sector_perf = market_ctx.get("sector_perf", {}) + breadth = market_ctx.get("breadth", 50) + + # 找第一个能匹配到的行业 + for sec in sectors_for_code: + if sec in sector_perf: + perf = sector_perf[sec] + chg = perf.get("change", 0) + adj["sector_name"] = sec + adj["sector_change"] = chg + + # 行业暴跌 > 3% + if chg <= -3: + adj["stop_bias"] = 0.92 # 止损收紧8% + adj["target_bias"] = 0.90 # 止盈下调10% + adj["note"] = f"行业{sec}大跌{chg:+.1f}%,收紧止损" + # 行业大跌 1~3% + elif chg <= -1: + adj["stop_bias"] = 0.96 + adj["target_bias"] = 0.95 + adj["note"] = f"行业{sec}下跌{chg:+.1f}%,适度防御" + # 行业大涨 > 3% + elif chg >= 3: + adj["stop_bias"] = 1.05 # 止损放宽5%(给趋势空间) + adj["target_bias"] = 1.03 + adj["note"] = f"行业{sec}大涨{chg:+.1f}%,可适度积极" + # 行业上涨 1~3% + elif chg >= 1: + adj["stop_bias"] = 1.02 + adj["note"] = f"行业{sec}上涨{chg:+.1f}%,正常" + else: + adj["note"] = f"行业{sec}{chg:+.1f}%,中性" + break + # 尝试处理命名差异:market.json中的行业名可能多了"板块"后缀 + for market_sec_name in sector_perf: + if sec in market_sec_name or market_sec_name in sec: + perf = sector_perf[market_sec_name] + chg = perf.get("change", 0) + adj["sector_name"] = market_sec_name + adj["sector_change"] = chg + if chg <= -3: + adj["stop_bias"] = 0.92 + adj["target_bias"] = 0.90 + adj["note"] = f"行业{market_sec_name}大跌{chg:+.1f}%,收紧止损" + elif chg <= -1: + adj["stop_bias"] = 0.96 + adj["target_bias"] = 0.95 + adj["note"] = f"行业{market_sec_name}下跌{chg:+.1f}%,适度防御" + elif chg >= 3: + adj["stop_bias"] = 1.05 + adj["target_bias"] = 1.03 + adj["note"] = f"行业{market_sec_name}大涨{chg:+.1f}%,可适度积极" + elif chg >= 1: + adj["stop_bias"] = 1.02 + adj["note"] = f"行业{market_sec_name}上涨{chg:+.1f}%,正常" + else: + adj["note"] = f"行业{market_sec_name}{chg:+.1f}%,中性" + break + + # 如果breath<30% (大盘极弱),再加一层收紧 + if breadth < 30: + adj["stop_bias"] *= 0.97 # 再收紧3% + breadth_note = "大盘仅{}%个股上涨".format(int(breadth)) + adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note + elif breadth < 40: + adj["stop_bias"] *= 0.99 + breadth_note = "大盘偏弱({}%上涨)".format(int(breadth)) + adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note + + # 小果情感约束:利空置信度>80%时收紧止损 + try: + xiaoguo_path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json" + if os.path.exists(xiaoguo_path): + xg = json.load(open(xiaoguo_path)) + stock_sentiment = xg.get("stocks", {}).get(code, {}) + if stock_sentiment: + sentiment = stock_sentiment.get("sentiment", "") + confidence = stock_sentiment.get("confidence", 0) + summary = stock_sentiment.get("summary", "") + if sentiment == "negative" and confidence > 0.8: + adj["stop_bias"] = min(adj["stop_bias"], 0.95) + adj["note"] += f" | 小果利空{confidence:.0%}:{summary[:30]}" + except Exception: + pass + + return adj + + +def load_macro_context(): + """读取宏观上下文,返回 (bias, desc),优先 DB,回退 JSON""" + try: + import sqlite3 + from pathlib import Path + conn = sqlite3.connect(str(Path(__file__).parent.parent / "data" / "mofin.db")) + row = conn.execute( + "SELECT indices, structure FROM macro_context_log " + "WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1" + ).fetchone() + conn.close() + if row: + indices = json.loads(row[0]) if row[0] else {} + structure = json.loads(row[1]) if row[1] else {} + overall = structure.get("overall", "neutral") + desc = structure.get("description", "") + else: + raise ValueError("no db data") + except Exception: + try: + with open(MACRO_CONTEXT_PATH) as f: + ctx = json.load(f) + overall = ctx.get("structure", {}).get("overall", "neutral") + desc = ctx.get("structure", {}).get("description", "") + except Exception: + return 1.0, "宏观未加载" + if "bearish" in overall: + return 0.8, f"宏观{desc}" + elif overall == "bullish": + return 1.05, f"宏观{desc}" + elif overall == "strong_bullish": + return 1.1, f"宏观{desc}" + else: + return 1.0, f"宏观{desc}" + + +def batch_fetch_prices(codes): + """批量获取实时价格,合并为一次API调用(自动分批,每批15只)""" + if not codes: + return {} + + # 分批处理,避免单次请求过大导致超时 + batch_size = 15 + all_results = {} + for batch_start in range(0, len(codes), batch_size): + batch = codes[batch_start:batch_start + batch_size] + symbols = [] + code_map = {} + for raw_code in batch: + raw_code = str(raw_code).split('_')[0] + if not raw_code: + continue + if len(raw_code) == 5 and raw_code.isdigit(): + prefix = "hk" + elif raw_code.startswith(("6", "5")): + prefix = "sh" + else: + prefix = "sz" + sym = f"{prefix}{raw_code}" + symbols.append(sym) + code_map[sym] = raw_code + if not symbols: + continue + + url = f"http://qt.gtimg.cn/q={','.join(symbols)}" + max_retries = 2 + for attempt in range(max_retries + 1): + try: + r = urllib.request.urlopen(url, timeout=10) + text = r.read().decode("gbk") + except Exception as e: + if attempt < max_retries: + continue + print(f" batch_fetch_prices error: {e}", file=sys.stderr) + continue + + for line in text.strip().split("\n"): + line = line.strip() + if not line or "=" not in line: + continue + try: + sym = line.split("=", 1)[0].strip().lstrip("v_") + raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") + fields = raw_value.split("~") + if len(fields) < 35: + continue + orig_code = code_map.get(sym) + if not orig_code: + continue + def f(i): + try: + return float(fields[i]) if fields[i].strip() else 0.0 + except: + return 0.0 + all_results[orig_code] = { + "price": f(3), "close": f(4), "high": f(33), "low": f(34), + "code": orig_code, + } + except Exception: + continue + break # Success - break retry loop + + return all_results + + +def get_price_tencent(code): + """获取实时价格,港股转CNY统一存CNY""" + try: + from mo_models import to_cny, is_hk_stock + except ImportError: + to_cny = lambda v, r=None: v + is_hk_stock = lambda c: len(str(c).strip()) == 5 and str(c).strip().isdigit() + try: + raw_code = code.split('_')[0] + if not raw_code: + return None + if is_hk_stock(raw_code): + prefix = "hk" + elif raw_code.startswith("6") or raw_code.startswith("5"): + prefix = "sh" + else: + prefix = "sz" + url = f"http://qt.gtimg.cn/q={prefix}{raw_code}" + r = urllib.request.urlopen(url, timeout=5) + fields = r.read().decode("gbk").split('"')[1].split("~") + def f(i): + try: + return float(fields[i]) if fields[i].strip() else 0.0 + except: + return 0.0 + price = f(3) + if is_hk_stock(raw_code) and price > 0: + price = to_cny(price) + return { + "price": price, "close": f(4), "high": f(33), "low": f(34), + "code": raw_code, + } + except Exception as e: + print(f" get_price error {code}: {e}", file=sys.stderr) + return None + + +def reassess_strategy(code, name, price, cost, shares, current_action, + volume_signal="", sentiment="neutral", + is_watchlist=False): + """根据技术分析重评策略""" + + tech = ta.full_analysis(code) + if tech and "support_resistance" in tech: + sr = tech["support_resistance"] + candle = tech.get("candlestick", {}) + vol = tech.get("volume", {}) + ss = sr.get("strong_support") + ws = sr.get("weak_support") + wr = sr.get("weak_resist") + sr_resist = sr.get("strong_resist") + pivot = sr.get("pivot") + effective_range = sr.get("effective_range") + print(f" TECH: 强撑={ss} 弱撑={ws} 枢轴={pivot} 弱压={wr} 强压={sr_resist} 有效区间={effective_range}") + else: + print(f" ⚠️ 技术分析不可用", file=sys.stderr) + ss = ws = wr = sr_resist = pivot = None + candle = {} + vol = {} + + # ----- 多周期技术分析(周线/月线/均线) ----- + mtf_analysis = {} + mtf_adj = {} + try: + mtf_result = mtf.full_multi_tf_analysis(code) + if mtf_result.get("daily") and mtf_result["daily"].get("count", 0) >= 5: + mtf_analysis = mtf_result + mtf_adj = mtf_result.get("strategy_adjustment", {}) + daily_mas = mtf_result.get("daily", {}).get("mas", {}) + weekly = mtf_result.get("weekly", {}) + monthly = mtf_result.get("monthly", {}) + trend_align = mtf_adj.get("trend_alignment", "未知") + print(f" 多周期: {trend_align} | " + f"MA5={daily_mas.get('ma5','?')} MA20={daily_mas.get('ma20','?')} MA60={daily_mas.get('ma60','?')} | " + f"周线{weekly.get('trend',{}).get('description','?')} 月线{monthly.get('trend',{}).get('description','?')}") + except Exception as e: + print(f" 多周期分析失败: {e}", file=sys.stderr) + + profit_pct = (price - cost) / cost * 100 if cost else 0 + is_new_entry = (cost == 0) or (shares == 0) + is_deep_loss = profit_pct < -20 + + # ----- 股票分类(短炒/中短线/中长线/弱势/深套) ----- + stock_category = "中短线" + time_horizon = "2周~3月" + position_advice = "中等仓位" + try: + mtf_cache = json.load(open("/home/hmo/web-dashboard/data/multi_tf_cache.json")) + stock_data = mtf_cache.get(code, {}) + daily_klines = stock_data.get("daily", []) + fund = stock_data.get("fundamentals", {}) + closes = [d["close"] for d in daily_klines] if daily_klines else [] + + if len(closes) >= 10: + cur = closes[-1] + ma20 = sum(closes[-20:])/20 if len(closes)>=20 else 0 + ma60 = sum(closes[-60:])/60 if len(closes)>=60 else 0 + highs = [d["high"] for d in daily_klines[-20:]] + lows = [d["low"] for d in daily_klines[-20:]] + volatility = ((max(highs)-min(lows))/min(lows)*100) if min(lows)>0 else 0 + pe = fund.get("pe") or 0 + eps = fund.get("eps") or 0 + mcap = fund.get("mcap_total") or 0 + is_high_vol = volatility > 30 + is_high_pe = pe > 100 or pe < 0 + is_value = 0 < pe < 20 and eps > 0.5 + + if is_deep_loss: + stock_category = "深套" + time_horizon = "长期" + position_advice = "不补不割" + elif is_high_vol and is_high_pe: + stock_category = "短炒" + time_horizon = "数日~2周" + position_advice = "小仓快进快出" + elif cur < ma20 and cur < ma60 and ma20 > 0: + stock_category = "弱势" + time_horizon = "观望" + position_advice = "减仓或观望" + elif (is_value or mcap > 1000) and cur > ma20: + stock_category = "中长线" + time_horizon = "数月~1年" + position_advice = "正常配置" + elif volatility > 20: + stock_category = "中短线" + time_horizon = "2~6周" + position_advice = "中等仓位" + except Exception: + pass + + print(f" 分类: {stock_category} | {time_horizon} | {position_advice}") + + # ----- 短炒+强趋势检测:短炒分类但多周期多头时用移动止损代替弱支撑止损 ----- + is_short_term_strong_trend = False + if stock_category == "短炒": + trend_align = mtf_adj.get("trend_alignment", "") + strong_trend_indicators = ["多周期看多", "多周期多头", "上升"] + if any(ind in trend_align for ind in strong_trend_indicators): + is_short_term_strong_trend = True + print(f" ⚡ 短炒+强趋势检测: 趋势={trend_align} → 启用移动止损, 不止盈") + position_advice = "小仓强趋势让利润跑" + + # ----- 止损设置(含最小距离3%保护) ----- + if is_new_entry: + # 新买入推荐:止损 = 弱支撑(约2-3%跌幅,合理可控) + if ws and ws > 0: + new_stop = round(ws, 2) + else: + new_stop = round(price * 0.96, 2) + elif is_deep_loss: + # 深套:止损 = 强支撑再下移(不轻易割) + if ss and ss > 0: + new_stop = round(min(ss, price * 0.85), 2) + else: + new_stop = round(price * 0.85, 2) + else: + # 已持仓正常:止损 = 强支撑 + if is_short_term_strong_trend: + # 短炒+强趋势:用移动止损(距现价-5%),不止盈让利润跑 + trailing_sl = round(max(ws or 0, price * 0.95), 2) if ws else round(price * 0.95, 2) + new_stop = trailing_sl + print(f" 短炒强趋势移动止损: {new_stop} (距现价-{(1-new_stop/price)*100:.1f}%)") + elif ss and ss > 0: + new_stop = round(ss, 2) + else: + new_stop = round(price * 0.88, 2) + + # 已盈利仓位(>5%):用较紧的移动止损保护利润,但不超过成本线 + if profit_pct > 5 and not is_new_entry and not is_deep_loss: + # 取 max(弱支撑, 成本线, 当前价×0.95) 作为止损 + cost_protect = cost if cost > 0 else 0 + trailing_stop = round(max(ws or 0, cost_protect, price * 0.95), 2) + if trailing_stop > new_stop: + new_stop = trailing_stop + print(f" 已启用移动止损: {new_stop} (保护+{profit_pct:.1f}%利润)", file=sys.stderr) + + # 最小止损距离 —— 随趋势强度调整(2026-06-23 震度保护规则) + # 强趋势(多周期看多 + MA多头排列):最小1.5%下行空间 + # 普通/弱势:最小3%下行空间 + is_strong_trend = False + trend_align = mtf_adj.get("trend_alignment", "") + strong_trend_indicators = ["多周期看多", "多周期多头", "上升"] + try: + if any(ind in trend_align for ind in strong_trend_indicators) and ma20 > ma60 and cur >= ma20: + is_strong_trend = True + except (NameError, TypeError): + pass # ma20/ma60/cur may be unbound if MTF data insufficient + + if is_strong_trend: + min_stop_gap = 0.015 # 1.5% + else: + min_stop_gap = 0.03 # 3% + + min_stop = round(price * (1 - min_stop_gap), 2) + if new_stop > min_stop and not is_deep_loss: + old_stop = new_stop + new_stop = min_stop + if old_stop != new_stop: + print(f" 最小止损 {round(min_stop_gap*100)}%间距约束: {old_stop}→{new_stop} (趋势{'强' if is_strong_trend else '普通'})") + + # 港股附加:ATR波动率校验 — 止损距现价不得小于 1×ATR(14) + if is_hk_stock(code): + atr = calc_atr(code) + if atr and atr > 0: + min_atr_stop = round(price - atr, 2) + if new_stop > min_atr_stop: + old_stop_val = new_stop + new_stop = min_atr_stop + print(f" 港股ATR波动率校验({atr:.2f}): 止损 {old_stop_val}→{new_stop} (1×ATR间距)") + + # ----- 止盈设置 ----- + if is_short_term_strong_trend and not is_new_entry: + # 短炒+强趋势:不止盈让利润跑 + mtf_tp = mtf_adj.get("take_profit_reference", {}) + if mtf_tp and mtf_tp.get("level", 0) > price * 1.2: + new_target = round(mtf_tp["level"], 2) + else: + new_target = 0 # 无多周期阻力时不编造止盈 + print(f" 短炒强趋势不止盈: 止盈设为{new_target} (+{(new_target/price-1)*100:.0f}%)") + elif sr_resist and sr_resist > 0: + new_target = round(sr_resist, 2) + else: + new_target = 0 # 无技术面数据时不编造止盈 + + # ----- 风险回报比校验 ----- + stop_distance = price - new_stop if price > new_stop else price * 0.02 + target_distance = new_target - price if new_target > price else 0 + + # 1:2 检查 + min_target_distance = stop_distance * 2.0 + if target_distance < min_target_distance: + # 尝试更高的阻力位,但不超过下一个真实压力位 + candidate_targets = [] + if wr and wr > price and wr != sr_resist: + candidate_targets.append(wr) + if sr_resist and sr_resist > price: + candidate_targets.append(sr_resist) + # 检查有效区间,如果有更高的自然目标位 + if effective_range and price < effective_range * 0.9: + candidate_targets.append(effective_range) + + found = False + for level in candidate_targets: + if (level - price) >= min_target_distance: + new_target = level + found = True + break + + # 如果仍然不满足,检查是否至少能到 1:1.5 + min15_distance = stop_distance * 1.5 + if not found: + for level in candidate_targets: + if (level - price) >= min15_distance: + new_target = level + found = True + break + + # ----- 风险回报比最终计算 ----- + risk = max(price - new_stop, price * 0.01) + reward = max(new_target - price, 0) + rr_ratio = reward / risk if risk > 0 else 0 + + # ----- 状态判断 ----- + if is_deep_loss: + status = "updated" + action_note = "深套持有" + elif is_new_entry: + if rr_ratio < 1.5: + status = "review" + action_note = "⚠️盈亏比不足1:1.5,不建议买入" + elif rr_ratio < 2.0: + status = "updated" + action_note = "⚠️盈亏比偏低(1:{:.1f}),谨慎买入".format(rr_ratio) + else: + status = "updated" + action_note = "" + else: + if rr_ratio < 0.5: + status = "updated" + action_note = "⚠️盈亏比极低,关注" + elif rr_ratio < 1.5: + status = "updated" + action_note = "⚠️盈亏比偏低(1:{:.1f}),不建议加仓".format(rr_ratio) + else: + status = "updated" + action_note = "" + + # 短炒+强趋势:在action_note追加标记 + if is_short_term_strong_trend and not is_new_entry and not is_deep_loss: + extra_note = "短炒强趋势持" if "深套" not in action_note else "" + if extra_note: + action_note = f"{action_note} | {extra_note}" if action_note else extra_note + + # ----- 买入区间(有盈亏比严格约束) ----- + max_acceptable_entry = None # 最大可接受买入价(满足R/R约束) + + if new_target and new_stop and new_target > new_stop and not is_deep_loss: + # 买入价的R/R约束: + # 要求 (target - entry) / (entry - stop) >= min_rr + # 即 entry <= (target + min_rr * stop) / (1 + min_rr) + min_rr = 1.0 # 至少1:1,才不亏 + recommend_rr = 1.5 # 推荐1:1.5以上 + + max_for_recommend = (new_target + recommend_rr * new_stop) / (1 + recommend_rr) + max_for_neutral = (new_target + min_rr * new_stop) / (1 + min_rr) + + if is_new_entry: + # 新买入:要求1:1.5+ + max_acceptable_entry = max_for_recommend + else: + # 已持仓加仓:至少1:1 + max_acceptable_entry = max_for_neutral + + if is_new_entry: + # 新买入:买入区 = 弱支撑附近(不是当前价附近!) + # 只在价格跌到弱支撑附近时才推买入 + entry_low = round(price * 0.98, 2) + entry_high = round(price * 1.02, 2) + if max_acceptable_entry and entry_high > max_acceptable_entry: + entry_high = round(max_acceptable_entry, 2) + # 确保买入区不小于1% + if entry_high - entry_low < price * 0.01: + if max_acceptable_entry and price <= max_acceptable_entry: + entry_low = round(max(price * 0.99, new_stop), 2) + entry_high = round(min(price * 1.01, max_acceptable_entry), 2) + elif ws and ws > 0 and wr and wr > 0 and not is_deep_loss: + # 已持仓正常:买入区 = 弱支撑~弱支撑上方5%(给合理回调空间) + # 上限不能低于成本价×0.95(保护已有持仓不被高位逼空) + entry_low = round(ws, 2) + entry_max = round(ws * 1.05, 2) # 比弱支撑高5%,有足够空间 + # 如果当前价已远离买入区,保持买入区不变(不因价格涨了就收窄) + min_upper = round(cost * 0.95, 2) if cost > 0 else 0 + if entry_max < min_upper: + entry_max = min_upper + if max_acceptable_entry: + entry_high = round(min(entry_max, max_acceptable_entry), 2) + else: + entry_high = entry_max + # 如果当前价已远离买入区(高于买入区上沿),禁止加仓推荐 + if price > entry_high: + # 买入区锁定在弱支撑位,但标记为"价格远离" + pass + # 如果买入区过窄,标记但不扩展(加仓必须在支撑位) + if entry_high - entry_low < price * 0.005: + entry_low = round(ws * 0.995, 2) + entry_high = round(ws * 1.005, 2) + else: + entry_low = round(price * 0.90, 2) + entry_high = round(price * 1.05, 2) + + # 买入区间稳定性保护:上边界单次变动不超过5% + if 'entry_high' in dir() and entry_high: + # 读取当前策略中已有的买入区上界,如果有且变化过大则限制 + old_entry_high = None + if 'current_action' in dir() and current_action: + import re + m = re.search(r'买入区[\d.]+~([\d.]+)', current_action) + if m: + old_entry_high = float(m.group(1)) + if old_entry_high and old_entry_high > 0: + max_change = old_entry_high * 0.95 # 单次最多下降5% + if entry_high < max_change: + entry_high = round(max_change, 2) + + # ----- 买入时机信号(三维分析:大盘+行业+个股,基本面+消息面+技术面+资金流)----- + # [2026-07-01] 扩展:不再只看volume_signal + candlestick_sentiment + # 融合大盘趋势、行业板块强弱、基本面估值作为修正因子 + volume_signal = vol.get("volume_signal", "") + candlestick_sentiment = candle.get("sentiment", "neutral") + timing_signal = "neutral" + + # --- 三维分析数据装载 --- + # 因子1: 大盘环境(从macro_context_log读) + market_bearish = False + market_bullish = False + try: + import sqlite3 + _db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _mc = _db.execute( + "SELECT structure FROM macro_context_log WHERE has_valid_data=1 ORDER BY rowid DESC LIMIT 1" + ).fetchone() + if _mc and _mc[0]: + _s = json.loads(_mc[0]) + _overall = _s.get("overall", "") + if "bearish" in _overall: + market_bearish = True + elif _overall == "bullish": + market_bullish = True + _db.close() + except Exception: + pass + + # 因子2: 行业板块强弱 + sector_strong = False + sector_weak = False + try: + _db2 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _rows2 = _db2.execute( + "SELECT name, change_pct FROM sector_snapshots ORDER BY change_pct DESC" + ).fetchall() + if _rows2: + # 找到该股所属行业(简单匹配name或通过stock_sectors) + _my_sectors = _db2.execute( + "SELECT sector_name FROM stock_sectors WHERE code=?", + (code,) + ).fetchall() + if _my_sectors: + for (_sn,) in _my_sectors: + for r_name, r_chg in _rows2: + if _sn in r_name or r_name in _sn: + _rank = [r[0] for r in _rows2].index(r_name) if r_name in [x[0] for x in _rows2] else -1 + _total = len(_rows2) + if _rank >= 0: + if _rank < _total * 0.2: + sector_strong = True + if _rank > _total * 0.8: + sector_weak = True + break + _db2.close() + except Exception: + pass + + # 因子3: 基本面估值 + is_value_stock = False + try: + _db3 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _fd = _db3.execute( + "SELECT pe, eps FROM stock_fundamentals WHERE code=?", (code,) + ).fetchone() + if _fd: + _pe, _eps = _fd + is_value_stock = (0 < (_pe or 0) < 25 and (_eps or 0) > 0.3) + _db3.close() + except Exception: + pass + + # --- 三维修正规则 --- + # 大盘偏弱时收紧买入信号,大盘偏强时放宽 + # 行业领先加分,行业落后减分 + # 低估值加分(有安全边际) + + def _adjust_timing(signal, market_b, market_bb, sec_s, sec_w, is_val): + """根据三维因子修正 timing_signal""" + # 大盘偏弱时降级买入信号 + if market_b: + if signal in ("买入", "加仓"): + if not sec_s: # 大盘弱+行业不强→降级 + return "关注" + # 大盘偏强时放宽 + if market_bb: + if signal == "关注" and (sec_s or is_val): + return "买入" + # 行业弱势时降级买入信号 + if sec_w: + if signal in ("买入", "加仓"): + return "关注" + # 行业强势+低估时升级关注 + if sec_s and is_val: + if signal == "关注": + return "买入" + return signal + + if is_new_entry: + # 新买入时机 + if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish": + timing_signal = "买入" + elif volume_signal == "主动卖盘占优": + timing_signal = "观望" + elif volume_signal == "买卖均衡" and ws and price <= ws * 1.03: + timing_signal = "买入" + elif candlestick_sentiment == "bullish": + timing_signal = "买入" + elif ws and price < ws * 1.02: + timing_signal = "关注" + # 新买入时三维修正:大盘向上+行业强→升级,大盘弱→降级 + _pre_signal = timing_signal + timing_signal = _adjust_timing(timing_signal, market_bearish, market_bullish, + sector_strong, sector_weak, is_value_stock) + if timing_signal != _pre_signal: + print(f" 三维修正(新入): {_pre_signal}→{timing_signal} " + f"| 大盘{'弱' if market_bearish else '强' if market_bullish else '中性'}" + f"| 行业{'强' if sector_strong else '弱' if sector_weak else '中性'}" + f"| 估值{'低' if is_value_stock else '一般'}") + else: + # 已持仓时机(用于加仓/减仓参考) + if is_short_term_strong_trend: + # 短炒+强趋势:强趋势持有,禁止加仓信号 + timing_signal = "持有" + elif profit_pct > 5: + # 已盈利 + if volume_signal == "主动买盘占优": + timing_signal = "持有" + elif volume_signal == "主动卖盘占优" and not is_new_entry: + timing_signal = "关注" + else: + timing_signal = "持有" + elif profit_pct > 0: + # 微盈 + if volume_signal == "主动买盘占优": + timing_signal = "持有" + elif ws and price <= ws * 1.02: + timing_signal = "加仓" + else: + timing_signal = "持有" + else: + # 浮亏 + if volume_signal == "主动卖盘占优" and ss and price <= ss * 1.03: + timing_signal = "关注" + elif volume_signal == "主动买盘占优" and sr_resist and price >= sr_resist * 0.97: + timing_signal = "关注" + elif volume_signal == "买卖均衡" and ws and price <= ws * 1.02: + timing_signal = "加仓" + else: + timing_signal = "持有" + + # ----- 【v3.2新增】分类约束:弱势/深套禁止输出买入/加仓类信号 ----- + if stock_category == "弱势" or is_deep_loss: + buy_signals = ["买入", "加仓", "可追"] + if any(s in timing_signal for s in buy_signals): + old_signal = timing_signal + timing_signal = "弱势持有" if stock_category == "弱势" else "深套持有" + print(f" 分类约束: {stock_category} 原信号\"{old_signal}\" → \"{timing_signal}\"") + + # ----- 构造 action 描述(供 cron prompt 使用) ----- + action_parts = [] + if profit_pct < -20: + action_parts.append("深套持有") + elif profit_pct < -10: + action_parts.append("持有观察") + elif profit_pct < 0: + action_parts.append("持有观察") + elif profit_pct < 5: + action_parts.append("盈利持有") + else: + action_parts.append("盈利良好") + + if action_note: + action_parts.append(action_note) + + if is_watchlist: + # 自选股(未入场):有止损参考+买入区,内部算RR需要止盈位 + action_parts.append(f"目标参考{new_target}") + action_parts.append(f"止损参考{new_stop}") + action_parts.append(f"买入区{entry_low}~{entry_high}") + elif is_new_entry: + action_parts.append(f"损{new_stop}") + action_parts.append(f"盈{new_target}") + action_parts.append(f"买{entry_low}~{entry_high}") + else: + action_parts.append(f"止损{new_stop}") + action_parts.append(f"目标{new_target}") + action_parts.append(f"买入区{entry_low}~{entry_high}") + + if timing_signal != "neutral": + action_parts.append(f"信号:{timing_signal}") + + new_action = " | ".join(action_parts) + + # 技术面快照 + tech_snapshot = "" + if candle: + tech_snapshot = (f"形态:{candle.get('pattern','?')}/{candle.get('sentiment','?')} " + f"量价:{vol.get('volume_signal','?')} " + f"强撑:{ss} 弱撑:{ws} 弱压:{wr} 强压:{sr_resist}") + # 加入均线信息(如果可用) + try: + dm = mtf_analysis.get("daily", {}).get("mas", {}) + ma_parts = [] + for m in ['ma5', 'ma10', 'ma20', 'ma60']: + v = dm.get(m) + if v: + ma_parts.append(f"{m.upper()}={v}") + if ma_parts: + tech_snapshot += " | " + " ".join(ma_parts) + except (NameError, AttributeError): + pass + + # 多周期快照(追加到 tech_snapshot) + mtf_context = "" + if mtf_adj: + trend_align = mtf_adj.get("trend_alignment", "") + daily_mas = mtf_analysis.get("daily", {}).get("mas", {}) + ma20 = daily_mas.get("ma20") + ma60 = daily_mas.get("ma60") + stop_ref = mtf_adj.get("stop_loss_reference", {}) + take_ref = mtf_adj.get("take_profit_reference", {}) + + parts = [] + if trend_align: + parts.append(trend_align) + if ma20: + parts.append(f"MA20={ma20}") + if ma60: + parts.append(f"MA60={ma60}") + if stop_ref: + parts.append(f"长撑:{stop_ref.get('source','?')}={stop_ref['level']}") + if take_ref: + parts.append(f"长压:{take_ref.get('source','?')}={take_ref['level']}") + mtf_context = " | ".join(parts) + + now_str = datetime.now().strftime('%Y-%m-%d %H:%M') + return { + 'stop_loss': new_stop, + 'take_profit': new_target, + 'entry_low': entry_low, + 'entry_high': entry_high, + 'action': new_action, + 'status': status, + 'tech_snapshot': tech_snapshot, + 'timing_signal': timing_signal, + 'rr_ratio': round(rr_ratio, 2), + 'action_note': action_note, + 'reassessed_at': now_str, + 'multi_tf_context': mtf_context, # 多周期上下文 + 'stock_category': stock_category, # 股票分类:短炒/中短线/中长线/弱势/深套 + 'time_horizon': time_horizon, # 时间跨度 + 'position_advice': position_advice, # 仓位建议 + } + + +def load_stock_news_sentiment(code): + """加载小果消息面情感""" + try: + path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json" + if not os.path.exists(path): + return {} + xg = json.load(open(path)) + return xg.get("stocks", {}).get(code, {}) + except Exception: + return {} + + +def load_fundamentals(code): + """加载个股基本面""" + try: + path = "/home/hmo/web-dashboard/data/multi_tf_cache.json" + if not os.path.exists(path): + return {} + m = json.load(open(path)) + return m.get(code, {}).get("fundamentals", {}) or {} + except Exception: + return {} + + +def _get_portfolio_risk_state(): + """读取 portfolio 组合风险状态(2026-06-23 引擎协调)""" + try: + # 数据一致性检查:警告多副本(2026-06-23 bugfix) + _check_portfolio_consistency() + p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json')) + pos_pct = p.get('position_pct', 0) + cash = p.get('cash', 0) + holdings = p.get('holdings', []) + weak_cnt = sum(1 for h in holdings if h.get('change_pct', 0) < -15) + total = len(holdings) or 1 + weak_ratio = weak_cnt / total + return { + 'position_pct': pos_pct, + 'cash': cash, + 'is_high_position': pos_pct > 80, + 'is_very_high_position': pos_pct > 90, + 'is_high_weak': weak_ratio > 0.35, + 'weak_ratio': round(weak_ratio * 100), + 'total_holdings': total, + } + except: + return {} + + +def _is_buy_signal(signal): + """判断信号是否为买入/持有类(用于防洗盘)""" + if not signal: + return False + buy_keywords = ['买入', '持有', '加仓', '关注'] + for kw in buy_keywords: + if kw in signal: + return True + return False + + +def _check_portfolio_consistency(): + """数据一致性检查:如果存在多份 portfolio.json 则报警(2026-06-23 bugfix)""" + main = '/home/hmo/web-dashboard/data/portfolio.json' + main_cash = None + try: + import json + main_cash = json.load(open(main)).get('cash') + except Exception: + return + for path in [ + '/home/hmo/data/portfolio.json', + '/home/hmo/projects/MoFin/data/portfolio.json', + '/home/hmo/web-dashboard.bak/data/portfolio.json', + ]: + if os.path.exists(path): + try: + other = json.load(open(path)) + if other.get('cash') != main_cash: + print(f"⚠️ 数据一致性: {os.path.realpath(path)} cash={other.get('cash')} ≠ 主文件 cash={main_cash} (需清理)", file=sys.stderr) + except Exception: + pass + + +def _check_contradiction(code, today_only=True): + """反馈循环核——检查本股是否有刚卖出的记录 + + 返回 dict or None: + - sold_reason: 'portfolio_trim'|'stop_loss' + - sold_at: 卖出日期 + - days_ago: 卖出距今交易日数 + - is_today: 是否今日卖出 + - tag: 追加到信号的标注 + """ + try: + from datetime import datetime, date + dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json')) + for e in dec.get('decisions', []): + if e.get('code') != code: + continue + sold_at = e.get('sold_at', '') + if not sold_at: + return None + try: + sd = datetime.strptime(sold_at, '%Y-%m-%d').date() + td = date.today() + days = (td - sd).days + except: + return None + + reason = e.get('sold_reason', 'portfolio_trim') + if reason == 'stop_loss': + tag = '止损离场(逻辑破坏,短期不关注)' + else: + tag = '组合减仓后关注(已清仓,等回踩确认)' + + return { + 'sold_reason': reason, + 'sold_at': sold_at, + 'days_ago': days, + 'is_today': days == 0, + 'tag': tag, + } + except: + return None + return None + + +def _get_sell_priority_list(): + """减仓优先级排序:深套>亏损>微盈>盈利(2026-06-23 反馈循环) + + 返回 [(code, name, change_pct, position_pct, priority_label), ...] + 按卖出的优先顺序排列(最先应该卖的在最前) + """ + try: + p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json')) + holdings = p.get('holdings', []) + ranked = [] + for h in holdings: + chg = h.get('change_pct', 0) + pos = h.get('position_pct', 0) + if chg < -30: + label = '深套(>30%),优先减' + rank = 0 + elif chg < -20: + label = '深套(>20%),优先减' + rank = 1 + elif chg < -10: + label = '亏损,建议减' + rank = 2 + elif chg < 0: + label = '微亏,可减' + rank = 3 + elif chg < 10: + label = '微盈,持有' + rank = 4 + else: + label = '盈利,最后减' + rank = 5 + ranked.append((rank, h['code'], h.get('name',''), chg, pos, label)) + ranked.sort(key=lambda x: (x[0], -x[4])) # 优先 rank, 其次仓位大优先 + return [{'code':c,'name':n,'change_pct':chg,'position_pct':pos,'label':l} + for r,c,n,chg,pos,l in ranked] + except: + return [] + + +def enrich_timing_signal(base_signal, macro_desc="", sector_note="", + profit_pct=0, stock_category="", is_new_entry=False, + fundamentals=None, news_sentiment=None, + timing_signal_override=None, + portfolio_context=None, + rr_ratio=0): # 2026-06-24 新参:盈亏比约束 + """多因子合成timing_signal——大盘+行业+基本面+技术+组合风险+盈亏比 + + 返回 (enriched_signal, factors_list) + - enriched_signal: 可读的多因子信号描述 + - factors_list: 各因子的摘要列表(用于后续显示) + """ + # 如果已手动设定,尊重手动 + if timing_signal_override and timing_signal_override != "neutral": + return timing_signal_override, [timing_signal_override] + + factors = [] + + # 1. 大盘因子 + if "偏强" in macro_desc or "大涨" in macro_desc or "bullish" in macro_desc.lower(): + macro_txt = "大盘偏强" + factors.append(macro_txt) + elif "偏弱" in macro_desc or "大跌" in macro_desc or "bearish" in macro_desc.lower(): + macro_txt = "大盘偏弱" + factors.append(macro_txt) + elif macro_desc and macro_desc != "宏观未加载": + factors.append("大盘中性") + + # 2. 行业因子 + if sector_note: + # 把"行业X大跌3%+"简化为"行业偏弱","行业X大涨3%+"简化为"行业偏强" + if "大跌" in sector_note or "下跌" in sector_note: + factors.append("行业偏弱") + elif "大涨" in sector_note: + factors.append("行业偏强") + elif "上涨" in sector_note: + factors.append("行业偏强") + else: + factors.append("行业中性") + + # 3. 基本面因子 + if fundamentals: + pe = fundamentals.get("pe", 0) + eps = fundamentals.get("eps", 0) + profit_growth = fundamentals.get("profit_growth", fundamentals.get("yoy_profit", "")) + revenue_growth = fundamentals.get("revenue_growth", fundamentals.get("yoy_revenue", "")) + mcap = fundamentals.get("mcap_total", 0) + + pe = pe or 0 + eps = eps or 0 + profit_growth_str = str(profit_growth or "") + revenue_growth_str = str(revenue_growth or "") + + # 净利增长 + for val in [profit_growth_str, revenue_growth_str]: + try: + v = float(val.replace("%", "").replace("+", "")) + if v > 50: + factors.append("净利增50%+") + break + elif v > 20: + factors.append(f"净利增{int(v)}%") + break + elif v < -20: + factors.append("净利降20%+") + break + except (ValueError, AttributeError): + continue + + # PE估值 + if 0 < pe < 15: + factors.append("低估值") + elif pe > 100 or pe < 0: + factors.append("高估值") + + # 市值 + if mcap and mcap > 5000: + factors.append("蓝筹") + + # 4. 消息面因子(小果情感) + if news_sentiment: + ns = news_sentiment.get("sentiment", "") + nc = news_sentiment.get("confidence", 0) + if ns == "positive" and nc >= 0.7: + kws = news_sentiment.get("keywords", []) + kw_str = f"({'/'.join(kws[:3])})" if kws else "" + factors.append(f"消息偏多{kw_str}") + elif ns == "negative" and nc >= 0.7: + kws = news_sentiment.get("keywords", []) + kw_str = f"({'/'.join(kws[:3])})" if kws else "" + factors.append(f"消息偏空{kw_str}") + + # 5. 技术面(基础信号) + if base_signal and base_signal != "neutral": + factors.append(base_signal) + + # 5.5 组合风险因子(2026-06-23 双引擎协调) + if portfolio_context and not is_new_entry: + if portfolio_context.get('is_very_high_position'): + factors.append("组合仓位极重(>90%)") + elif portfolio_context.get('is_high_position'): + factors.append("组合仓位偏重(>80%)") + if portfolio_context.get('is_high_weak'): + factors.append(f"弱势占{portfolio_context.get('weak_ratio')}%") + elif portfolio_context and is_new_entry: + # 新买入推荐:注明组合上下文 + if portfolio_context.get('is_high_position'): + factors.append(f"仓{portfolio_context.get('position_pct')}%现金有限") + elif portfolio_context.get('is_high_weak'): + factors.append("组合风险信号") + + # 5.7 盈亏比因子(2026-06-24 新增——RR<1.5降级买入信号) + if rr_ratio > 0: + if rr_ratio < 1.5: + factors.append(f"RR{rr_ratio}过低") + elif rr_ratio >= 3: + factors.append(f"RR{rr_ratio}") + # 1.5~3之间:中性,不特别标注 + + # 如果没有足够因素,返回信号不充分 + if not factors: + return "信号不充分", [] + + # 信号只应包含明确的买卖方向,不能从行业/大盘等上下文因子拼凑 + # base_signal 存在且非 neutral → 用 base_signal + # 否则 → 信号不充分(不拿 factors[-1] 当信号) + if base_signal and base_signal != "neutral": + clean_signal = base_signal + else: + # 从 factors 中找第一个有效的操作方向信号 + valid_direction = {"买入", "加仓", "观望", "持有", "关注", "信号不充分"} + signal_found = "" + for f in reversed(factors): + if f in valid_direction: + signal_found = f + break + clean_signal = signal_found if signal_found else "信号不充分" + + # 6. RR约束降级(2026-06-24 新增) + # 买入/加仓信号但RR<1.5 → 降级为"信号不充分" + buy_signals = {"买入", "加仓"} + if clean_signal in buy_signals and 0 < rr_ratio < 1.5: + clean_signal = "信号不充分" + factors.append("RR过低降级") + + return clean_signal, factors + + +def reassess_with_context(code, name, price, cost, shares, current_action, + volume_signal="", sentiment="neutral", is_watchlist=False): + """reassess_strategy + 多因子信号合成(大盘+行业+技术) + + 为 per_stock_reassess 等单只场景提供一站式多因子分析 + """ + result = reassess_strategy( + code, name, price, cost, shares, + current_action, volume_signal, sentiment, is_watchlist + ) + if not result: + return result + + # 加载宏观+行业+消息+基本面上下文 + try: + macro_bias, macro_desc = load_macro_context() + market_ctx = load_market_context() + stock_sector_map = load_stock_sector_map() + sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map) + sector_note = sector_adj.get("note", "") + news_sentiment = load_stock_news_sentiment(code) + fund = load_fundamentals(code) + except Exception: + macro_desc = "" + sector_note = "" + news_sentiment = {} + fund = {} + + # ── DSA 集成:注入大盘复盘 + 新闻情报 ────────────────────────── + try: + from mo_bridge import enrich_analysis_context + region = "hk" if len(str(code)) == 5 and str(code)[0] in ('0','1') else "cn" + dsa_ctx = enrich_analysis_context(stock_code=code, stock_name=name, + region=region, include_news=True) + if dsa_ctx: + macro_desc = (macro_desc + "\n\n" + dsa_ctx).strip() + except Exception: + pass # DSA 不可用时静默跳过 + + enriched, factors = enrich_timing_signal( + base_signal=result.get("timing_signal", ""), + macro_desc=macro_desc, + sector_note=sector_note, + profit_pct=(price - cost) / cost * 100 if cost else 0, + stock_category=result.get("stock_category", ""), + is_new_entry=is_watchlist, + fundamentals=fund, + news_sentiment=news_sentiment, + portfolio_context=_get_portfolio_risk_state(), + rr_ratio=result.get("rr_ratio", 0), + ) + result["timing_signal"] = enriched + result["signal_factors"] = factors + + # 6. 防洗盘:信号不要一天一翻(2026-06-23) + # 如果旧信号是买入/持有类,新信号是谨慎/等待类,但中期趋势未破→维持旧信号 + try: + dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json')) + for e in dec.get('decisions', []): + if e.get('code') == code: + old_signal = e.get('timing_signal', '') + if old_signal and _is_buy_signal(old_signal) and not _is_buy_signal(enriched): + # 中等趋势检查:MA5 > MA20 + 多周期看多 + mtf = result.get('multi_tf_context', '') + if '看多' in mtf or '多头' in mtf: + try: + closes = [float(k.split()[2]) for k in mtf.split('|') if 'MA5' in k] + except: + closes = [] + has_uptrend = 'MA5' in mtf and 'MA20' in mtf + if has_uptrend: + print(f" 防洗盘: {old_signal}→保持旧信号(中期趋势完整)") + result["timing_signal"] = f"{old_signal}(正常回调价稳)" + sf = result.get("signal_factors") or [] + if "正常回调价稳" not in sf: + result["signal_factors"] = sf + ["正常回调价稳"] + break + except Exception as e: + print(f" 防洗盘跳过: {e}") + + # 7. 反馈循环核:检查本股是否有刚卖出的记录(2026-06-23) + contradiction = _check_contradiction(code) + if contradiction and contradiction.get('is_today'): + # 今日刚卖出 → 不屏蔽信号,但必须自标注矛盾 + print(f" 反馈循环: {contradiction.get('tag')} (sold_at={contradiction.get('sold_at')})") + if _is_buy_signal(result.get('timing_signal', '')): + result['action_note'] = contradiction['tag'] + # 在 timing_signal 中追加反馈标注,供报告层可见 + curr_signal = result.get('timing_signal', '') + if '⚠️' not in curr_signal: + result['timing_signal'] = f"⚠️{contradiction['tag']}|{curr_signal}" + elif contradiction: + # 非今日卖出但近期卖出 → 标注已清仓 + print(f" 近期清仓: sold_at={contradiction.get('sold_at')} ({contradiction.get('days_ago')}日前)") + if _is_buy_signal(result.get('timing_signal', '')): + curr_signal = result.get('timing_signal', '') + if '已清仓' not in curr_signal: + result['timing_signal'] = f"已清仓,{curr_signal}" + + # 重建 action 文本(同步多因子信号) + try: + if new_action_needs_refresh(result, {"source": "auto"}, price): + _refresh_action_text(result, price, name) + except Exception: + pass + + return result + + +def new_action_needs_refresh(result, old_entry, price): + """判断宏观/行业调整后是否需要刷新action文本""" + # 自选股和手动策略不做调整,不需要刷新 + if old_entry.get("source") == "manual": + return False + return True + + +def _refresh_action_text(result, price, name): + """根据调整后的止损/止盈重建action文本""" + sl = result.get("stop_loss", 0) + tp = result.get("take_profit", 0) + el = result.get("entry_low", 0) + eh = result.get("entry_high", 0) + ts = result.get("timing_signal", "") + an = result.get("action_note", "") + old_action = result.get("action", "") + + # 保持原action的前缀(持有状态部分不变) + # action格式一般是: "状态 | 止损X | 目标Y | 买入区X~Y | 信号:Z" + parts = old_action.split(" | ") + new_parts = [] + for p in parts: + p = p.strip() + # 替换止损数字 + if p.startswith("止损") or p.startswith("止损参考"): + if sl: + p = f"止损{sl}" if "止损参考" not in old_action.split(" | ")[0] else f"止损参考{sl}" + # 替换目标/止盈数字 + if p.startswith("目标") or p.startswith("止盈"): + if tp: + p = f"目标{tp}" + # 替换买入区数字 + if "买入区" in p and "~" in p: + if el and eh: + p = f"买入区{el}~{eh}" + new_parts.append(p) + result["action"] = " | ".join(new_parts) + + +def check_sector_alerts(market_ctx, stock_sector_map, holdings, wl): + """行业轮动主动预警:检测板块崩盘级别信号→查持仓→输出预警 + + 返回 list of alerts: [{code, name, sector, chg, action}] + """ + alerts = [] + if not market_ctx: + return alerts + + sector_perf = market_ctx.get("sector_perf", {}) + + # 找出所有跌幅>3%的行业 + crashing_sectors = {name: data for name, data in sector_perf.items() + if data.get("change", 0) <= -3} + + if not crashing_sectors: + return alerts + + # 构建 code→持仓信息 的映射 + holding_map = {} + for h in holdings: + c = h.get("code", "") + if c: + holding_map[c] = {"name": h.get("name", c), "type": "持仓"} + for s in wl.get("stocks", []): + c = s.get("code", "") + if c and c not in holding_map: + holding_map[c] = {"name": s.get("name", c), "type": "自选"} + + # 对每个暴跌行业,查持仓中是否有股票属于该行业 + for sec_name, sec_data in sorted(crashing_sectors.items(), + key=lambda x: x[1].get("change", 0)): + chg = sec_data.get("change", 0) + for code, sectors in stock_sector_map.items(): + if code in holding_map and sec_name in sectors: + info = holding_map[code] + alerts.append({ + "code": code, + "name": info["name"], + "sector": sec_name, + "sector_change": chg, + "type": info["type"], + "action": f"行业{sec_name}跌{chg:+.1f}%,{info['type']}需关注", + }) + + alerts.sort(key=lambda a: a["sector_change"]) + return alerts + + +def regenerate_all(stdout=True): + """全量重评所有持仓+自选策略""" + # 优先从 SQLite 读取 + try: + from mofin_db import get_conn, query_holdings, query_watchlist + conn = get_conn() + holdings = query_holdings(conn) + wl_stocks = query_watchlist(conn) + conn.close() + pf = {"holdings": holdings} + wl = {"stocks": wl_stocks} + except Exception: + pf = safe_json_load(PORTFOLIO_PATH, {}) + wl = safe_json_load(WATCHLIST_PATH, {}) + + all_stocks = {} + for item in pf.get("holdings", []): + code = item.get("code", "") + if code: + all_stocks[code] = {"source": "portfolio", "data": item} + for item in wl.get("stocks", []): + code = item.get("code", "") + if code and code not in all_stocks: + all_stocks[code] = {"source": "watchlist", "data": item} + + total = len(all_stocks) + ok = 0 + errors = 0 + results = [] + decisions = [] + + # 加载现有 decisions.json 以便追踪变更 + decisions_path = "/home/hmo/web-dashboard/data/decisions.json" + try: + existing_decisions = {d["code"]: d for d in json.load(open(decisions_path)).get("decisions", []) if d.get("code")} + except: + existing_decisions = {} + + # 加载宏观上下文(影响策略参数调整) + macro_bias, macro_desc = load_macro_context() + if stdout: + print(f" 宏观参考: {macro_desc} (bias={macro_bias})") + + # 加载市场上下文 — 行业板块表现 + 大盘宽度(策略参数调整用) + market_ctx = load_market_context() + stock_sector_map = load_stock_sector_map() + market_breadth = market_ctx.get("breadth", 50) + market_mood = market_ctx.get("mood", "neutral") + if stdout: + sectors_found = sum(1 for c in all_stocks if stock_sector_map.get(c)) + print(f" 市场参考: {market_mood} 上涨比{market_breadth}% 已匹配{sectors_found}/{total}只个股行业") + + # 批量预取所有价格(一次API调用 vs 之前N次) + prices_map = batch_fetch_prices(list(all_stocks.keys())) + if stdout: + print(f" 批量获取价格: {len(prices_map)}/{total} 成功") + + for code, info in sorted(all_stocks.items()): + stock = info["data"] + name = stock.get("name", code) + cost = stock.get("cost", 0) or 0 + shares = stock.get("shares", 0) or 0 + source = info["source"] + + q = prices_map.get(code) + if not q or not q.get("price"): + results.append({"code": code, "name": name, "error": "腾讯API无数据"}) + errors += 1 + if stdout: + print(f" ❌ {name}({code}): 腾讯API无数据") + continue + + price = q["price"] + profit_pct = (price - cost) / cost * 100 if cost else 0 + current_action = stock.get("analysis", {}).get("action", "") + close_yest = q.get("close", 0) + sentiment = "neutral" + if close_yest and price > close_yest * 1.02: + sentiment = "bullish" + elif close_yest and price < close_yest * 0.98: + sentiment = "bearish" + + try: + is_wl = (source == "watchlist") + result = reassess_strategy( + code, name, price, cost, shares, + current_action, volume_signal="中性", sentiment=sentiment, + is_watchlist=(source == "watchlist"), + ) + + # --- Manual param preservation: 用户手动策略永不覆盖 --- + old_entry = existing_decisions.get(code, {}) + if old_entry.get("source") == "manual": + # 仅覆盖策略参数,技术分析/信号/价格照常保留 + for key in ["entry_low", "entry_high", "stop_loss", "take_profit"]: + if key in old_entry and old_entry[key] is not None: + result[key] = old_entry[key] + # 重算盈亏比(基于手动参数) + manual_stop = result.get("stop_loss", 0) or 0 + manual_target = result.get("take_profit", 0) or 0 + risk = max(price - manual_stop, price * 0.01) if manual_stop > 0 else price * 0.01 + reward = max(manual_target - price, 0) if manual_target > 0 else 0 + result["rr_ratio"] = round(reward / risk, 2) if risk > 0 else 0 + # 重建 action 文本(引用手动参数,不引用自动计算的) + profit_pct = (price - cost) / cost * 100 if cost else 0 + manual_action_parts = [] + if profit_pct < -20: + manual_action_parts.append("深套持有") + elif profit_pct < -10: + manual_action_parts.append("持有观察") + elif profit_pct < 0: + manual_action_parts.append("持有观察") + elif profit_pct < 5: + manual_action_parts.append("盈利持有") + else: + manual_action_parts.append("盈利良好") + if result.get("action_note"): + manual_action_parts.append(result["action_note"]) + if is_wl: + if manual_stop > 0: + manual_action_parts.append(f"止损参考{manual_stop}") + manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}") + else: + if manual_stop > 0: + manual_action_parts.append(f"止损{manual_stop}") + if manual_target > 0: + manual_action_parts.append(f"目标{manual_target}") + manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}") + ts = result.get("timing_signal", "") + if ts and ts != "neutral": + manual_action_parts.append(f"信号:{ts}") + result["action"] = " | ".join(manual_action_parts) + result["status"] = "manual" # 标记为手动管理,变更追踪不受影响 + if stdout: + print(f" [手动保留] {name}({code}) 策略参数未覆盖") + + # 宏观偏差调整:收盘后重评时根据宏观方向微调止损/止盈 + # 自选股不做止盈宏观调整(无持仓) + # 手动策略不做宏观偏差调整(尊重用户设定) + if macro_bias != 1.0 and not is_wl and old_entry.get("source") != "manual": + old_stop = result.get("stop_loss", 0) + old_target = result.get("take_profit", 0) + if macro_bias < 1.0 and old_stop > 0: # 宏观偏弱 → 收紧止损 + # 止损上移(但保留最小3%间距) + adjusted_stop = round(old_stop * (1 + (1 - macro_bias) * 0.3), 2) + min_stop = round(price * 0.97, 2) + result["stop_loss"] = min(adjusted_stop, min_stop) + if old_target > 0: + result["take_profit"] = round(old_target * (1 - (1 - macro_bias) * 0.2), 2) + elif macro_bias > 1.0 and old_target > 0: # 宏观偏强 → 止盈上调让利润跑 + result["take_profit"] = round(old_target * (1 + (macro_bias - 1) * 0.3), 2) + + # 行业偏差调整:根据个股所在行业的市场表现微调止损/止盈 + # 手动策略不做行业调整(尊重用户设定) + sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map) + sector_note = sector_adj.get("note", "") + if sector_note and old_entry.get("source") != "manual": + old_stop = result.get("stop_loss", 0) + old_target = result.get("take_profit", 0) + stop_bias = sector_adj.get("stop_bias", 1.0) + target_bias = sector_adj.get("target_bias", 1.0) + if stop_bias != 1.0 and old_stop > 0: + # 行业偏差调整(在宏观调整之后叠加) + adjusted = round(old_stop * stop_bias, 2) + # 保留最小3%间距 + min_stop = round(price * 0.97, 2) + result["stop_loss"] = min(adjusted, min_stop) + if target_bias != 1.0 and old_target > 0 and not is_wl: + result["take_profit"] = round(old_target * target_bias, 2) + + # 加载消息面+基本面(逐个股) + news_sentiment = load_stock_news_sentiment(code) + fund = load_fundamentals(code) + + # 多因子合成 timing_signal:大盘+行业+消息+基本面+技术 + if old_entry.get("source") != "manual": + enriched, _ = enrich_timing_signal( + base_signal=result.get("timing_signal", ""), + macro_desc=macro_desc, + sector_note=sector_note, + profit_pct=profit_pct, + stock_category=result.get("stock_category", ""), + is_new_entry=(source == "watchlist"), + fundamentals=fund, + news_sentiment=news_sentiment, + rr_ratio=result.get("rr_ratio", 0), + ) + result["timing_signal"] = enriched + + # 在宏观/行业/多因子调整后重建 action 文本(同步调整后的止损/止盈数字) + if new_action_needs_refresh(result, old_entry, price): + _refresh_action_text(result, price, name) + + extra = { + "rr_ratio": result.get("rr_ratio"), + "action_note": result.get("action_note", ""), + "timing_signal": result.get("timing_signal", ""), + } + analysis = { + "stop_loss": result["stop_loss"], + "take_profit": result["take_profit"], + "entry_low": result["entry_low"], + "entry_high": result["entry_high"], + "action": result["action"], + "tech_snapshot": result.get("tech_snapshot", ""), + "multi_tf_context": result.get("multi_tf_context", ""), + "reassessed_at": result["reassessed_at"], + "status": result["status"], + **extra, + } + stock["analysis"] = analysis + # 同步 top-level 字段 → zone_breach/price_monitor 依赖这些字段 + # (2026-06-24 bugfix: analysis 子对象有但顶层没有,导致新持仓的止损检测盲区) + stock["stop_loss"] = result.get("stop_loss", 0) + stock["take_profit"] = result.get("take_profit", 0) + stock["entry_low"] = result.get("entry_low", 0) + stock["entry_high"] = result.get("entry_high", 0) + # 同步 trigger 字段 -> price_monitor 依赖 + sl = result.get("stop_loss", 0) + tp = result.get("take_profit", 0) + el = result.get("entry_low", 0) + eh = result.get("entry_high", 0) + trig = {} + if sl and float(sl) > 0: + trig["stop_loss"] = float(sl) + if el and eh and float(el) > 0 and float(eh) > 0: + trig["entry_zone"] = f"{float(el)}~{float(eh)}" + if tp and float(tp) > 0: + trig["take_profit_zone"] = f"0~{float(tp)}" + stock["trigger"] = trig + results.append({ + "code": code, "name": name, + "price": price, "cost": cost, + "action": result["action"], + "stop_loss": result["stop_loss"], + "take_profit": result["take_profit"], + "rr_ratio": result["rr_ratio"], + }) + ok += 1 + if stdout: + rr_str = f" RR={result['rr_ratio']}" if "rr_ratio" in result else "" + print(f" ✅ {name}({code}) {price} {result['action']}{rr_str}") + + # 记录所有股票的决策日志(含变更追踪) + status_display = result.get("status", "active") + # 构建行业上下文 + sector_ctx_str = "" + sec_name = sector_adj.get("sector_name", "") + sec_chg = sector_adj.get("sector_change", 0) + if sec_name: + sector_ctx_str = f"行业{sec_name}{sec_chg:+.1f}%" + if sector_adj.get("note"): + # note 已包含大盘宽度信息 + sector_ctx_str = sector_adj["note"] + elif market_breadth < 40: + # 无行业映射时至少记录大盘宽度 + sector_ctx_str = f"大盘上涨比{market_breadth}%" + new_entry = { + "code": code, "name": name, "price": price, + "cost": old_entry.get("cost", cost) if old_entry else cost, # 优先保留旧成本(holding.xls权威) + "shares": old_entry.get("shares", 0), # 保留持仓股数 + "avg_price": old_entry.get("avg_price", 0), # 保留持仓均价 + "action": result["action"], + "stop_loss": result.get("stop_loss"), + "entry_low": result["entry_low"], + "entry_high": result["entry_high"], + "tech_snapshot": result.get("tech_snapshot", ""), + "timing_signal": result.get("timing_signal", ""), + "rr_ratio": result.get("rr_ratio", 0), + "status": status_display, + "note": result.get("action_note", ""), + "timestamp": result["reassessed_at"], + "updated_at": result["reassessed_at"], + "type": "自选策略" if is_wl else "持仓策略", + "source": old_entry.get("source", "auto"), # manual/auto,继承旧标记 + "sector_context": sector_ctx_str, # 市场上下文:行业表现+大盘宽度 + "stock_category": result.get("stock_category", "中短线"), # 组合监测用 + "position_advice": result.get("position_advice", "中等仓位"), + "time_horizon": result.get("time_horizon", "2周~3月"), + } + new_entry["trigger"] = trig + # created_at: 首次创建时设置,后续 preserve + old_entry = existing_decisions.get(code, {}) + if old_entry.get("created_at"): + new_entry["created_at"] = old_entry["created_at"] + else: + new_entry["created_at"] = result["reassessed_at"] + # 保留 last_reassessed_price(per_stock_reassess 维护的防抖字段) + if old_entry.get("last_reassessed_price"): + new_entry["last_reassessed_price"] = old_entry["last_reassessed_price"] + # 自选股也写止盈位(用于RR校验),但标签用"目标参考"非"止盈" + new_entry["take_profit"] = result.get("take_profit") + + # --- 变更追踪 --- + old_action = old_entry.get("action", "") + old_stop = old_entry.get("stop_loss") + old_target = old_entry.get("take_profit") + + # 构建旧策略摘要和变更理由 + update_reason = "" + changelog_entry = None + + if old_action and old_action != result["action"]: + # 策略有变化 → 记录变更 + old_summary = old_action + new_summary = result["action"] + + # 判断触发原因 + if abs(price - old_entry.get("price", price)) / max(price, 0.01) > 0.03: + trigger = f"价格变动({old_entry.get('price','?')}→{price})" + elif result.get("timing_signal") and result["timing_signal"] != old_entry.get("timing_signal", ""): + trigger = f"技术信号变化: {result['timing_signal']}" + else: + trigger = "技术面重评" + + # 格式化的变更理由(自选股只看止损,不看止盈) + diff_parts = [] + if old_stop and result["stop_loss"] != old_stop: + diff_parts.append(f"止损{old_stop}→{result['stop_loss']}") + if not is_wl and old_target and result.get("take_profit") and result["take_profit"] != old_target: + diff_parts.append(f"止盈{old_target}→{result['take_profit']}") + if diff_parts: + update_reason = f"{trigger}: {', '.join(diff_parts)} | {result.get('tech_snapshot','')[:60]}" + else: + update_reason = f"{trigger}: 策略文字调整" + + changelog_entry = { + "date": result["reassessed_at"], + "old_action": old_action, + "new_action": result["action"], + "reason": update_reason, + "trigger": trigger, + } + new_entry["updated_reason"] = update_reason + + elif not old_action: + # 首次创建策略 + update_reason = f"初始策略创建 | {result.get('tech_snapshot','')[:60]}" + changelog_entry = { + "date": result["reassessed_at"], + "old_action": "", + "new_action": result["action"], + "reason": update_reason, + "trigger": "初始创建", + } + + # 合并changelog + old_changelog = old_entry.get("changelog", []) if old_entry else [] + if changelog_entry: + new_entry["changelog"] = old_changelog + [changelog_entry] + else: + new_entry["changelog"] = old_changelog + + # 保留执行记录 + if old_entry and old_entry.get("execution"): + new_entry["execution"] = old_entry["execution"] + elif stock.get("analysis", {}).get("status") == "executing": + new_entry["execution"] = { + "status": "executing", + "entry_price": cost if cost else 0, + "shares": shares, + "notes": "", + } + + # --- 自动标记 current_recommend --- + # 只在真正执行中的持仓才自动推荐:execution.status 为 executing 或 partial_exit + exec_status = old_entry.get("execution", {}).get("status", "") if old_entry else "" + is_active = exec_status in ("executing", "partial_exit") + + profit_pct = (price - cost) / cost * 100 if cost else 0 + is_deep_loss_stock = profit_pct < -20 + rr = result.get("rr_ratio", 0) + ts = result.get("timing_signal", "") + note = result.get("action_note", "") + + # 计算是否在/接近买入区 + entry_low_val = result.get("entry_low", 0) + entry_high_val = result.get("entry_high", 0) + in_buy_zone = (entry_low_val > 0 and entry_high_val > 0 and + entry_low_val <= price <= entry_high_val) + near_buy_zone_low = (entry_low_val > 0 and + price >= entry_low_val * 0.98 and + price <= entry_high_val) + + # 推荐条件:必须是执行中的持仓 + 基本面条件达标 + is_recommendable = ( + is_active + and not is_deep_loss_stock + and rr >= 1.5 + and ts != "neutral" + and "不建议" not in note + ) + if is_recommendable: + new_entry["tag"] = "current_recommend" + else: + # 不清除 active_manual(用户手动标记),只清除自动推荐的 + old_tag = old_entry.get("tag", "") if old_entry else "" + if old_tag != "active_manual": + new_entry.pop("tag", None) + + decisions.append(new_entry) + + except Exception as e: + results.append({"code": code, "name": name, "error": str(e)}) + errors += 1 + if stdout: + print(f" ❌ {name}({code}): {e}") + + # 写回数据文件 — 保留现有字段(现金、总资产等)不丢 + try: + existing_pf = json.load(open(PORTFOLIO_PATH)) + except Exception: + existing_pf = {} + # 保留 price/change_pct — price_monitor 维护的实时价,regenerate_all 不应清除 + _existing_holdings_map = {} + for _h in existing_pf.get('holdings', []): + if _h.get('code'): + _existing_holdings_map[_h['code']] = _h + _new_holdings = pf.get("holdings", []) + for _h in _new_holdings: + _code = _h.get('code') + if _code and _code in _existing_holdings_map: + _old = _existing_holdings_map[_code] + _h['price'] = _old.get('price', 0) + _h['change_pct'] = _old.get('change_pct', 0) + existing_pf["holdings"] = _new_holdings + existing_pf["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M') + + # ── Watchlist ↔ Holdings 双向自动迁移(2026-06-27 Dad要求)── + # ① 持仓已有 → 从自选移除(买入自动清除) + wl_codes = {s.get("code") for s in wl.get("stocks", []) if s.get("code")} + pf_codes = {h.get("code") for h in _new_holdings if h.get("code") and h.get("shares", 0) > 0} + removed_from_wl = [] + for h_code in wl_codes & pf_codes: + # 持仓>0且量够 → 自选移除 + wl["stocks"] = [s for s in wl.get("stocks", []) if s.get("code") != h_code] + removed_from_wl.append(h_code) + if removed_from_wl and stdout: + print(f" 自选→持仓自动移除: {', '.join(removed_from_wl)}") + + # ② 清仓/卖光 → 加回自选(只要仍有关注价值) + added_to_wl = [] + old_pf_codes = {_h.get("code") for _h in existing_pf.get("holdings", []) if _h.get("code")} + sold_codes = old_pf_codes - pf_codes # 曾持仓但现在没有(或不在了) + for sc in sold_codes: + # 已有自选就不重复加 + if sc in wl_codes: + continue + # 从现有decisions看是否有关注价值 + for d in decisions: + if d.get("code") == sc and d.get("entry_low") and d.get("entry_high"): + wl["stocks"].append({ + "code": sc, "name": d.get("name", sc), + "entry_low": d.get("entry_low"), "entry_high": d.get("entry_high"), + "stop_loss": d.get("stop_loss", 0), + "analysis": {"action": d.get("action", ""), "tech_snapshot": d.get("tech_snapshot", "")} + }) + added_to_wl.append(sc) + break + if added_to_wl and stdout: + print(f" 清仓→自选自动加入: {', '.join(added_to_wl)}") + + # DB 写入(替代 JSON dump — 强制币种约束) + try: + from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_watchlist_stock, write_holding_strategy + conn = get_conn() + write_holdings_batch(conn, existing_pf.get('holdings', [])) + write_portfolio_summary(conn, existing_pf) + for s in wl.get('stocks', []): + s.setdefault('currency', 'CNY') + write_watchlist_stock(conn, s) + for d in decisions: + write_holding_strategy(conn, d.get('code', ''), d.get('name', ''), d) + conn.close() + except Exception as e: + print(f" [DB写入失败] {e}", flush=True) + # JSON 冷备 + json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2) + json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2) + + # 写 decisions.json + decisions_path = "/home/hmo/web-dashboard/data/decisions.json" + decisions_data = { + "decisions": decisions, # 全部保留 + "total": len(decisions), + "regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'), + } + json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2) + # DB 已在上方写入(和 portfolio/watchlist 一起) + + # 记录策略→提示词版本关联 + if HAS_PROMPT_TRACKING: + try: + for d in decisions: + if d.get("code") and d.get("action"): + record_strategy_generation( + d["code"], d.get("name", ""), d.get("action", "") + ) + except Exception as e: + if stdout: + print(f" ⚠️ 提示词版本追踪失败: {e}", file=sys.stderr) + + # 刷新多周期缓存到磁盘 + try: + import multi_timeframe as _mtf + _mtf.flush_mtf_cache() + except Exception: + pass + + summary = {"total": total, "ok": ok, "errors": errors} + if stdout: + print(f"\n✅ 全量重评完成: {ok}/{total}成功, {errors}错误") + return summary + + +if __name__ == "__main__": + regenerate_all() diff --git a/scripts/strategy_review.py b/scripts/strategy_review.py index 15e133c..82a4e46 100644 --- a/scripts/strategy_review.py +++ b/scripts/strategy_review.py @@ -38,6 +38,10 @@ SIGNAL_FAILURES = { def fetch_price(code): + # DB 优先 + try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0 + except: pass + # Fallback: 腾讯 API try: prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk" url = f"http://qt.gtimg.cn/q={prefix}{code}" diff --git a/scripts/xiaoguo_scanner.py b/scripts/xiaoguo_scanner.py index f2297b0..6240022 100644 --- a/scripts/xiaoguo_scanner.py +++ b/scripts/xiaoguo_scanner.py @@ -15,7 +15,7 @@ try: except ImportError: HAS_AKSHARE = False -DATA_DIR = Path(__file__).parent / "data" +DATA_DIR = Path("/home/hmo/MoFin/data") DB_PATH = DATA_DIR / "mofin.db" XIAOGUO_API = "http://node122:18003/v1/chat/completions" XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed" diff --git a/scripts/xiaoguo_signal_consumer.py b/scripts/xiaoguo_signal_consumer.py index 50c1cd5..63ee5d6 100644 --- a/scripts/xiaoguo_signal_consumer.py +++ b/scripts/xiaoguo_signal_consumer.py @@ -30,7 +30,11 @@ def clean_proxy(): def fetch_quote(code): - """拉腾讯行情,返回 dict""" + """拉行情。DB 优先,腾讯 fallback""" + # DB 优先 + try: from mofin_db import get_price_from_db; p, chg = get_price_from_db(code); return {"name":"", "code":code, "price":p, "change_pct":chg or 0} if p else None + except: pass + # Fallback: 腾讯 try: prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk" url = f"http://qt.gtimg.cn/q={prefix}{code}"