Files

850 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
RAPHL 日志分析器 - 全维度智能分析
Author: 翟星人
Created: 2026-01-18
支持多种日志类型的自动识别和深度分析:
- Java/应用日志异常堆栈、ERROR/WARN
- MySQL BinlogDDL/DML 操作、表变更、事务分析
- 审计日志:用户操作、权限变更
- 告警日志:告警级别、告警源
- Trace 日志:链路追踪、调用关系
- 通用日志IP、时间、关键词提取
核心能力:
1. 自动识别日志类型
2. 提取关键实体IP、用户、表名、thread_id 等)
3. 时间线分析
4. 关联分析(因果链、操作链)
5. 智能洞察和异常检测
"""
import argparse
import re
import hashlib
import json
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class LogType(Enum):
JAVA_APP = "java_app"
MYSQL_BINLOG = "mysql_binlog"
NGINX_ACCESS = "nginx_access"
AUDIT = "audit"
TRACE = "trace"
ALERT = "alert"
GENERAL = "general"
@dataclass
class Entity:
"""提取的实体"""
type: str # ip, user, table, thread_id, trace_id, bucket, etc.
value: str
line_num: int
context: str = ""
@dataclass
class Operation:
"""操作记录"""
line_num: int
time: str
op_type: str # DELETE, UPDATE, INSERT, DROP, SELECT, API_CALL, etc.
target: str # 表名、接口名等
detail: str
entities: list = field(default_factory=list) # 关联的实体
raw_content: str = ""
@dataclass
class Alert:
"""告警记录"""
line_num: int
time: str
level: str # CRITICAL, WARNING, INFO
source: str
message: str
entities: list = field(default_factory=list)
@dataclass
class Trace:
"""链路追踪"""
trace_id: str
span_id: str
parent_id: str
service: str
operation: str
duration: float
status: str
line_num: int
@dataclass
class Insight:
"""分析洞察"""
category: str # security, performance, error, anomaly
severity: str # critical, high, medium, low
title: str
description: str
evidence: list = field(default_factory=list)
recommendation: str = ""
class SmartLogAnalyzer:
"""智能日志分析器 - 全维度感知"""
# ============ 实体提取模式 ============
ENTITY_PATTERNS = {
'ip': re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b'),
'ip_port': re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+)\b'),
'mac': re.compile(r'\b([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}\b'),
'email': re.compile(r'\b[\w.-]+@[\w.-]+\.\w+\b'),
'url': re.compile(r'https?://[^\s<>"\']+'),
'uuid': re.compile(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', re.I),
'trace_id': re.compile(r'\b(?:trace[_-]?id|traceid|x-trace-id)[=:\s]*([a-zA-Z0-9_-]{16,64})\b', re.I),
'span_id': re.compile(r'\b(?:span[_-]?id|spanid)[=:\s]*([a-zA-Z0-9_-]{8,32})\b', re.I),
'request_id': re.compile(r'\b(?:request[_-]?id|req[_-]?id)[=:\s]*([a-zA-Z0-9_-]{8,64})\b', re.I),
'user_id': re.compile(r'\b(?:user[_-]?id|uid|userid)[=:\s]*([a-zA-Z0-9_-]+)\b', re.I),
'thread_id': re.compile(r'\bthread[_-]?id[=:\s]*(\d+)\b', re.I),
'session_id': re.compile(r'\b(?:session[_-]?id|sid)[=:\s]*([a-zA-Z0-9_-]+)\b', re.I),
'ak': re.compile(r'\b(?:ak|access[_-]?key)[=:\s]*([a-zA-Z0-9]{16,64})\b', re.I),
'bucket': re.compile(r'\bbucket[=:\s]*([a-zA-Z0-9_.-]+)\b', re.I),
'database': re.compile(r'`([a-zA-Z_][a-zA-Z0-9_]*)`\.`([a-zA-Z_][a-zA-Z0-9_]*)`'),
'duration_ms': re.compile(r'\b(?:duration|cost|elapsed|time)[=:\s]*(\d+(?:\.\d+)?)\s*(?:ms|毫秒)\b', re.I),
'duration_s': re.compile(r'\b(?:duration|cost|elapsed|time)[=:\s]*(\d+(?:\.\d+)?)\s*(?:s|秒)\b', re.I),
'error_code': re.compile(r'\b(?:error[_-]?code|errno|code)[=:\s]*([A-Z0-9_-]+)\b', re.I),
'http_status': re.compile(r'\b(?:status|http[_-]?code)[=:\s]*([1-5]\d{2})\b', re.I),
}
# ============ 时间格式 ============
TIME_PATTERNS = [
(re.compile(r'(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d{3})?)'), '%Y-%m-%d %H:%M:%S'),
(re.compile(r'(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'), '%d/%b/%Y:%H:%M:%S'),
(re.compile(r'#(\d{6} \d{2}:\d{2}:\d{2})'), '%y%m%d %H:%M:%S'), # MySQL binlog
(re.compile(r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'), '%d/%b/%Y:%H:%M:%S'), # Nginx
]
# ============ 日志类型识别 ============
LOG_TYPE_SIGNATURES = {
LogType.MYSQL_BINLOG: [
re.compile(r'server id \d+.*end_log_pos'),
re.compile(r'GTID.*last_committed'),
re.compile(r'Table_map:.*mapped to number'),
re.compile(r'(Delete_rows|Update_rows|Write_rows|Query).*table id'),
],
LogType.JAVA_APP: [
re.compile(r'(ERROR|WARN|INFO|DEBUG)\s+[\w.]+\s+-'),
re.compile(r'^\s+at\s+[\w.$]+\([\w.]+:\d+\)'),
re.compile(r'Exception|Error|Throwable'),
],
LogType.NGINX_ACCESS: [
re.compile(r'\d+\.\d+\.\d+\.\d+\s+-\s+-\s+\['),
re.compile(r'"(GET|POST|PUT|DELETE|HEAD|OPTIONS)\s+'),
],
LogType.TRACE: [
re.compile(r'trace[_-]?id', re.I),
re.compile(r'span[_-]?id', re.I),
re.compile(r'parent[_-]?id', re.I),
],
LogType.ALERT: [
re.compile(r'(CRITICAL|ALERT|EMERGENCY)', re.I),
re.compile(r'告警|报警|alarm', re.I),
],
}
# ============ MySQL Binlog 分析 ============
BINLOG_PATTERNS = {
'gtid': re.compile(r"GTID_NEXT=\s*'([^']+)'"),
'thread_id': re.compile(r'thread_id=(\d+)'),
'server_id': re.compile(r'server id (\d+)'),
'table_map': re.compile(r'Table_map:\s*`(\w+)`\.`(\w+)`\s*mapped to number (\d+)'),
'delete_rows': re.compile(r'Delete_rows:\s*table id (\d+)'),
'update_rows': re.compile(r'Update_rows:\s*table id (\d+)'),
'write_rows': re.compile(r'Write_rows:\s*table id (\d+)'),
'query': re.compile(r'Query\s+thread_id=(\d+)'),
'xid': re.compile(r'Xid\s*=\s*(\d+)'),
'delete_from': re.compile(r'###\s*DELETE FROM\s*`(\w+)`\.`(\w+)`'),
'update': re.compile(r'###\s*UPDATE\s*`(\w+)`\.`(\w+)`'),
'insert': re.compile(r'###\s*INSERT INTO\s*`(\w+)`\.`(\w+)`'),
'time': re.compile(r'#(\d{6} \d{2}:\d{2}:\d{2})'),
}
# ============ 告警级别 ============
ALERT_PATTERNS = {
'CRITICAL': re.compile(r'\b(CRITICAL|FATAL|EMERGENCY|P0|严重|致命)\b', re.I),
'HIGH': re.compile(r'\b(ERROR|ALERT|P1|高|错误)\b', re.I),
'MEDIUM': re.compile(r'\b(WARN|WARNING|P2|中|警告)\b', re.I),
'LOW': re.compile(r'\b(INFO|NOTICE|P3|低|提示)\b', re.I),
}
# ============ 敏感操作 ============
SENSITIVE_OPS = {
'data_delete': re.compile(r'\b(DELETE|DROP|TRUNCATE|REMOVE)\b', re.I),
'data_modify': re.compile(r'\b(UPDATE|ALTER|MODIFY|REPLACE)\b', re.I),
'permission': re.compile(r'\b(GRANT|REVOKE|chmod|chown|赋权|权限)\b', re.I),
'auth': re.compile(r'\b(LOGIN|LOGOUT|AUTH|认证|登录|登出)\b', re.I),
'config_change': re.compile(r'\b(SET|CONFIG|配置变更)\b', re.I),
}
def __init__(self, input_path: str, output_dir: str):
self.input_path = Path(input_path)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# 分析结果
self.log_type: LogType = LogType.GENERAL
self.total_lines = 0
self.file_size_mb = 0
self.time_range = {'start': '', 'end': ''}
# 提取的数据
self.entities: dict[str, list[Entity]] = defaultdict(list)
self.operations: list[Operation] = []
self.alerts: list[Alert] = []
self.traces: list[Trace] = []
self.insights: list[Insight] = []
# 统计数据
self.stats = defaultdict(Counter)
# Binlog 特有
self.table_map: dict[str, tuple[str, str]] = {} # table_id -> (db, table)
self.current_thread_id = ""
self.current_server_id = ""
self.current_time = ""
def run(self) -> dict:
print(f"\n{'='*60}")
print(f"RAPHL 智能日志分析器")
print(f"{'='*60}")
self.file_size_mb = self.input_path.stat().st_size / (1024 * 1024)
print(f"文件: {self.input_path.name}")
print(f"大小: {self.file_size_mb:.2f} MB")
# Phase 1: 识别日志类型
print(f"\n{''*40}")
print("Phase 1: 日志类型识别")
print(f"{''*40}")
self._detect_log_type()
print(f" ✓ 类型: {self.log_type.value}")
# Phase 2: 全量扫描提取
print(f"\n{''*40}")
print("Phase 2: 全量扫描提取")
print(f"{''*40}")
self._full_scan()
# Phase 3: 关联分析
print(f"\n{''*40}")
print("Phase 3: 关联分析")
print(f"{''*40}")
self._correlate()
# Phase 4: 生成洞察
print(f"\n{''*40}")
print("Phase 4: 智能洞察")
print(f"{''*40}")
self._generate_insights()
# Phase 5: 生成报告
print(f"\n{''*40}")
print("Phase 5: 生成报告")
print(f"{''*40}")
self._generate_reports()
print(f"\n{'='*60}")
print("分析完成")
print(f"{'='*60}")
return self._get_summary()
def _detect_log_type(self):
"""识别日志类型"""
sample_lines = []
with open(self.input_path, 'r', encoding='utf-8', errors='ignore') as f:
for i, line in enumerate(f):
sample_lines.append(line)
if i >= 100:
break
sample_text = '\n'.join(sample_lines)
scores = {t: 0 for t in LogType}
for log_type, patterns in self.LOG_TYPE_SIGNATURES.items():
for pattern in patterns:
matches = pattern.findall(sample_text)
scores[log_type] += len(matches)
best_type = max(scores.keys(), key=lambda x: scores[x])
if scores[best_type] > 0:
self.log_type = best_type
else:
self.log_type = LogType.GENERAL
def _full_scan(self):
"""全量扫描提取"""
if self.log_type == LogType.MYSQL_BINLOG:
self._scan_binlog()
elif self.log_type == LogType.JAVA_APP:
self._scan_java_app()
else:
self._scan_general()
print(f" ✓ 总行数: {self.total_lines:,}")
print(f" ✓ 时间范围: {self.time_range['start']} ~ {self.time_range['end']}")
# 实体统计
for entity_type, entities in self.entities.items():
unique = len(set(e.value for e in entities))
print(f"{entity_type}: {unique} 个唯一值, {len(entities)} 次出现")
if self.operations:
print(f" ✓ 操作记录: {len(self.operations)}")
if self.alerts:
print(f" ✓ 告警记录: {len(self.alerts)}")
def _scan_binlog(self):
"""扫描 MySQL Binlog"""
current_op = None
with open(self.input_path, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
self.total_lines += 1
# 提取时间
time_match = self.BINLOG_PATTERNS['time'].search(line)
if time_match:
self.current_time = time_match.group(1)
self._update_time_range(self.current_time)
# 提取 server_id
server_match = self.BINLOG_PATTERNS['server_id'].search(line)
if server_match:
self.current_server_id = server_match.group(1)
self._add_entity('server_id', self.current_server_id, line_num, line)
# 提取 thread_id
thread_match = self.BINLOG_PATTERNS['thread_id'].search(line)
if thread_match:
self.current_thread_id = thread_match.group(1)
self._add_entity('thread_id', self.current_thread_id, line_num, line)
# 提取 table_map
table_match = self.BINLOG_PATTERNS['table_map'].search(line)
if table_match:
db, table, table_id = table_match.groups()
self.table_map[table_id] = (db, table)
self._add_entity('database', f"{db}.{table}", line_num, line)
# 识别操作类型
for op_name, pattern in [
('DELETE', self.BINLOG_PATTERNS['delete_from']),
('UPDATE', self.BINLOG_PATTERNS['update']),
('INSERT', self.BINLOG_PATTERNS['insert']),
]:
match = pattern.search(line)
if match:
db, table = match.groups()
self.stats['operations'][op_name] += 1
self.stats['tables'][f"{db}.{table}"] += 1
if current_op is None or current_op.target != f"{db}.{table}":
if current_op:
self.operations.append(current_op)
current_op = Operation(
line_num=line_num,
time=self.current_time,
op_type=op_name,
target=f"{db}.{table}",
detail="",
entities=[
Entity('thread_id', self.current_thread_id, line_num),
Entity('server_id', self.current_server_id, line_num),
],
raw_content=line
)
# 提取行内实体IP、用户等
self._extract_entities(line, line_num)
if line_num % 50000 == 0:
print(f" 已处理 {line_num:,} 行...")
if current_op:
self.operations.append(current_op)
def _scan_java_app(self):
"""扫描 Java 应用日志"""
current_exception = None
context_buffer = []
error_pattern = re.compile(
r'^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d{3})?)\s+'
r'(FATAL|ERROR|WARN|WARNING|INFO|DEBUG)\s+'
r'([\w.]+)\s+-\s+(.+)$'
)
stack_pattern = re.compile(r'^\s+at\s+')
exception_pattern = re.compile(r'^([a-zA-Z_$][\w.$]*(?:Exception|Error|Throwable)):\s*(.*)$')
with open(self.input_path, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
self.total_lines += 1
line = line.rstrip()
# 提取实体
self._extract_entities(line, line_num)
error_match = error_pattern.match(line)
if error_match:
time_str, level, logger, message = error_match.groups()
self._update_time_range(time_str)
if level in ('ERROR', 'FATAL', 'WARN', 'WARNING'):
if current_exception:
self._finalize_exception(current_exception)
current_exception = {
'line_num': line_num,
'time': time_str,
'level': level,
'logger': logger,
'message': message,
'stack': [],
'context': list(context_buffer),
'entities': [],
}
context_buffer.clear()
elif current_exception:
if stack_pattern.match(line) or exception_pattern.match(line):
current_exception['stack'].append(line)
elif line.startswith('Caused by:'):
current_exception['stack'].append(line)
else:
self._finalize_exception(current_exception)
current_exception = None
context_buffer.append(line)
else:
context_buffer.append(line)
if len(context_buffer) > 5:
context_buffer.pop(0)
if line_num % 50000 == 0:
print(f" 已处理 {line_num:,} 行...")
if current_exception:
self._finalize_exception(current_exception)
def _finalize_exception(self, exc: dict):
"""完成异常记录"""
level_map = {'FATAL': 'CRITICAL', 'ERROR': 'HIGH', 'WARN': 'MEDIUM', 'WARNING': 'MEDIUM'}
self.alerts.append(Alert(
line_num=exc['line_num'],
time=exc['time'],
level=level_map.get(exc['level'], 'LOW'),
source=exc['logger'],
message=exc['message'],
entities=exc.get('entities', [])
))
if exc['stack']:
self.stats['exceptions'][exc['stack'][0].split(':')[0] if ':' in exc['stack'][0] else exc['level']] += 1
def _scan_general(self):
"""通用日志扫描"""
with open(self.input_path, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
self.total_lines += 1
# 提取时间
for pattern, fmt in self.TIME_PATTERNS:
match = pattern.search(line)
if match:
self._update_time_range(match.group(1))
break
# 提取实体
self._extract_entities(line, line_num)
# 识别告警
for level, pattern in self.ALERT_PATTERNS.items():
if pattern.search(line):
self.stats['alert_levels'][level] += 1
break
# 识别敏感操作
for op_type, pattern in self.SENSITIVE_OPS.items():
if pattern.search(line):
self.stats['sensitive_ops'][op_type] += 1
if line_num % 50000 == 0:
print(f" 已处理 {line_num:,} 行...")
def _extract_entities(self, line: str, line_num: int):
"""提取行内实体"""
for entity_type, pattern in self.ENTITY_PATTERNS.items():
for match in pattern.finditer(line):
value = match.group(1) if match.lastindex else match.group(0)
self._add_entity(entity_type, value, line_num, line[:200])
def _add_entity(self, entity_type: str, value: str, line_num: int, context: str = ""):
"""添加实体"""
# 过滤无效值
if entity_type == 'ip' and value in ('0.0.0.0', '127.0.0.1', '255.255.255.255'):
return
if entity_type == 'duration_ms' and float(value) == 0:
return
self.entities[entity_type].append(Entity(
type=entity_type,
value=value,
line_num=line_num,
context=context
))
self.stats[f'{entity_type}_count'][value] += 1
def _update_time_range(self, time_str: str):
"""更新时间范围"""
if not self.time_range['start'] or time_str < self.time_range['start']:
self.time_range['start'] = time_str
if not self.time_range['end'] or time_str > self.time_range['end']:
self.time_range['end'] = time_str
def _correlate(self):
"""关联分析"""
# 操作按时间排序
if self.operations:
self.operations.sort(key=lambda x: x.time)
print(f" ✓ 操作时间线: {len(self.operations)}")
# 聚合相同操作
if self.log_type == LogType.MYSQL_BINLOG:
op_summary: dict[str, dict] = {}
for op in self.operations:
key = op.op_type
if key not in op_summary:
op_summary[key] = {'count': 0, 'tables': Counter(), 'thread_ids': set()}
op_summary[key]['count'] += 1
op_summary[key]['tables'][op.target] += 1
for e in op.entities:
if e.type == 'thread_id':
op_summary[key]['thread_ids'].add(e.value)
for op_type, data in op_summary.items():
tables_count = len(data['tables'])
thread_count = len(data['thread_ids'])
print(f"{op_type}: {data['count']} 次, 涉及 {tables_count} 个表, {thread_count} 个 thread_id")
# IP 活动分析
if 'ip' in self.entities:
ip_activity = Counter(e.value for e in self.entities['ip'])
top_ips = ip_activity.most_common(5)
if top_ips:
print(f" ✓ Top IP:")
for ip, count in top_ips:
print(f" {ip}: {count}")
def _generate_insights(self):
"""生成智能洞察"""
# Binlog 洞察
if self.log_type == LogType.MYSQL_BINLOG:
# 大批量删除检测
delete_count = self.stats['operations'].get('DELETE', 0)
if delete_count > 100:
tables = self.stats['tables'].most_common(5)
thread_ids = list(set(e.value for e in self.entities.get('thread_id', [])))
server_ids = list(set(e.value for e in self.entities.get('server_id', [])))
self.insights.append(Insight(
category='security',
severity='high',
title=f'大批量删除操作检测',
description=f'检测到 {delete_count} 条 DELETE 操作',
evidence=[
f"时间范围: {self.time_range['start']} ~ {self.time_range['end']}",
f"涉及表: {', '.join(f'{t[0]}({t[1]}次)' for t in tables)}",
f"Server ID: {', '.join(server_ids)}",
f"Thread ID: {', '.join(thread_ids[:5])}{'...' if len(thread_ids) > 5 else ''}",
],
recommendation='确认操作来源1. 根据 thread_id 查询应用连接 2. 检查对应时间段的应用日志 3. 确认是否为正常业务行为'
))
# 操作来源分析
if self.entities.get('server_id'):
unique_servers = set(e.value for e in self.entities['server_id'])
if len(unique_servers) == 1:
server_id = list(unique_servers)[0]
self.insights.append(Insight(
category='audit',
severity='medium',
title='操作来源确认',
description=f'所有操作来自同一数据库实例 server_id={server_id}',
evidence=[
f"Server ID: {server_id}",
f"这是数据库主库的标识,不是客户端 IP",
f"Binlog 不记录客户端 IP需查 general_log 或审计日志",
],
recommendation='如需确认操作者 IP请检查1. MySQL general_log 2. 审计插件日志 3. 应用服务连接日志'
))
# 异常洞察
if self.alerts:
critical_count = sum(1 for a in self.alerts if a.level == 'CRITICAL')
if critical_count > 0:
self.insights.append(Insight(
category='error',
severity='critical',
title=f'严重异常检测',
description=f'检测到 {critical_count} 个严重级别异常',
evidence=[f"L{a.line_num}: {a.message[:100]}" for a in self.alerts if a.level == 'CRITICAL'][:5],
recommendation='立即检查相关服务状态'
))
# IP 异常检测
if 'ip' in self.entities:
ip_counter = Counter(e.value for e in self.entities['ip'])
for ip, count in ip_counter.most_common(3):
if count > 100:
self.insights.append(Insight(
category='anomaly',
severity='medium',
title=f'高频 IP 活动',
description=f'IP {ip} 出现 {count}',
evidence=[e.context[:100] for e in self.entities['ip'] if e.value == ip][:3],
recommendation='确认该 IP 的活动是否正常'
))
print(f" ✓ 生成 {len(self.insights)} 条洞察")
for insight in self.insights:
print(f" [{insight.severity.upper()}] {insight.title}")
def _generate_reports(self):
"""生成报告"""
self._write_summary()
self._write_entities()
self._write_operations()
self._write_insights()
self._write_json()
print(f"\n输出文件:")
for f in sorted(self.output_dir.iterdir()):
size = f.stat().st_size
print(f" - {f.name} ({size/1024:.1f} KB)")
def _write_summary(self):
"""写入摘要报告"""
path = self.output_dir / "summary.md"
with open(path, 'w', encoding='utf-8') as f:
f.write(f"# 日志分析报告\n\n")
f.write(f"## 概览\n\n")
f.write(f"| 项目 | 内容 |\n|------|------|\n")
f.write(f"| 文件 | {self.input_path.name} |\n")
f.write(f"| 大小 | {self.file_size_mb:.2f} MB |\n")
f.write(f"| 类型 | {self.log_type.value} |\n")
f.write(f"| 总行数 | {self.total_lines:,} |\n")
f.write(f"| 时间范围 | {self.time_range['start']} ~ {self.time_range['end']} |\n\n")
# 实体统计
if self.entities:
f.write(f"## 实体统计\n\n")
f.write(f"| 类型 | 唯一值 | 出现次数 | Top 值 |\n|------|--------|----------|--------|\n")
for entity_type, entities in sorted(self.entities.items()):
counter = Counter(e.value for e in entities)
unique = len(counter)
total = len(entities)
top = counter.most_common(1)[0] if counter else ('', 0)
f.write(f"| {entity_type} | {unique} | {total} | {top[0][:30]}({top[1]}) |\n")
f.write(f"\n")
# 操作统计
if self.stats['operations']:
f.write(f"## 操作统计\n\n")
f.write(f"| 操作类型 | 次数 |\n|----------|------|\n")
for op, count in self.stats['operations'].most_common():
f.write(f"| {op} | {count:,} |\n")
f.write(f"\n")
if self.stats['tables']:
f.write(f"## 表操作统计\n\n")
f.write(f"| 表名 | 操作次数 |\n|------|----------|\n")
for table, count in self.stats['tables'].most_common(10):
f.write(f"| {table} | {count:,} |\n")
f.write(f"\n")
# 洞察
if self.insights:
f.write(f"## 分析洞察\n\n")
for i, insight in enumerate(self.insights, 1):
f.write(f"### {i}. [{insight.severity.upper()}] {insight.title}\n\n")
f.write(f"{insight.description}\n\n")
if insight.evidence:
f.write(f"**证据:**\n")
for e in insight.evidence:
f.write(f"- {e}\n")
f.write(f"\n")
if insight.recommendation:
f.write(f"**建议:** {insight.recommendation}\n\n")
f.write(f"---\n\n")
def _write_entities(self):
"""写入实体详情"""
path = self.output_dir / "entities.md"
with open(path, 'w', encoding='utf-8') as f:
f.write(f"# 实体详情\n\n")
for entity_type, entities in sorted(self.entities.items()):
counter = Counter(e.value for e in entities)
f.write(f"## {entity_type} ({len(counter)} 个唯一值)\n\n")
f.write(f"| 值 | 出现次数 | 首次行号 |\n|-----|----------|----------|\n")
first_occurrence = {}
for e in entities:
if e.value not in first_occurrence:
first_occurrence[e.value] = e.line_num
for value, count in counter.most_common(50):
f.write(f"| {value[:50]} | {count} | {first_occurrence[value]} |\n")
f.write(f"\n")
def _write_operations(self):
"""写入操作详情"""
if not self.operations:
return
path = self.output_dir / "operations.md"
with open(path, 'w', encoding='utf-8') as f:
f.write(f"# 操作详情\n\n")
f.write(f"{len(self.operations)} 条操作记录\n\n")
# 按表分组
by_table = defaultdict(list)
for op in self.operations:
by_table[op.target].append(op)
for table, ops in sorted(by_table.items(), key=lambda x: len(x[1]), reverse=True):
f.write(f"## {table} ({len(ops)} 次操作)\n\n")
op_types = Counter(op.op_type for op in ops)
f.write(f"操作类型: {dict(op_types)}\n\n")
thread_ids = set()
for op in ops:
for e in op.entities:
if e.type == 'thread_id':
thread_ids.add(e.value)
if thread_ids:
f.write(f"Thread IDs: {', '.join(sorted(thread_ids))}\n\n")
f.write(f"时间范围: {ops[0].time} ~ {ops[-1].time}\n\n")
f.write(f"---\n\n")
def _write_insights(self):
"""写入洞察报告"""
if not self.insights:
return
path = self.output_dir / "insights.md"
with open(path, 'w', encoding='utf-8') as f:
f.write(f"# 分析洞察\n\n")
# 按严重程度分组
by_severity = defaultdict(list)
for insight in self.insights:
by_severity[insight.severity].append(insight)
for severity in ['critical', 'high', 'medium', 'low']:
if severity not in by_severity:
continue
f.write(f"## {severity.upper()} 级别\n\n")
for insight in by_severity[severity]:
f.write(f"### {insight.title}\n\n")
f.write(f"**类别:** {insight.category}\n\n")
f.write(f"**描述:** {insight.description}\n\n")
if insight.evidence:
f.write(f"**证据:**\n")
for e in insight.evidence:
f.write(f"- {e}\n")
f.write(f"\n")
if insight.recommendation:
f.write(f"**建议:** {insight.recommendation}\n\n")
f.write(f"---\n\n")
def _write_json(self):
"""写入 JSON 数据"""
path = self.output_dir / "analysis.json"
data = {
'file': str(self.input_path),
'size_mb': self.file_size_mb,
'log_type': self.log_type.value,
'total_lines': self.total_lines,
'time_range': self.time_range,
'entities': {
k: {
'unique': len(set(e.value for e in v)),
'total': len(v),
'top': Counter(e.value for e in v).most_common(10)
}
for k, v in self.entities.items()
},
'stats': {k: dict(v) for k, v in self.stats.items()},
'insights': [
{
'category': i.category,
'severity': i.severity,
'title': i.title,
'description': i.description,
'evidence': i.evidence,
'recommendation': i.recommendation
}
for i in self.insights
]
}
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _get_summary(self) -> dict:
return {
'log_type': self.log_type.value,
'total_lines': self.total_lines,
'entity_types': len(self.entities),
'operation_count': len(self.operations),
'insight_count': len(self.insights),
'output_dir': str(self.output_dir)
}
def main():
parser = argparse.ArgumentParser(description='RAPHL 智能日志分析器')
parser.add_argument('input', help='输入日志文件')
parser.add_argument('-o', '--output', default='./log_analysis', help='输出目录')
args = parser.parse_args()
analyzer = SmartLogAnalyzer(args.input, args.output)
result = analyzer.run()
print(f"\n请查看 {result['output_dir']}/summary.md")
if __name__ == '__main__':
main()