Initial commit to git.yoin

This commit is contained in:
hmo
2026-02-11 22:02:47 +08:00
commit cf10ab6473
153 changed files with 14581 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""数据库连接器 - 支持直连和SSH隧道两种模式"""
import json
import sys
from pathlib import Path
from contextlib import contextmanager
try:
import pymysql
except ImportError as e:
print(f"缺少依赖: {e}")
print("请运行: pip install pymysql")
sys.exit(1)
def load_config():
"""加载配置文件"""
config_path = Path(__file__).parent.parent / "config" / "settings.json"
if not config_path.exists():
print(f"配置文件不存在: {config_path}")
print("请复制 settings.json.example 为 settings.json 并填写配置")
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
@contextmanager
def get_db_connection():
"""获取数据库连接自动判断直连或SSH隧道"""
config = load_config()
ssh_config = config.get("ssh")
db_config = config["database"]
use_ssh = ssh_config and ssh_config.get("host")
if use_ssh:
try:
from sshtunnel import SSHTunnelForwarder
import paramiko
except ImportError:
print("SSH隧道需要额外依赖: pip install paramiko sshtunnel")
sys.exit(1)
tunnel = SSHTunnelForwarder(
(ssh_config["host"], ssh_config["port"]),
ssh_username=ssh_config["username"],
ssh_password=ssh_config.get("password"),
ssh_pkey=ssh_config.get("key_file"),
remote_bind_address=(db_config["host"], db_config["port"]),
local_bind_address=("127.0.0.1",),
)
try:
tunnel.start()
connection = pymysql.connect(
host="127.0.0.1",
port=tunnel.local_bind_port,
user=db_config["username"],
password=db_config["password"],
database=db_config["database"],
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=config["query"]["timeout"],
)
try:
yield connection
finally:
connection.close()
finally:
tunnel.stop()
else:
connection = pymysql.connect(
host=db_config["host"],
port=db_config["port"],
user=db_config["username"],
password=db_config["password"],
database=db_config["database"],
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=config["query"]["timeout"],
)
try:
yield connection
finally:
connection.close()
def test_connection():
"""测试数据库连接"""
config = load_config()
use_ssh = config.get("ssh") and config["ssh"].get("host")
if use_ssh:
print("正在建立SSH隧道...")
else:
print("正在直连数据库...")
try:
with get_db_connection() as conn:
print("数据库连接成功!")
with conn.cursor() as cursor:
cursor.execute("SELECT 1 as test")
result = cursor.fetchone()
print(f"测试查询结果: {result}")
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"\n数据库中共有 {len(tables)} 张表:")
for t in tables:
table_name = list(t.values())[0]
print(f" - {table_name}")
return True
except Exception as e:
print(f"连接失败: {e}")
return False
if __name__ == "__main__":
test_connection()

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""智能查询主脚本 - 执行SQL并返回结果"""
import argparse
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from db_connector import get_db_connection, load_config
def execute_query(sql: str, max_rows: int = None) -> dict:
"""执行SQL查询"""
config = load_config()
if max_rows is None:
max_rows = config["query"]["max_rows"]
result = {
"success": False,
"sql": sql,
"data": [],
"row_count": 0,
"columns": [],
"message": ""
}
try:
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql)
if sql.strip().upper().startswith("SELECT") or sql.strip().upper().startswith("SHOW") or sql.strip().upper().startswith("DESC"):
rows = cursor.fetchmany(max_rows)
result["data"] = rows
result["row_count"] = len(rows)
if rows:
result["columns"] = list(rows[0].keys())
total = cursor.fetchall()
if total:
result["message"] = f"返回 {result['row_count']} 行(共 {result['row_count'] + len(total)} 行,已截断)"
else:
result["message"] = f"返回 {result['row_count']}"
else:
conn.commit()
result["row_count"] = cursor.rowcount
result["message"] = f"影响 {cursor.rowcount}"
result["success"] = True
except Exception as e:
result["message"] = f"查询失败: {str(e)}"
return result
def format_result(result: dict, output_format: str = "table") -> str:
"""格式化查询结果"""
if not result["success"]:
return f"错误: {result['message']}"
if not result["data"]:
return result["message"]
if output_format == "json":
return json.dumps(result["data"], ensure_ascii=False, indent=2, default=str)
columns = result["columns"]
rows = result["data"]
col_widths = {col: len(str(col)) for col in columns}
for row in rows:
for col in columns:
col_widths[col] = max(col_widths[col], len(str(row.get(col, ""))))
header = " | ".join(str(col).ljust(col_widths[col]) for col in columns)
separator = "-+-".join("-" * col_widths[col] for col in columns)
lines = [header, separator]
for row in rows:
line = " | ".join(str(row.get(col, "")).ljust(col_widths[col]) for col in columns)
lines.append(line)
lines.append(f"\n{result['message']}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="智能数据库查询")
parser.add_argument("sql", help="要执行的SQL语句")
parser.add_argument("-n", "--max-rows", type=int, help="最大返回行数")
parser.add_argument("-f", "--format", choices=["table", "json"], default="table", help="输出格式")
parser.add_argument("--raw", action="store_true", help="输出原始JSON结果")
args = parser.parse_args()
result = execute_query(args.sql, args.max_rows)
if args.raw:
print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
else:
print(format_result(result, args.format))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""数据库表结构加载器 - 生成表结构文档"""
import sys
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent))
from db_connector import get_db_connection
def get_table_schema(cursor, table_name: str) -> dict:
"""获取单张表的结构信息"""
cursor.execute(f"DESCRIBE `{table_name}`")
columns = cursor.fetchall()
cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
create_sql = cursor.fetchone()
try:
cursor.execute(f"SELECT COUNT(*) as cnt FROM `{table_name}`")
row_count = cursor.fetchone()["cnt"]
except:
row_count = "未知"
return {
"name": table_name,
"columns": columns,
"create_sql": create_sql.get("Create Table", ""),
"row_count": row_count
}
def generate_schema_markdown(tables: list) -> str:
"""生成Markdown格式的表结构文档"""
lines = [
"# 数据库表结构",
"",
f"> 自动生成于 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"## 表清单",
"",
"| 表名 | 行数 | 说明 |",
"|------|------|------|",
]
for t in tables:
lines.append(f"| `{t['name']}` | {t['row_count']} | |")
lines.extend(["", "---", ""])
for t in tables:
lines.extend([
f"## {t['name']}",
"",
f"行数: {t['row_count']}",
"",
"### 字段",
"",
"| 字段名 | 类型 | 可空 | 键 | 默认值 | 备注 |",
"|--------|------|------|-----|--------|------|",
])
for col in t["columns"]:
field = col.get("Field", "")
col_type = col.get("Type", "")
null = col.get("Null", "")
key = col.get("Key", "")
default = col.get("Default", "") or ""
extra = col.get("Extra", "")
lines.append(f"| `{field}` | {col_type} | {null} | {key} | {default} | {extra} |")
lines.extend(["", "---", ""])
return "\n".join(lines)
def main():
print("正在连接数据库...")
try:
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
table_list = cursor.fetchall()
tables = []
for t in table_list:
table_name = list(t.values())[0]
print(f" 加载表结构: {table_name}")
schema = get_table_schema(cursor, table_name)
tables.append(schema)
markdown = generate_schema_markdown(tables)
output_path = Path(__file__).parent.parent / "references" / "schema.md"
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(markdown)
print(f"\n表结构文档已生成: {output_path}")
print(f"{len(tables)} 张表")
except Exception as e:
print(f"加载失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()