""" Process Guard — prevent duplicate instances via Windows named mutex. On Windows: uses CreateMutexW (OS-level, auto-released on crash/kill). Fallback: PID lock file for non-Windows. Usage: from proc_guard import guard lock = guard("xmpp_bot") if not lock.ok: print(lock.message) # "already running" sys.exit(1) # ... proceed ... The mutex handle is held for the process lifetime. On normal exit, crash, or kill: Windows automatically releases it. No stale lock files, no manual cleanup needed. """ import os, sys, platform, atexit _MUTEX_CACHE: dict[str, int] = {} # name → handle, held for process lifetime class _LockResult: def __init__(self, ok: bool, message: str = ""): self.ok = ok self.message = message def guard(name: str, kill: bool = False, force: bool = False) -> _LockResult: """ Acquire a singleton lock for *name* using Windows named mutex. Args: name: unique name (e.g. "xmpp_bot", "wechat_agent", "api_proxy") kill: ignored on Windows (mutex can't be killed, OS manages it) force: ignored on Windows (mutex can't be forced) Returns: _LockResult(ok=True) — lock acquired (first instance) _LockResult(ok=False) — another instance is already running On success, holds the mutex handle until process exit. No cleanup needed — Windows auto-releases on crash/kill/exit. """ if platform.system() != "Windows": # Fallback: PID lock file for Linux/Mac return _pidfile_fallback(name) import ctypes from ctypes import wintypes kernel32 = ctypes.windll.kernel32 mutex_name = f"Global\\proc_guard_{name}" # CreateMutexW returns a handle. If ERROR_ALREADY_EXISTS → another instance holds it. ERROR_ALREADY_EXISTS = 183 handle = kernel32.CreateMutexW(None, True, mutex_name) if not handle: return _LockResult(False, f"[proc_guard] {name}: CreateMutex failed") last_err = ctypes.GetLastError() if last_err == ERROR_ALREADY_EXISTS: kernel32.CloseHandle(handle) return _LockResult(False, f"[proc_guard] {name}: another instance is already running") # We hold the mutex. Store handle so it stays alive until process dies. _MUTEX_CACHE[name] = handle return _LockResult(True, f"[proc_guard] {name}: lock acquired") # ── PID file fallback for Linux/Mac ──────────────────────────────── def _pidfile_fallback(name: str) -> _LockResult: import signal, time _LOCK_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "temp") os.makedirs(_LOCK_DIR, exist_ok=True) path = os.path.join(_LOCK_DIR, f"{name}.lock") my_pid = os.getpid() def _read_pid(p): try: with open(p) as f: return int(f.read().strip()) except: return None def _pid_alive(pid): try: os.kill(pid, 0) return True except OSError: return False if os.path.exists(path): existing_pid = _read_pid(path) if existing_pid and existing_pid != my_pid and _pid_alive(existing_pid): return _LockResult(False, f"[proc_guard] {name} already running (PID {existing_pid})") try: with open(path, "w") as f: f.write(str(my_pid)) except Exception as e: return _LockResult(False, f"[proc_guard] cannot write lock: {e}") def _cleanup(): try: if os.path.exists(path): current = os.getpid() existing = _read_pid(path) if existing == current: os.remove(path) except Exception: pass atexit.register(_cleanup) for sig_name in ("SIGTERM", "SIGINT", "SIGBREAK"): try: sig = getattr(signal, sig_name, None) if sig: signal.signal(sig, lambda *a: (_cleanup(), sys.exit(1))) except Exception: pass return _LockResult(True)