#!/usr/bin/env python3
"""Dependency audit script - detect dependency conflicts between skills.

Each skill lives in ``<path>/.opencode/skills/<skill>/`` and may ship a
``requirements.txt``.  The script collects every requirement, compares the
requested versions with what ``pip list`` reports for the current
interpreter, and reports packages that are requested with differing
specifiers and whose installed version does not satisfy all of them.

Usage:
    python skills/dep_audit.py            # check all skills
    python skills/dep_audit.py --json     # JSON output
    python skills/dep_audit.py --fix      # accepted, but not implemented yet
    python skills/dep_audit.py --path DIR # use DIR instead of the cwd

The process exits with status 1 when at least one conflict is found.
"""

import os
import sys
import re
import json
import operator as _op
import subprocess
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass, field

# Fix Windows console encoding.  The environment variable only affects child
# processes (e.g. the ``pip`` subprocess); the streams of the current process
# are reconfigured in main().
if sys.platform == "win32":
    os.environ["PYTHONIOENCODING"] = "utf-8"

# Version operators, longest alternatives first so "===" is not read as "==".
_OPERATOR_RE = re.compile(r"^(~=|===|==|!=|<=|>=|<|>)\s*(.*)$")
# Project name, optional "[extras]", then whatever follows.
_REQUIREMENT_RE = re.compile(
    r"^([A-Za-z0-9_][A-Za-z0-9._-]*)\s*(?:\[([^\]]*)\])?\s*(.*)$"
)
# Purely numeric dotted versions, the only form the fallback parser handles.
_SIMPLE_VERSION_RE = re.compile(r"\d+(?:\.\d+)*")

_COMPARATORS = {
    ">=": _op.ge,
    "==": _op.eq,
    ">": _op.gt,
    "<": _op.lt,
    "<=": _op.le,
    "!=": _op.ne,
}


@dataclass
class Package:
    """One requirement parsed from a requirements.txt line."""

    name: str  # normalized name: lower case, runs of "-", "_", "." -> "_"
    version: str = ""  # empty means "any version"; may hold "1.0,<2.0"
    operator: str = ">="  # operator of the first clause: >=, ==, >, ~=, ...
    extras: List[str] = field(default_factory=list)


@dataclass
class Conflict:
    """A package whose installed version violates the skills' requirements."""

    package: str
    required_versions: List[str]
    installed_version: str
    skills: List[str] = field(default_factory=list)  # skills requiring it
    severity: str = "warning"  # error, warning, info


def _normalize_name(name: str) -> str:
    """Normalize a project name so requirement and pip names compare equal."""
    return re.sub(r"[-_.]+", "_", name).lower()


def parse_requirement(line: str) -> Optional[Package]:
    """Parse a single requirements.txt line.

    Handles extras (``package[extra1,extra2]>=1.0``), whitespace around the
    operator, inline ``# comments`` and environment markers (``; marker``,
    which is dropped).  Additional comma-separated clauses stay in
    ``Package.version`` (``pkg>=1.0,<2.0`` -> operator ``>=``, version
    ``1.0,<2.0``).

    Returns:
        The parsed Package, or None for blank lines, comments, pip options
        (lines starting with ``-``) and lines that are not a named
        requirement (for example bare URLs).
    """
    line = line.strip()
    if not line or line.startswith(("#", "-")):
        return None

    # pip treats "<whitespace>#" as the start of an inline comment.
    line = re.split(r"\s+#", line, maxsplit=1)[0]
    # Environment markers do not take part in the version comparison.
    line = line.split(";", 1)[0].strip()

    match = _REQUIREMENT_RE.match(line)
    if not match:
        return None

    name = _normalize_name(match.group(1))
    extras = [e.strip() for e in (match.group(2) or "").split(",") if e.strip()]
    rest = match.group(3).strip()

    if not rest or rest.startswith("@"):
        # Bare name or direct reference ("pkg @ url"): no version constraint.
        return Package(name, "", ">=", extras)

    spec = _OPERATOR_RE.match(rest)
    if not spec:
        # e.g. "git+https://..." - not a requirement this tool understands.
        return None

    version = "".join(spec.group(2).split())  # drop whitespace inside spec
    return Package(name, version, spec.group(1), extras)


def get_installed_packages() -> Dict[str, str]:
    """Return ``{normalized name: version}`` for the current interpreter.

    Runs ``python -m pip list --format=json``.  This is best effort: on any
    failure a warning is printed to stderr and the packages collected so far
    (possibly none) are returned.
    """
    result: Dict[str, str] = {}
    try:
        output = subprocess.run(
            [sys.executable, "-m", "pip", "list", "--format=json"],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if output.returncode == 0:
            for pkg in json.loads(output.stdout):
                result[_normalize_name(pkg["name"])] = pkg["version"]
        else:
            print(
                f"Warning: pip list exited with code {output.returncode}",
                file=sys.stderr,
            )
    except (OSError, subprocess.SubprocessError, ValueError, KeyError, TypeError) as e:
        print(f"Warning: Could not get pip list: {e}", file=sys.stderr)
    return result


def find_requirements(base_path: Path) -> Dict[str, List[Package]]:
    """Collect the requirements.txt of every skill below *base_path*.

    Looks at ``base_path/.opencode/skills/<skill>/requirements.txt``.

    Returns:
        ``{skill directory name: [Package, ...]}``; empty when the skills
        directory does not exist.  Unreadable files are reported on stderr
        and skipped.
    """
    skills_requirements: Dict[str, List[Package]] = {}
    skills_dir = base_path / ".opencode" / "skills"

    if not skills_dir.exists():
        return skills_requirements

    # Sorted for a deterministic scan order.
    for skill_dir in sorted(skills_dir.iterdir()):
        if not skill_dir.is_dir():
            continue
        req_file = skill_dir / "requirements.txt"
        if not req_file.exists():
            continue
        try:
            # utf-8-sig strips a BOM that would otherwise corrupt line one.
            text = req_file.read_text(encoding="utf-8-sig")
        except (OSError, UnicodeDecodeError) as e:
            print(f"Warning: Could not read {req_file}: {e}", file=sys.stderr)
            continue
        parsed = (parse_requirement(line) for line in text.splitlines())
        skills_requirements[skill_dir.name] = [pkg for pkg in parsed if pkg]

    return skills_requirements


def check_conflicts(
    skills_requirements: Dict[str, List[Package]], installed: Dict[str, str]
) -> Tuple[List[Conflict], Dict[str, Set[str]]]:
    """Detect dependency conflicts.

    A package is reported when it is requested with more than one distinct
    specifier and the installed version (or its absence) fails at least one
    versioned requirement.

    Returns:
        ``(conflicts, all_deps)`` where ``all_deps`` maps each package name
        to the set of specifier strings requested for it (``"(any)"`` for
        unversioned requirements).
    """
    conflicts: List[Conflict] = []
    all_deps: Dict[str, Set[str]] = {}  # package -> {specifier strings}
    requirers: Dict[str, List[Tuple[str, Package]]] = {}  # package -> users

    # Collect all dependencies in a single pass.
    for skill, packages in skills_requirements.items():
        for pkg in packages:
            spec = f"{pkg.operator}{pkg.version}" if pkg.version else "(any)"
            all_deps.setdefault(pkg.name, set()).add(spec)
            requirers.setdefault(pkg.name, []).append((skill, pkg))

    # Detect conflicts.
    for pkg_name, versions in all_deps.items():
        if len(versions) <= 1:
            continue

        installed_ver = installed.get(pkg_name, "not installed")
        satisfied = all(
            satisfies_version(installed_ver, pkg.operator, pkg.version)
            for _, pkg in requirers[pkg_name]
            if pkg.version
        )

        if not satisfied:
            conflicts.append(
                Conflict(
                    package=pkg_name,
                    required_versions=sorted(versions),
                    installed_version=installed_ver,
                    skills=sorted({skill for skill, _ in requirers[pkg_name]}),
                    severity="error",
                )
            )

    return conflicts, all_deps


def _version_key(text: str):
    """Return a comparable key for *text*, or None when it cannot be parsed.

    Uses ``packaging.version`` when it is importable; otherwise falls back
    to a tuple of ints for purely numeric dotted versions.
    """
    text = text.strip()
    if not text:
        return None
    try:
        from packaging import version as pkg_version
    except ImportError:
        pkg_version = None

    if pkg_version is not None:
        try:
            return pkg_version.parse(text)
        except pkg_version.InvalidVersion:
            return None

    if not _SIMPLE_VERSION_RE.fullmatch(text):
        return None
    parts = [int(p) for p in text.split(".")]
    # Drop trailing zeros so that "1.0" and "1.0.0" compare equal.
    while len(parts) > 1 and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def _compatible_upper_bound(version: str) -> Optional[str]:
    """Return the exclusive upper bound implied by ``~=version``.

    ``1.4.2`` -> ``1.5`` and ``2.2`` -> ``3``.  Returns None when the version
    is not purely numeric or has fewer than two components.
    """
    if not _SIMPLE_VERSION_RE.fullmatch(version):
        return None
    parts = [int(p) for p in version.split(".")]
    if len(parts) < 2:
        return None
    prefix = parts[:-1]
    prefix[-1] += 1
    return ".".join(str(p) for p in prefix)


def _split_clauses(operator: str, required: str) -> List[Tuple[str, str]]:
    """Split ``operator + required`` into ``(operator, version)`` clauses.

    ``~=X`` is expanded to ``>=X`` plus an exclusive upper bound (an
    approximation of the PEP 440 prefix match).  Clauses without a
    recognizable operator are dropped, i.e. treated as undecidable.
    """
    clauses: List[Tuple[str, str]] = []
    for raw in f"{operator}{required}".split(","):
        match = _OPERATOR_RE.match(raw.strip())
        if not match or not match.group(2).strip():
            continue
        op, ver = match.group(1), match.group(2).strip()
        if op == "~=":
            clauses.append((">=", ver))
            upper = _compatible_upper_bound(ver)
            if upper:
                clauses.append(("<", upper))
        else:
            clauses.append((op, ver))
    return clauses


def satisfies_version(installed: str, operator: str, required: str) -> bool:
    """Check whether *installed* satisfies ``operator`` + ``required``.

    *required* may carry further comma-separated clauses (``1.0,<2.0``); all
    of them must hold.

    Returns:
        False when the package is not installed or a clause is violated.
        True otherwise, including when a version or operator cannot be
        interpreted (to avoid false positives).
    """
    if not installed or installed == "not installed":
        return False

    installed_key = _version_key(installed)
    for op, ver in _split_clauses(operator, required):
        if op == "===":
            # Arbitrary equality is a plain string comparison.
            if installed.strip() != ver:
                return False
            continue
        required_key = _version_key(ver)
        if installed_key is None or required_key is None:
            continue  # cannot decide this clause
        if not _COMPARATORS[op](installed_key, required_key):
            return False
    return True


def generate_report(
    skills_requirements: Dict[str, List[Package]],
    conflicts: List[Conflict],
    all_deps: Dict[str, Set[str]],
    installed: Dict[str, str],
    json_output: bool = False,
) -> str:
    """Render the audit result as text, or as JSON when *json_output* is set.

    *installed* is accepted for interface compatibility and is not used.
    """
    if json_output:
        return json.dumps(
            {
                "skills": list(skills_requirements.keys()),
                "conflicts": [
                    {
                        "package": c.package,
                        "required": c.required_versions,
                        "installed": c.installed_version,
                        "severity": c.severity,
                    }
                    for c in conflicts
                ],
                # Sorted so that repeated runs produce identical output.
                "all_dependencies": {k: sorted(v) for k, v in all_deps.items()},
            },
            indent=2,
        )

    lines = [
        "=" * 60,
        "技能依赖审计报告",
        "=" * 60,
        f"扫描技能数: {len(skills_requirements)}",
        f"发现依赖数: {len(all_deps)}",
        f"冲突数: {len(conflicts)}",
        "",
    ]

    if conflicts:
        lines.append("[!] 依赖冲突:")
        lines.append("-" * 40)
        for c in conflicts:
            lines.append(f" {c.package}:")
            lines.append(f" 需要: {', '.join(c.required_versions)}")
            lines.append(f" 当前: {c.installed_version}")
            lines.append("")
    else:
        lines.append("[+] 未发现依赖冲突")
        lines.append("")

    lines.append("各技能依赖清单:")
    lines.append("-" * 40)
    for skill, packages in sorted(skills_requirements.items()):
        lines.append(f" {skill}: {len(packages)} 个依赖")
        for pkg in packages:
            ver = f"{pkg.operator}{pkg.version}" if pkg.version else ""
            lines.append(f" - {pkg.name} {ver}")

    return "\n".join(lines)


def main():
    """Command-line entry point: scan, report, exit 1 if conflicts exist."""
    import argparse

    parser = argparse.ArgumentParser(description="技能依赖审计工具")
    parser.add_argument("--json", action="store_true", help="JSON格式输出")
    parser.add_argument("--fix", action="store_true", help="尝试自动修复")
    parser.add_argument("--path", default=".", help="工作目录")
    args = parser.parse_args()

    if sys.platform == "win32":
        # PYTHONIOENCODING is read at interpreter start-up only, so the
        # already-created streams must be switched to UTF-8 explicitly.
        for stream in (sys.stdout, sys.stderr):
            reconfigure = getattr(stream, "reconfigure", None)
            if reconfigure is not None:
                reconfigure(encoding="utf-8")

    if args.fix:
        print(
            "Warning: --fix is not implemented yet; reporting only.",
            file=sys.stderr,
        )

    base_path = Path(args.path).resolve()

    # Scan.
    skills_requirements = find_requirements(base_path)
    installed = get_installed_packages()
    conflicts, all_deps = check_conflicts(skills_requirements, installed)

    # Output.
    report = generate_report(
        skills_requirements, conflicts, all_deps, installed, args.json
    )
    print(report)

    # Exit code.
    sys.exit(1 if conflicts else 0)


if __name__ == "__main__":
    main()