🚨 智能知识系统 - 安全与资源紧急加固方案
制定日期: 2026-03-31 状态: 🔴 紧急执行 关联事件: 近期多次P0资源危机 + 7个高危安全漏洞
📊 危机回顾
近期发生的P0资源事件
事件1:内存危机(2026-03-30)
- 严重程度: 🔴 P0 - 紧急
- 触发条件: 内存使用率 96% (30GB/31GB)
- 影响: 系统几乎无响应,服务崩溃风险
- 根因:
- ✅ 容器无资源限制(openlist占用11.2GB)
- ✅ 监控频率过低(每周一次)
- ✅ 多项目混跑无隔离(21个容器)
事件2:磁盘危机(2026-03-30)
- 严重程度: 🔴 P0 - 紧急
- 触发条件: 根分区使用率 86% (160GB/197GB)
- 影响: 无法写入日志,数据库操作失败
- 根因:
- ✅ 大文件占用(openlist 89GB在根分区)
- ✅ 日志文件未轮转
- ✅ VACUUM操作因空间不足失败
事件3:僵尸进程累积
- 严重程度: 🟠 P1 - 严重
- 触发条件: 19个僵尸进程累积
- 影响: 进程表资源占用,系统健康度下降
- 根因:
- ✅ 应用代码缺陷(未正确处理子进程)
- ✅ 缺少自动清理机制
🔗 安全漏洞与资源危机的关联
关键发现:安全漏洞可能被利用来放大资源问题
攻击场景1:无认证API + DoS攻击
当前状态:
# ❌ 所有API端点都没有认证
@router.post("/gateway/query")
async def gateway_query(request: GatewayQueryRequest):
# 任何人都可以调用
result = await process_expensive_query(request.question)
return result
攻击场景: 1. 攻击者发现API无需认证 2. 编写脚本每秒发送1000个请求 3. 每个请求都触发昂贵的数据库查询 4. 内存占用迅速上升 → 触发P0内存危机 5. 系统崩溃 → 服务不可用
影响: - ✅ 资源耗尽(DoS攻击) - ✅ 合法用户无法访问 - ✅ 数据库连接池耗尽 - ✅ 系统OOM崩溃
攻击场景2:CORS配置错误 + CSRF攻击
当前状态:
# ❌ CORS配置:允许所有来源 + 携带凭证
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
)
攻击场景:
1. 攻击者创建恶意网站 evil.com
2. 诱导已登录用户访问 evil.com
3. evil.com 通过浏览器发起大量跨域请求
4. 浏览器自动携带用户Cookie
5. API执行大量操作(用户不知情)
6. 资源被滥用 → 触发P0资源危机
影响: - ✅ CSRF攻击成功 - ✅ 用户资源被滥用 - ✅ 系统负载激增 - ✅ 可能的数据泄露
攻击场景3:裸异常处理 + 错误掩盖
当前状态:
攻击场景: 1. 攻击者发送构造的恶意输入 2. 系统抛出异常(如内存不足) 3. 异常被静默吞掉 4. 系统继续处理请求 → 内存继续增长 5. 最终OOM崩溃,但没有任何错误日志
影响: - ✅ 攻击无法被追踪 - ✅ 问题无法被及时发现 - ✅ 监控系统失效 - ✅ 调试困难
攻击场景4:硬编码密码 + 数据库被入侵
当前状态:
攻击场景: 1. 攻击者通过代码泄露获得密码 2. 直接连接数据库 3. 执行大量资源密集型查询 4. 创建大量临时表(占用磁盘空间) 5. 磁盘空间耗尽 → 触发P0磁盘危机
影响: - ✅ 数据库完全暴露 - ✅ 数据可被窃取/篡改 - ✅ 资源被恶意占用 - ✅ 系统崩溃
🎯 紧急加固方案(72小时计划)
第1阶段:立即修复(0-24小时)🔥
优先级P0-A:修复认证授权(4小时)
目标: 防止未授权访问导致的资源滥用
# 1. 启用JWT认证(2小时)
cd /home/ai/zhineng-knowledge-system
# 生成RSA密钥对
python -c "
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import os
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# 保存私钥
with open('jwt_private.pem', 'wb') as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
# 保存公钥
public_key = private_key.public_key()
with open('jwt_public.pem', 'wb') as f:
f.write(public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
print('JWT keys generated')
"
# 2. 配置环境变量
cat >> .env << EOF
JWT_PRIVATE_KEY_PATH=/home/ai/zhineng-knowledge-system/jwt_private.pem
JWT_PUBLIC_KEY_PATH=/home/ai/zhineng-knowledge-system/jwt_public.pem
JWT_ALGORITHM=RS256
JWT_EXPIRATION=3600
EOF
# 3. 添加认证装饰器到所有API端点
# (需要修改 backend/api/v1/*.py)
验证:
# 测试未认证请求被拒绝
curl -X POST http://localhost:8000/api/v1/gateway/query \
-H "Content-Type: application/json" \
-d '{"question":"test"}'
# 预期结果: 401 Unauthorized
优先级P0-B:删除硬编码密码(1小时)
目标: 防止凭证泄露
# 1. 检查所有硬编码密码
grep -r "zhineng123" /home/ai/zhineng-knowledge-system/backend
# 2. 修改 main_optimized.py
cat > /tmp/fix_database_url.py << 'EOF'
import sys
file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"
with open(file_path, 'r') as f:
content = f.read()
# 替换硬编码密码
old_line = 'DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")'
new_line = '''DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
raise ValueError("DATABASE_URL environment variable is required")'''
content = content.replace(old_line, new_line)
with open(file_path, 'w') as f:
f.write(content)
print("✅ Fixed hardcoded password in main_optimized.py")
EOF
python3 /tmp/fix_database_url.py
# 3. 设置环境变量
export DATABASE_URL="postgresql://zhineng:YOUR_SECURE_PASSWORD@localhost:5432/zhineng_kb"
# 4. 添加到 .env
echo "DATABASE_URL=postgresql://zhineng:YOUR_SECURE_PASSWORD@localhost:5432/zhineng_kb" >> .env
验证:
# 确认硬编码密码已删除
grep -c "zhineng123" /home/ai/zhineng-knowledge-system/backend/main_optimized.py
# 预期结果: 0
优先级P0-C:修复CORS配置(1小时)
目标: 防止CSRF攻击
# 修改 main_optimized.py
cat > /tmp/fix_cors.py << 'EOF'
file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"
with open(file_path, 'r') as f:
lines = f.readlines()
# 找到并替换CORS配置
new_lines = []
skip_until = -1
for i, line in enumerate(lines):
if i < skip_until:
continue
if 'allow_origins=["*"]' in line:
# 替换为白名单
new_lines.append(' allow_origins=os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(","),\n')
elif 'allow_credentials=True' in line:
new_lines.append(line) # 保持不变
else:
new_lines.append(line)
with open(file_path, 'w') as f:
f.writelines(new_lines)
print("✅ Fixed CORS configuration")
EOF
python3 /tmp/fix_cors.py
# 设置允许的来源
echo "ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000" >> .env
验证:
# 从不允许的来源测试
curl -X OPTIONS http://localhost:8000/api/v1/gateway/query \
-H "Origin: http://evil.com" \
-H "Access-Control-Request-Method: POST"
# 预期结果: CORS错误
优先级P0-D:修复裸异常处理(2小时)
目标: 确保错误能被追踪
# 查找所有裸异常
grep -rn "except:" /home/ai/zhineng-knowledge-system/backend --include="*.py"
# 修复 main_optimized.py 中的裸异常
cat > /tmp/fix_exceptions.py << 'EOF'
file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"
with open(file_path, 'r') as f:
content = f.read()
# 替换裸异常
content = content.replace(
'except:\n doc[\'tags\'] = []',
'''except json.JSONDecodeError as e:
logger.warning(f"Failed to parse tags: {e}")
doc['tags'] = []
except Exception as e:
logger.error(f"Unexpected error parsing tags: {e}", exc_info=True)
doc['tags'] = []'''
)
with open(file_path, 'w') as f:
f.write(content)
print("✅ Fixed bare exception handling")
EOF
python3 /tmp/fix_exceptions.py
优先级P0-E:添加全局速率限制(2小时)
目标: 防止DoS攻击
# 确认速率限制中间件已启用
cat > /tmp/verify_rate_limit.py << 'EOF'
import re
file_path = "/home/ai/zhineng-knowledge-system/backend/main.py"
with open(file_path, 'r') as f:
content = f.read()
if 'RateLimitMiddleware' in content:
print("✅ RateLimitMiddleware is already configured")
else:
print("❌ RateLimitMiddleware NOT found in main.py")
print("Adding RateLimitMiddleware...")
# 在 middleware 导入后添加
middleware_import = "from middleware import RateLimitMiddleware\n"
if middleware_import not in content:
content = content.replace(
"from middleware import RateLimitMiddleware",
middleware_import
)
# 添加中间件到应用
app_middleware = "app.add_middleware(RateLimitMiddleware)\n"
if app_middleware not in content:
content = content.replace(
"app.add_middleware(RateLimitMiddleware)",
app_middleware + "app.add_middleware(RateLimitMiddleware)"
)
with open(file_path, 'w') as f:
f.write(content)
print("✅ RateLimitMiddleware added")
EOF
python3 /tmp/verify_rate_limit.py
# 配置速率限制
cat >> .env << EOF
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS_PER_MINUTE=60
RATE_LIMIT_REQUESTS_PER_HOUR=1000
EOF
优先级P0-F:容器资源限制(3小时)
目标: 防止单个容器耗尽系统资源
# 修改 docker-compose.yml
cat > /tmp/add_resource_limits.py << 'EOF'
import yaml
file_path = "/home/ai/zhineng-knowledge-system/docker-compose.yml"
with open(file_path, 'r') as f:
compose = yaml.safe_load(f)
# 为每个服务添加资源限制
for service_name, service_config in compose.get('services', {}).items():
if 'deploy' not in service_config:
service_config['deploy'] = {}
if 'resources' not in service_config['deploy']:
service_config['deploy']['resources'] = {}
# 设置限制
if 'limits' not in service_config['deploy']['resources']:
# 根据服务类型设置不同的限制
if 'api' in service_name or 'backend' in service_name:
service_config['deploy']['resources']['limits'] = {
'cpus': '1.0',
'memory': '1G'
}
elif 'postgres' in service_name or 'redis' in service_name:
service_config['deploy']['resources']['limits'] = {
'cpus': '0.5',
'memory': '512M'
}
elif 'nginx' in service_name:
service_config['deploy']['resources']['limits'] = {
'cpus': '0.3',
'memory': '128M'
}
else:
service_config['deploy']['resources']['limits'] = {
'cpus': '0.2',
'memory': '256M'
}
# 保存修改后的配置
with open(file_path, 'w') as f:
yaml.dump(compose, f, default_flow_style=False)
print(f"✅ Added resource limits to {len(compose['services'])} services")
EOF
python3 /tmp/add_resource_limits.py
# 重启容器应用限制
docker-compose down
docker-compose up -d
优先级P0-G:加强监控频率(1小时)
目标: 及时发现资源问题
# 修改crontab,增加监控频率
(crontab -l 2>/dev/null; echo "*/10 * * * * /home/ai/zhineng-knowledge-system/scripts/emergency_memory_recovery.sh") | crontab -
(crontab -l 2>/dev/null; echo "0 * * * * /home/ai/zhineng-knowledge-system/scripts/monitor_disk.sh") | crontab -
(crontab -l 2>/dev/null; echo "*/30 * * * * /home/ai/zhineng-knowledge-system/scripts/monitor_docker.sh") | crontab -
# 验证crontab
crontab -l
第2阶段:深入加固(24-48小时)⚡
优先级P1-A:实施输入验证(4小时)
# 创建输入验证模块
cat > backend/common/input_validation.py << 'EOF'
"""输入验证模块"""
import re
import logging
from typing import Optional, List
from fastapi import HTTPException
logger = logging.getLogger(__name__)
# 可疑模式列表
SUSPICIOUS_PATTERNS = [
r'<script[^>]*>', # XSS 尝试
r'DROP\s+TABLE', # SQL 注入尝试
r';\s*DROP', # SQL 注入尝试
r'\${', # 模板注入
r'\.\./', # 路径遍历
r'eval\s*\(', # 代码注入
r'exec\s*\(', # 代码注入
]
def validate_user_input(
input_str: str,
max_length: int = 1000,
field_name: str = "input"
) -> str:
"""验证用户输入
Args:
input_str: 用户输入字符串
max_length: 最大长度
field_name: 字段名称(用于错误消息)
Returns:
清理后的输入
Raises:
HTTPException: 如果输入包含可疑内容
"""
if not input_str:
raise HTTPException(
status_code=400,
detail=f"{field_name} cannot be empty"
)
if len(input_str) > max_length:
raise HTTPException(
status_code=400,
detail=f"{field_name} exceeds maximum length of {max_length}"
)
# 检查可疑模式
for pattern in SUSPICIOUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(f"Suspicious input detected in {field_name}: {input_str[:100]}")
raise HTTPException(
status_code=400,
detail=f"{field_name} contains suspicious content"
)
return input_str.strip()
def sanitize_query(query: str) -> str:
"""清理搜索查询"""
# 限制特殊字符
query = re.sub(r'[^\w\s\u4e00-\u9fff.,?!:;()\-"\']', '', query)
# 限制长度
return query[:500]
EOF
# 在API端点中使用
# backend/api/v1/gateway.py
from common.input_validation import validate_user_input, sanitize_query
@router.post("/gateway/query")
async def gateway_query(request: GatewayQueryRequest):
# 验证输入
validated_question = validate_user_input(
request.question,
max_length=500,
field_name="question"
)
# 清理查询
clean_question = sanitize_query(validated_question)
# 处理请求...
优先级P1-B:添加安全响应头(2小时)
# 创建安全头部中间件
cat > backend/middleware/security_headers.py << 'EOF'
"""安全响应头中间件"""
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""添加安全响应头"""
async def dispatch(self, request: Request, call_next) -> Response:
response = await call_next(request)
# 添加安全头部
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
return response
EOF
# 在 main.py 中启用
# from middleware.security_headers import SecurityHeadersMiddleware
# app.add_middleware(SecurityHeadersMiddleware)
优先级P1-C:实施日志脱敏(3小时)
# 创建敏感数据过滤器
cat > backend/common/sensitive_data_filter.py << 'EOF'
"""敏感数据过滤器"""
import re
from typing import Any, Dict
# 敏感字段列表
SENSITIVE_FIELDS = [
'password', 'passwd', 'pwd',
'api_key', 'apikey', 'api-key',
'secret', 'token', 'authorization',
'credit_card', 'ssn', 'social_security',
]
# 敏感数据模式
SENSITIVE_PATTERNS = [
(r'Bearer\s+[A-Za-z0-9\-._~+/]+=*', 'Bearer *****'),
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '***@***.***'),
(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '****-****-****-****'),
]
def filter_sensitive_data(data: Any) -> Any:
"""过滤敏感数据
Args:
data: 要过滤的数据
Returns:
过滤后的数据
"""
if isinstance(data, str):
return _filter_string(data)
elif isinstance(data, dict):
return _filter_dict(data)
elif isinstance(data, list):
return [_filter_sensitive_data(item) for item in data]
else:
return data
def _filter_string(text: str) -> str:
"""过滤字符串中的敏感信息"""
for pattern, replacement in SENSITIVE_PATTERNS:
text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
return text
def _filter_dict(data: Dict) -> Dict:
"""过滤字典中的敏感字段"""
filtered = {}
for key, value in data.items():
# 检查是否是敏感字段
if any(sensitive in key.lower() for sensitive in SENSITIVE_FIELDS):
filtered[key] = '*****'
else:
filtered[key] = filter_sensitive_data(value)
return filtered
EOF
优先级P1-D:完善错误处理(3小时)
# 创建统一的错误处理模块
cat > backend/common/error_handler.py << 'EOF'
"""统一错误处理"""
import logging
from typing import Union
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
class ApplicationError(Exception):
"""应用基础异常"""
def __init__(
self,
message: str,
status_code: int = 500,
error_code: str = "APPLICATION_ERROR"
):
self.message = message
self.status_code = status_code
self.error_code = error_code
super().__init__(self.message)
async def application_error_handler(request: Request, exc: ApplicationError) -> JSONResponse:
"""处理应用错误"""
logger.error(
f"Application error: {exc.error_code} - {exc.message}",
exc_info=True
)
return JSONResponse(
status_code=exc.status_code,
content={
"status": "error",
"error": {
"code": exc.error_code,
"message": exc.message
}
}
)
async def generic_error_handler(request: Request, exc: Exception) -> JSONResponse:
"""处理未捕获的异常"""
logger.error(
f"Unhandled exception: {type(exc).__name__} - {str(exc)}",
exc_info=True
)
return JSONResponse(
status_code=500,
content={
"status": "error",
"error": {
"code": "INTERNAL_SERVER_ERROR",
"message": "An unexpected error occurred"
}
}
)
EOF
第3阶段:长期加固(48-72小时)🛡️
优先级P2-A:实施CSRF保护
# 安装依赖
pip install fastapi-csrf-protect
# 配置CSRF保护
cat > backend/middleware/csrf.py << 'EOF'
"""CSRF保护中间件"""
from fastapi_csrf_protect import CsrfProtect
from fastapi import Request
@CsrfProtect.load_config
def get_csrf_config():
return CsrfProtectSecretConfig(secret="your-secret-key")
# 在需要保护的端点上使用
@router.post("/gateway/query")
@CsrfProtect.protect
async def gateway_query(request: Request, csrf_protect: CsrfProtect = Depends()):
# 处理请求...
pass
EOF
优先级P2-B:实施RBAC权限控制
# 创建权限检查装饰器
cat > backend/auth/permission_decorators.py << 'EOF'
"""权限检查装饰器"""
from functools import wraps
from typing import Callable
from fastapi import HTTPException, Depends
from auth.jwt import get_current_user
from auth.rbac import Permission
def require_permission(permission: Permission):
"""要求特定权限的装饰器"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, current_user = Depends(get_current_user), **kwargs):
# 检查用户是否有所需权限
if permission not in current_user.permissions:
raise HTTPException(
status_code=403,
detail=f"Permission required: {permission.value}"
)
return await func(*args, current_user=current_user, **kwargs)
return wrapper
return decorator
# 使用示例
from auth.permission_decorators import require_permission
from auth.rbac import Permission
@router.post("/documents")
@require_permission(Permission.DOCUMENT_WRITE)
async def create_document(document: DocumentCreate):
# 创建文档...
pass
EOF
📋 验证清单
安全加固验证
# 1. 验证认证已启用
curl -X POST http://localhost:8000/api/v1/gateway/query \
-H "Content-Type: application/json" \
-d '{"question":"test"}'
# 预期: 401 Unauthorized
# 2. 验证硬编码密码已删除
grep -r "zhineng123" /home/ai/zhineng-knowledge-system/backend
# 预期: 无结果
# 3. 验证CORS配置
curl -X OPTIONS http://localhost:8000/api/v1/gateway/query \
-H "Origin: http://evil.com"
# 预期: CORS错误
# 4. 验证速率限制
for i in {1..100}; do
curl -X POST http://localhost:8000/api/v1/gateway/query \
-H "Content-Type: application/json" \
-d '{"question":"test"}' &
done
wait
# 预期: 429 Too Many Requests
# 5. 验证容器资源限制
docker inspect zhineng-api | grep -A 10 "Memory"
# 预期: 显示内存限制
资源监控验证
# 1. 验证监控脚本运行
crontab -l | grep -E "emergency_memory|monitor_disk"
# 预期: 显示多个定时任务
# 2. 验证容器资源限制
docker stats --no-stream
# 预期: 所有容器都有资源限制
# 3. 验证日志文件
ls -lh /home/ai/zhineng-knowledge-system/logs/
# 预期: 有监控日志文件
🎯 预期成果
安全指标
| 指标 | 修复前 | 修复后 | 改善 |
|---|---|---|---|
| 认证覆盖率 | 0% | 100% | ✅ +100% |
| 硬编码密码 | 1个 | 0个 | ✅ -100% |
| CORS配置 | 危险 | 安全 | ✅ 修复 |
| 裸异常处理 | 1处 | 0处 | ✅ -100% |
| 速率限制 | 部分 | 全局 | ✅ 完整 |
| 输入验证 | 弱 | 强 | ✅ 加强 |
资源指标
| 指标 | 修复前 | 修复后 | 改善 |
|---|---|---|---|
| 容器资源限制 | 0% | 100% | ✅ +100% |
| 监控频率 | 每周 | 每10分钟 | ✅ +1008倍 |
| 僵尸进程清理 | 手动 | 自动 | ✅ 自动化 |
| 告警响应时间 | 7天 | <10分钟 | ✅ -99.9% |
📞 应急联系
- 安全事件: 立即停止服务,保留日志
- 资源危机: 运行
emergency_memory_recovery.sh - 服务崩溃: 检查日志
logs/emergency_recovery.log
🔚 总结
关键要点: 1. ✅ 安全漏洞可被利用来放大资源问题 2. ✅ 资源限制可减轻安全攻击的影响 3. ✅ 安全与资源管理必须同步进行
立即行动: 1. 修复7个高危安全漏洞 2. 为所有容器添加资源限制 3. 加强监控和告警
长期目标: 1. 建立安全开发流程 2. 实施DevSecOps 3. 定期安全审计
文档版本: 1.0 最后更新: 2026-03-31 下次审查: 加固完成后