diff --git a/capital_flow_collector.py b/capital_flow_collector.py deleted file mode 100644 index 7c8295d..0000000 --- a/capital_flow_collector.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python3 -"""capital_flow_collector.py — 个股资金流数据采集器 - -每30分钟拉一次持仓+自选的超大单/大单/中单/小单资金流向。 -输出到 capital_flow_cache.json 供 price_monitor 和报告使用。 - -API: push2his.eastmoney.com 个股资金流日线 -""" -import json, os, sys, time -from datetime import datetime -from urllib.request import urlopen - -DATA_DIR = "/home/hmo/web-dashboard/data" -DECISIONS_PATH = f"{DATA_DIR}/decisions.json" -CACHE_PATH = f"{DATA_DIR}/capital_flow_cache.json" - -# eastmoney secid: 1=上海 0=深圳 -def secid(code): - code = str(code).strip() - if code.startswith(("6", "9")): - return f"1.{code}" - return f"0.{code}" - -def fetch_flow(code, days=5): - """拉取个股近N日资金流""" - sid = secid(code) - url = f"http://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&lmt={days}" - try: - resp = urlopen(url, timeout=5) - data = json.loads(resp.read().decode("utf-8")) - klines = data.get("data", {}).get("klines", []) - if not klines: - return None - result = [] - for k in klines: - p = k.split(",") - if len(p) >= 7: - result.append({ - "date": p[0], - "main_net": float(p[1]), # 主力净流入(元) - "super_large": float(p[2]), # 超大单净流入(元) - "large": float(p[3]), # 大单净流入(元) - "medium": float(p[4]), # 中单净流入(元) - "small": float(p[5]), # 小单净流入(元) - }) - return result - except Exception as e: - return None - -def fetch_flow_intraday(code): - """拉取当日分时资金流(用于盘中判断)""" - sid = secid(code) - url = f"http://push2.eastmoney.com/api/qt/stock/fflow/kline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&klt=1&lmt=120" - try: - resp = urlopen(url, timeout=5) - data = json.loads(resp.read().decode("utf-8")) - klines = data.get("data", {}).get("klines", []) - if not klines: - return None - latest = klines[-1].split(",") - return { - "main_net": float(latest[1]), - "super_large": float(latest[2]), - "large": float(latest[3]), - } - except: - return None - -def analyze_flow(flow_data): - """分析资金流模式""" - if not flow_data or len(flow_data) < 2: - return {} - - result = {"alerts": [], "pattern": ""} - - # 最近两日对比 - d1 = flow_data[-1] # 最新日 - d2 = flow_data[-2] # 前一日 - - # 超大单信号 - sl1 = d1["super_large"] - sl2 = d2["super_large"] - - # 连续形态判断 - main_trend = sum(d["main_net"] for d in flow_data[-3:]) - sl_trend = sum(d["super_large"] for d in flow_data[-3:]) - - # 1. 主力连续流入 - if main_trend > 50000000 and sl1 > 0 and sl2 > 0: - result["pattern"] = "主力持续流入" - result["alerts"].append("主力连续3日净流入") - - # 2. 超大单突然转向(连续流入→流出 或 流出→流入) - if sl1 * sl2 < 0: # 方向反转 - if sl1 > 0 and sl2 < 0: - result["pattern"] = "超大单由出转入" - result["alerts"].append("超大单转为净买入(暗示消息即将落地)") - elif sl1 < 0 and sl2 > 0: - result["pattern"] = "超大单由入转出" - result["alerts"].append("超大单转为净卖出(利好出货嫌疑)") - - # 3. 价格与资金流背离(缺当前价格作比较,在主脚本中完成) - # 4. 单日暴量 - max_sl = max(abs(d["super_large"]) for d in flow_data) - if max_sl == abs(sl1) and abs(sl1) > 100000000: - result["pattern"] = "单日资金暴量" - result["alerts"].append(f"今日超大单异常: {sl1/100000000:.2f}亿") - - return result - -def main(): - codes = set() - # 读取持仓+自选 - try: - dec = json.load(open(DECISIONS_PATH)) - for d in dec.get("decisions", []): - c = d.get("code", "") - if c: - codes.add(c) - except: - pass - - all_flows = {} - - for code in sorted(codes): - flow = fetch_flow(code, days=5) - if flow: - analysis = analyze_flow(flow) - all_flows[code] = { - "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), - "flow": flow, - "analysis": analysis, - } - time.sleep(0.3) # API限流 - - # 写缓存 - cache = { - "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), - "stocks": all_flows, - } - json.dump(cache, open(CACHE_PATH, "w"), indent=2, ensure_ascii=False) - print(f"[capital_flow] {len(all_flows)}只更新完成") - -if __name__ == "__main__": - main() diff --git a/capital_flow_collector.py b/capital_flow_collector.py new file mode 120000 index 0000000..61357e8 --- /dev/null +++ b/capital_flow_collector.py @@ -0,0 +1 @@ +../scripts/capital_flow_collector.py \ No newline at end of file diff --git a/scripts/capital_flow_collector.py b/scripts/capital_flow_collector.py new file mode 100644 index 0000000..7c8295d --- /dev/null +++ b/scripts/capital_flow_collector.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +"""capital_flow_collector.py — 个股资金流数据采集器 + +每30分钟拉一次持仓+自选的超大单/大单/中单/小单资金流向。 +输出到 capital_flow_cache.json 供 price_monitor 和报告使用。 + +API: push2his.eastmoney.com 个股资金流日线 +""" +import json, os, sys, time +from datetime import datetime +from urllib.request import urlopen + +DATA_DIR = "/home/hmo/web-dashboard/data" +DECISIONS_PATH = f"{DATA_DIR}/decisions.json" +CACHE_PATH = f"{DATA_DIR}/capital_flow_cache.json" + +# eastmoney secid: 1=上海 0=深圳 +def secid(code): + code = str(code).strip() + if code.startswith(("6", "9")): + return f"1.{code}" + return f"0.{code}" + +def fetch_flow(code, days=5): + """拉取个股近N日资金流""" + sid = secid(code) + url = f"http://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&lmt={days}" + try: + resp = urlopen(url, timeout=5) + data = json.loads(resp.read().decode("utf-8")) + klines = data.get("data", {}).get("klines", []) + if not klines: + return None + result = [] + for k in klines: + p = k.split(",") + if len(p) >= 7: + result.append({ + "date": p[0], + "main_net": float(p[1]), # 主力净流入(元) + "super_large": float(p[2]), # 超大单净流入(元) + "large": float(p[3]), # 大单净流入(元) + "medium": float(p[4]), # 中单净流入(元) + "small": float(p[5]), # 小单净流入(元) + }) + return result + except Exception as e: + return None + +def fetch_flow_intraday(code): + """拉取当日分时资金流(用于盘中判断)""" + sid = secid(code) + url = f"http://push2.eastmoney.com/api/qt/stock/fflow/kline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&klt=1&lmt=120" + try: + resp = urlopen(url, timeout=5) + data = json.loads(resp.read().decode("utf-8")) + klines = data.get("data", {}).get("klines", []) + if not klines: + return None + latest = klines[-1].split(",") + return { + "main_net": float(latest[1]), + "super_large": float(latest[2]), + "large": float(latest[3]), + } + except: + return None + +def analyze_flow(flow_data): + """分析资金流模式""" + if not flow_data or len(flow_data) < 2: + return {} + + result = {"alerts": [], "pattern": ""} + + # 最近两日对比 + d1 = flow_data[-1] # 最新日 + d2 = flow_data[-2] # 前一日 + + # 超大单信号 + sl1 = d1["super_large"] + sl2 = d2["super_large"] + + # 连续形态判断 + main_trend = sum(d["main_net"] for d in flow_data[-3:]) + sl_trend = sum(d["super_large"] for d in flow_data[-3:]) + + # 1. 主力连续流入 + if main_trend > 50000000 and sl1 > 0 and sl2 > 0: + result["pattern"] = "主力持续流入" + result["alerts"].append("主力连续3日净流入") + + # 2. 超大单突然转向(连续流入→流出 或 流出→流入) + if sl1 * sl2 < 0: # 方向反转 + if sl1 > 0 and sl2 < 0: + result["pattern"] = "超大单由出转入" + result["alerts"].append("超大单转为净买入(暗示消息即将落地)") + elif sl1 < 0 and sl2 > 0: + result["pattern"] = "超大单由入转出" + result["alerts"].append("超大单转为净卖出(利好出货嫌疑)") + + # 3. 价格与资金流背离(缺当前价格作比较,在主脚本中完成) + # 4. 单日暴量 + max_sl = max(abs(d["super_large"]) for d in flow_data) + if max_sl == abs(sl1) and abs(sl1) > 100000000: + result["pattern"] = "单日资金暴量" + result["alerts"].append(f"今日超大单异常: {sl1/100000000:.2f}亿") + + return result + +def main(): + codes = set() + # 读取持仓+自选 + try: + dec = json.load(open(DECISIONS_PATH)) + for d in dec.get("decisions", []): + c = d.get("code", "") + if c: + codes.add(c) + except: + pass + + all_flows = {} + + for code in sorted(codes): + flow = fetch_flow(code, days=5) + if flow: + analysis = analyze_flow(flow) + all_flows[code] = { + "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), + "flow": flow, + "analysis": analysis, + } + time.sleep(0.3) # API限流 + + # 写缓存 + cache = { + "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), + "stocks": all_flows, + } + json.dump(cache, open(CACHE_PATH, "w"), indent=2, ensure_ascii=False) + print(f"[capital_flow] {len(all_flows)}只更新完成") + +if __name__ == "__main__": + main() diff --git a/scripts/per_stock_reassess.py b/scripts/per_stock_reassess.py new file mode 100644 index 0000000..8c8ded6 --- /dev/null +++ b/scripts/per_stock_reassess.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +""" +per_stock_reassess.py — 按个股触发重评 + +对每只传进来的 code 执行 reassess_strategy(),然后只更新 +decisions.json 中对应的那一条记录。不碰 portfolio.json,不跑全量。 +""" +import sys, json, os, re + +sys.path.insert(0, "/home/hmo/web-dashboard") +from strategy_lifecycle import reassess_with_context as reassess_strategy + +DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" + + +def main(): + codes = [a for a in sys.argv[1:] if not a.startswith("-")] + if not codes: + print("[FULL] 无指定编码,跑全量 regenerate_all()") + from strategy_lifecycle import regenerate_all + regenerate_all(stdout=False) + print("[FULL] 全量重评完成") + return + + # 读现有 decisions + with open(DECISIONS_PATH) as f: + raw = json.load(f) + decisions_map = {d["code"]: d for d in raw.get("decisions", []) if d.get("code")} + + ok = 0 + errors = 0 + skipped = 0 + for code in codes: + entry = decisions_map.get(code) + if not entry: + print(f"[SKIP] {code}: 不在 decisions.json 中") + errors += 1 + continue + + try: + # Always fetch live price for accurate reassessment + price = 0 + try: + import urllib.request + code_raw = entry.get("code", "") + sym_map = {"6":"sh","5":"sh","0":"sz","3":"sz"} + prefix = "" + for k, v in sym_map.items(): + if code_raw.startswith(k): + prefix = v + break + if not prefix: + prefix = "hk" if len(code_raw) == 5 else "sz" + url = f"http://qt.gtimg.cn/q={prefix}{code_raw}" + resp = urllib.request.urlopen(url, timeout=5) + text = resp.read().decode("gbk") + fields = text.split('"')[1].split("~") + price = float(fields[3]) if fields[3] else 0 + # 港股:腾讯API返回HKD,统一转CNY + if len(code_raw) == 5 and code_raw[0] in '01': + try: + sys.path.insert(0, '/home/hmo/MoFin') + from hk_rate import hkd_to_cny + _hkd_rate = hkd_to_cny() + except Exception: + _hkd_rate = 0.87 + price = round(price * _hkd_rate, 2) + print(f" 实时价: {price} {'(CNY)' if len(code_raw) == 5 and code_raw[0] in '01' else ''}") + except Exception as e: + print(f" 实时价获取失败: {e}", file=sys.stderr) + # Try portfolio.json as fallback (price_monitor keeps live prices) + try: + with open("/home/hmo/web-dashboard/data/portfolio.json") as _pf: + _pf_data = json.load(_pf) + for _h in _pf_data.get("holdings", []): + if _h["code"] == code_raw: + price = float(_h.get("price", 0)) + print(f" 从portfolio.json取实时价: {price}") + break + except Exception: + pass + if price == 0: + price = entry.get("current_price") or entry.get("price") or 0 + print(f" fallback到存储价: {price}", file=sys.stderr) + + # Price diff debounce: skip reassessment if price changed < 1% since last update + last_price = entry.get("last_reassessed_price", 0) + if last_price > 0 and price > 0: + diff_pct = abs(price - last_price) / last_price * 100 + if diff_pct < 1.0: + print(f" 价差仅{diff_pct:.2f}% (<1%),跳过重评(上次价={last_price},现价={price})") + skipped += 1 + continue + result = reassess_strategy( + code=code, + name=entry.get("name", ""), + price=price, + cost=entry.get("cost", 0), + shares=entry.get("shares", 0), + current_action=entry.get("action", ""), + is_watchlist=entry.get("type", "") in ("自选策略", "watchlist"), + ) + if result and result.get("action"): + # 持仓股止损不下移(移动止损规则):已有仓位的止损只上不下 + is_held = entry.get("cost", 0) > 0 and entry.get("shares", 0) > 0 and \ + entry.get("type", "") not in ("自选策略", "watchlist") + old_stop = entry.get("stop_loss", 0) + new_stop = result.get("stop_loss", 0) + if is_held and old_stop > 0 and new_stop > 0 and new_stop < old_stop: + print(f" 移动止损保护: {new_stop}→保持{old_stop} (持仓止损不下移)") + result["stop_loss"] = old_stop + # 同时更新 action 字符串中的止损值 + act = result.get("action", "") + if act: + act = re.sub(r'止损[\d.]+', f'止损{old_stop}', act) + result["action"] = act + + # 更新 decisions_map 中对应的条目 + updated = entry.copy() + updated.update({ + "action": result["action"], + "stop_loss": result.get("stop_loss", entry.get("stop_loss")), + "entry_low": result.get("entry_low", entry.get("entry_low")), + "entry_high": result.get("entry_high", entry.get("entry_high")), + "take_profit": result.get("take_profit"), + "tech_snapshot": result.get("tech_snapshot", entry.get("tech_snapshot")), + "timing_signal": result.get("timing_signal", entry.get("timing_signal")), + "rr_ratio": result.get("rr_ratio", entry.get("rr_ratio", 0)), + "status": result.get("status", "updated"), + "price": price, + }) + # Save last reassessed price for debounce tracking + updated["last_reassessed_price"] = price + decisions_map[code] = updated + # ——— 初始化多分支策略树 ——— + try: + sys.path.insert(0, '/home/hmo/MoFin') + from strategy_tree import init_default_branches + branches = init_default_branches( + code, + entry.get('name', ''), + result.get('entry_low', 0), + result.get('entry_high', 0), + result.get('stop_loss', 0), + result.get('take_profit', 0), + ) + st = updated.setdefault('strategy_tree', {}) + st['branches'] = branches + except Exception: + pass + print(f"[OK] {code} {entry.get('name','')}: {result['action'][:80]}") + ok += 1 + else: + print(f"[SYNCED] {code}: 无变更") + ok += 1 + except Exception as e: + print(f"[ERROR] {code}: {e}", file=sys.stderr) + errors += 1 + + # 写回 decisions.json(只更新被修改的那条,其余保留原样) + raw["decisions"] = list(decisions_map.values()) + raw["total"] = len(raw["decisions"]) + from datetime import datetime + raw["regenerated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M") + with open(DECISIONS_PATH, "w") as f: + json.dump(raw, f, ensure_ascii=False, indent=2) + + print(f"[DONE] {ok}成功 {skipped}跳过 {errors}失败") + + +if __name__ == "__main__": + main()