Files
MoFin/src/prompt_manager/tracking.py
T
zhiwei 9b9c37002a Initial: MoFin 持仓分析与策略管理系统
核心模块:
- 策略生命周期管理 (strategy_lifecycle.py)
- 技术分析引擎 (technical_analysis.py)
- 双维度策略评估 (strategy_evaluator.py)
- 实时行情获取 (get_realtime_prices.py)
- Web Dashboard (server.py, :8899)

提示词版本管理:
- prompt_manager 模块 — 统一管理所有知微提示词
- 8个提示词共24个版本已录入
- 策略→提示词版本关联追踪
- Dashboard「提示词」Tab

数据源增强:
- 服务端 POST /api/update/realtime 端点已就绪
- clients/tdx-relay/ — 小小莫在Windows上开发的通达信中继
- 解决港股15分钟延迟问题
2026-06-12 22:54:51 +08:00

123 lines
4.1 KiB
Python

"""策略→提示词版本关联追踪
当 strategy_lifecycle.py 生成策略时,记录当前使用的提示词版本,
将每只股票的策略与生成它的提示词版本关联起来。
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional
from .models import StrategyLink
from .registry import get_prompt
DATA_DIR = Path("/home/hmo/projects/MoFin/data/prompts")
ASSOCIATIONS_PATH = DATA_DIR / "associations.json"
def load_associations() -> dict:
"""加载关联记录"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
if not ASSOCIATIONS_PATH.exists():
return {"associations": [], "updated_at": datetime.now().isoformat()}
try:
return json.loads(ASSOCIATIONS_PATH.read_text(encoding="utf-8"))
except (json.JSONDecodeError, FileNotFoundError):
return {"associations": [], "updated_at": datetime.now().isoformat()}
def save_associations(data: dict):
DATA_DIR.mkdir(parents=True, exist_ok=True)
data["updated_at"] = datetime.now().isoformat()
ASSOCIATIONS_PATH.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def get_current_strategy_prompt_version() -> dict:
"""获取当前活跃的策略生成提示词版本
返回 {prompt_id, version, label} 或 None
"""
prompt = get_prompt("strategy-generation")
if not prompt:
return None
for v in prompt.versions:
if v.version == prompt.current_version:
return {
"prompt_id": prompt.id,
"version": v.version,
"label": v.label,
}
return None
def record_strategy_generation(code: str, name: str, strategy_action: str) -> StrategyLink:
"""记录策略生成事件
由 strategy_lifecycle.py 在生成策略时调用
"""
pv = get_current_strategy_prompt_version()
if not pv:
# 如果没有提示词版本记录,创建一个默认记录
pv = {"prompt_id": "strategy-generation", "version": "unknown", "label": "未知版本"}
link = StrategyLink(
code=code,
name=name,
prompt_id=pv["prompt_id"],
prompt_version=pv["version"],
strategy_action=strategy_action[:500], # 截断保护
generated_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
data = load_associations()
data.setdefault("associations", []).append(link.to_dict())
# 只保留最近的10000条记录
if len(data["associations"]) > 10000:
data["associations"] = data["associations"][-10000:]
save_associations(data)
return link
def get_associations_for_stock(code: str, max_results: int = 10) -> list:
"""获取某只股票的所有策略关联记录"""
data = load_associations()
records = [a for a in data.get("associations", []) if a["code"] == code]
return sorted(records, key=lambda x: x.get("generated_at", ""), reverse=True)[:max_results]
def get_associations_for_prompt_version(prompt_id: str, version: str) -> list:
"""获取某个提示词版本生成的所有策略"""
data = load_associations()
return [a for a in data.get("associations", [])
if a.get("prompt_id") == prompt_id and a.get("prompt_version") == version]
def get_strategy_version_stats() -> dict:
"""统计每个提示词版本生成的策略数量"""
data = load_associations()
stats = {}
for a in data.get("associations", []):
key = f"{a.get('prompt_id', '?')}@{a.get('prompt_version', '?')}"
if key not in stats:
stats[key] = {
"prompt_id": a.get("prompt_id"),
"version": a.get("prompt_version"),
"count": 0,
"stocks": set(),
"last_generated": "",
}
stats[key]["count"] += 1
stats[key]["stocks"].add(a.get("code"))
if a.get("generated_at", "") > stats[key]["last_generated"]:
stats[key]["last_generated"] = a.get("generated_at", "")
# 转换 set 为 list 以便 JSON 序列化
for k in stats:
stats[k]["stocks"] = sorted(stats[k]["stocks"])
return stats