跳转至

🚨 智能知识系统 - 安全与资源紧急加固方案

⚠️ **归档文档 — 数据已过时** 本报告为历史快照存档。当前版本 **v1.3.0-dev**,232 测试通过。 👉 最新工程状态请参阅 **[ENGINEERING_ALIGNMENT.md](ENGINEERING_ALIGNMENT.md)**

制定日期: 2026-03-31 状态: 🔴 紧急执行 关联事件: 近期多次P0资源危机 + 7个高危安全漏洞


📊 危机回顾

近期发生的P0资源事件

事件1:内存危机(2026-03-30)

  • 严重程度: 🔴 P0 - 紧急
  • 触发条件: 内存使用率 96% (30GB/31GB)
  • 影响: 系统几乎无响应,服务崩溃风险
  • 根因:
  • ✅ 容器无资源限制(openlist占用11.2GB)
  • ✅ 监控频率过低(每周一次)
  • ✅ 多项目混跑无隔离(21个容器)

事件2:磁盘危机(2026-03-30)

  • 严重程度: 🔴 P0 - 紧急
  • 触发条件: 根分区使用率 86% (160GB/197GB)
  • 影响: 无法写入日志,数据库操作失败
  • 根因:
  • ✅ 大文件占用(openlist 89GB在根分区)
  • ✅ 日志文件未轮转
  • ✅ VACUUM操作因空间不足失败

事件3:僵尸进程累积

  • 严重程度: 🟠 P1 - 严重
  • 触发条件: 19个僵尸进程累积
  • 影响: 进程表资源占用,系统健康度下降
  • 根因:
  • ✅ 应用代码缺陷(未正确处理子进程)
  • ✅ 缺少自动清理机制

🔗 安全漏洞与资源危机的关联

关键发现:安全漏洞可能被利用来放大资源问题

攻击场景1:无认证API + DoS攻击

当前状态:

# ❌ 所有API端点都没有认证
@router.post("/gateway/query")
async def gateway_query(request: GatewayQueryRequest):
    # 任何人都可以调用
    result = await process_expensive_query(request.question)
    return result

攻击场景: 1. 攻击者发现API无需认证 2. 编写脚本每秒发送1000个请求 3. 每个请求都触发昂贵的数据库查询 4. 内存占用迅速上升 → 触发P0内存危机 5. 系统崩溃 → 服务不可用

影响: - ✅ 资源耗尽(DoS攻击) - ✅ 合法用户无法访问 - ✅ 数据库连接池耗尽 - ✅ 系统OOM崩溃


攻击场景2:CORS配置错误 + CSRF攻击

当前状态:

# ❌ CORS配置:允许所有来源 + 携带凭证
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
)

攻击场景: 1. 攻击者创建恶意网站 evil.com 2. 诱导已登录用户访问 evil.com 3. evil.com 通过浏览器发起大量跨域请求 4. 浏览器自动携带用户Cookie 5. API执行大量操作(用户不知情) 6. 资源被滥用 → 触发P0资源危机

影响: - ✅ CSRF攻击成功 - ✅ 用户资源被滥用 - ✅ 系统负载激增 - ✅ 可能的数据泄露


攻击场景3:裸异常处理 + 错误掩盖

当前状态:

# ❌ 裸异常处理
try:
    process_data()
except:
    pass  # 隐藏所有错误

攻击场景: 1. 攻击者发送构造的恶意输入 2. 系统抛出异常(如内存不足) 3. 异常被静默吞掉 4. 系统继续处理请求 → 内存继续增长 5. 最终OOM崩溃,但没有任何错误日志

影响: - ✅ 攻击无法被追踪 - ✅ 问题无法被及时发现 - ✅ 监控系统失效 - ✅ 调试困难


攻击场景4:硬编码密码 + 数据库被入侵

当前状态:

# ❌ 硬编码数据库密码
DATABASE_URL = "postgresql://zhineng:zhineng123@..."

攻击场景: 1. 攻击者通过代码泄露获得密码 2. 直接连接数据库 3. 执行大量资源密集型查询 4. 创建大量临时表(占用磁盘空间) 5. 磁盘空间耗尽 → 触发P0磁盘危机

影响: - ✅ 数据库完全暴露 - ✅ 数据可被窃取/篡改 - ✅ 资源被恶意占用 - ✅ 系统崩溃


🎯 紧急加固方案(72小时计划)

第1阶段:立即修复(0-24小时)🔥

优先级P0-A:修复认证授权(4小时)

目标: 防止未授权访问导致的资源滥用

# 1. 启用JWT认证(2小时)
cd /home/ai/zhineng-knowledge-system

# 生成RSA密钥对
python -c "
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import os

private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)

# 保存私钥
with open('jwt_private.pem', 'wb') as f:
    f.write(private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ))

# 保存公钥
public_key = private_key.public_key()
with open('jwt_public.pem', 'wb') as f:
    f.write(public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    ))

print('JWT keys generated')
"

# 2. 配置环境变量
cat >> .env << EOF
JWT_PRIVATE_KEY_PATH=/home/ai/zhineng-knowledge-system/jwt_private.pem
JWT_PUBLIC_KEY_PATH=/home/ai/zhineng-knowledge-system/jwt_public.pem
JWT_ALGORITHM=RS256
JWT_EXPIRATION=3600
EOF

# 3. 添加认证装饰器到所有API端点
# (需要修改 backend/api/v1/*.py)

验证:

# 测试未认证请求被拒绝
curl -X POST http://localhost:8000/api/v1/gateway/query \
  -H "Content-Type: application/json" \
  -d '{"question":"test"}'

# 预期结果: 401 Unauthorized


优先级P0-B:删除硬编码密码(1小时)

目标: 防止凭证泄露

# 1. 检查所有硬编码密码
grep -r "zhineng123" /home/ai/zhineng-knowledge-system/backend

# 2. 修改 main_optimized.py
cat > /tmp/fix_database_url.py << 'EOF'
import sys

file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"

with open(file_path, 'r') as f:
    content = f.read()

# 替换硬编码密码
old_line = 'DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")'
new_line = '''DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    raise ValueError("DATABASE_URL environment variable is required")'''

content = content.replace(old_line, new_line)

with open(file_path, 'w') as f:
    f.write(content)

print("✅ Fixed hardcoded password in main_optimized.py")
EOF

python3 /tmp/fix_database_url.py

# 3. 设置环境变量
export DATABASE_URL="postgresql://zhineng:YOUR_SECURE_PASSWORD@localhost:5432/zhineng_kb"

# 4. 添加到 .env
echo "DATABASE_URL=postgresql://zhineng:YOUR_SECURE_PASSWORD@localhost:5432/zhineng_kb" >> .env

验证:

# 确认硬编码密码已删除
grep -c "zhineng123" /home/ai/zhineng-knowledge-system/backend/main_optimized.py
# 预期结果: 0


优先级P0-C:修复CORS配置(1小时)

目标: 防止CSRF攻击

# 修改 main_optimized.py
cat > /tmp/fix_cors.py << 'EOF'
file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"

with open(file_path, 'r') as f:
    lines = f.readlines()

# 找到并替换CORS配置
new_lines = []
skip_until = -1

for i, line in enumerate(lines):
    if i < skip_until:
        continue

    if 'allow_origins=["*"]' in line:
        # 替换为白名单
        new_lines.append('    allow_origins=os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(","),\n')
    elif 'allow_credentials=True' in line:
        new_lines.append(line)  # 保持不变
    else:
        new_lines.append(line)

with open(file_path, 'w') as f:
    f.writelines(new_lines)

print("✅ Fixed CORS configuration")
EOF

python3 /tmp/fix_cors.py

# 设置允许的来源
echo "ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000" >> .env

验证:

# 从不允许的来源测试
curl -X OPTIONS http://localhost:8000/api/v1/gateway/query \
  -H "Origin: http://evil.com" \
  -H "Access-Control-Request-Method: POST"

# 预期结果: CORS错误


优先级P0-D:修复裸异常处理(2小时)

目标: 确保错误能被追踪

# 查找所有裸异常
grep -rn "except:" /home/ai/zhineng-knowledge-system/backend --include="*.py"

# 修复 main_optimized.py 中的裸异常
cat > /tmp/fix_exceptions.py << 'EOF'
file_path = "/home/ai/zhineng-knowledge-system/backend/main_optimized.py"

with open(file_path, 'r') as f:
    content = f.read()

# 替换裸异常
content = content.replace(
    'except:\n    doc[\'tags\'] = []',
    '''except json.JSONDecodeError as e:
        logger.warning(f"Failed to parse tags: {e}")
        doc['tags'] = []
    except Exception as e:
        logger.error(f"Unexpected error parsing tags: {e}", exc_info=True)
        doc['tags'] = []'''
)

with open(file_path, 'w') as f:
    f.write(content)

print("✅ Fixed bare exception handling")
EOF

python3 /tmp/fix_exceptions.py

优先级P0-E:添加全局速率限制(2小时)

目标: 防止DoS攻击

# 确认速率限制中间件已启用
cat > /tmp/verify_rate_limit.py << 'EOF'
import re

file_path = "/home/ai/zhineng-knowledge-system/backend/main.py"

with open(file_path, 'r') as f:
    content = f.read()

if 'RateLimitMiddleware' in content:
    print("✅ RateLimitMiddleware is already configured")
else:
    print("❌ RateLimitMiddleware NOT found in main.py")
    print("Adding RateLimitMiddleware...")

    # 在 middleware 导入后添加
    middleware_import = "from middleware import RateLimitMiddleware\n"
    if middleware_import not in content:
        content = content.replace(
            "from middleware import RateLimitMiddleware",
            middleware_import
        )

    # 添加中间件到应用
    app_middleware = "app.add_middleware(RateLimitMiddleware)\n"
    if app_middleware not in content:
        content = content.replace(
            "app.add_middleware(RateLimitMiddleware)",
            app_middleware + "app.add_middleware(RateLimitMiddleware)"
        )

    with open(file_path, 'w') as f:
        f.write(content)

    print("✅ RateLimitMiddleware added")
EOF

python3 /tmp/verify_rate_limit.py

# 配置速率限制
cat >> .env << EOF
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS_PER_MINUTE=60
RATE_LIMIT_REQUESTS_PER_HOUR=1000
EOF

优先级P0-F:容器资源限制(3小时)

目标: 防止单个容器耗尽系统资源

# 修改 docker-compose.yml
cat > /tmp/add_resource_limits.py << 'EOF'
import yaml

file_path = "/home/ai/zhineng-knowledge-system/docker-compose.yml"

with open(file_path, 'r') as f:
    compose = yaml.safe_load(f)

# 为每个服务添加资源限制
for service_name, service_config in compose.get('services', {}).items():
    if 'deploy' not in service_config:
        service_config['deploy'] = {}

    if 'resources' not in service_config['deploy']:
        service_config['deploy']['resources'] = {}

    # 设置限制
    if 'limits' not in service_config['deploy']['resources']:
        # 根据服务类型设置不同的限制
        if 'api' in service_name or 'backend' in service_name:
            service_config['deploy']['resources']['limits'] = {
                'cpus': '1.0',
                'memory': '1G'
            }
        elif 'postgres' in service_name or 'redis' in service_name:
            service_config['deploy']['resources']['limits'] = {
                'cpus': '0.5',
                'memory': '512M'
            }
        elif 'nginx' in service_name:
            service_config['deploy']['resources']['limits'] = {
                'cpus': '0.3',
                'memory': '128M'
            }
        else:
            service_config['deploy']['resources']['limits'] = {
                'cpus': '0.2',
                'memory': '256M'
            }

# 保存修改后的配置
with open(file_path, 'w') as f:
    yaml.dump(compose, f, default_flow_style=False)

print(f"✅ Added resource limits to {len(compose['services'])} services")
EOF

python3 /tmp/add_resource_limits.py

# 重启容器应用限制
docker-compose down
docker-compose up -d

优先级P0-G:加强监控频率(1小时)

目标: 及时发现资源问题

# 修改crontab,增加监控频率
(crontab -l 2>/dev/null; echo "*/10 * * * * /home/ai/zhineng-knowledge-system/scripts/emergency_memory_recovery.sh") | crontab -
(crontab -l 2>/dev/null; echo "0 * * * * /home/ai/zhineng-knowledge-system/scripts/monitor_disk.sh") | crontab -
(crontab -l 2>/dev/null; echo "*/30 * * * * /home/ai/zhineng-knowledge-system/scripts/monitor_docker.sh") | crontab -

# 验证crontab
crontab -l

第2阶段:深入加固(24-48小时)⚡

优先级P1-A:实施输入验证(4小时)

# 创建输入验证模块
cat > backend/common/input_validation.py << 'EOF'
"""输入验证模块"""

import re
import logging
from typing import Optional, List
from fastapi import HTTPException

logger = logging.getLogger(__name__)

# 可疑模式列表
SUSPICIOUS_PATTERNS = [
    r'<script[^>]*>',  # XSS 尝试
    r'DROP\s+TABLE',  # SQL 注入尝试
    r';\s*DROP',  # SQL 注入尝试
    r'\${',  # 模板注入
    r'\.\./',  # 路径遍历
    r'eval\s*\(',  # 代码注入
    r'exec\s*\(',  # 代码注入
]

def validate_user_input(
    input_str: str,
    max_length: int = 1000,
    field_name: str = "input"
) -> str:
    """验证用户输入

    Args:
        input_str: 用户输入字符串
        max_length: 最大长度
        field_name: 字段名称(用于错误消息)

    Returns:
        清理后的输入

    Raises:
        HTTPException: 如果输入包含可疑内容
    """
    if not input_str:
        raise HTTPException(
            status_code=400,
            detail=f"{field_name} cannot be empty"
        )

    if len(input_str) > max_length:
        raise HTTPException(
            status_code=400,
            detail=f"{field_name} exceeds maximum length of {max_length}"
        )

    # 检查可疑模式
    for pattern in SUSPICIOUS_PATTERNS:
        if re.search(pattern, input_str, re.IGNORECASE):
            logger.warning(f"Suspicious input detected in {field_name}: {input_str[:100]}")
            raise HTTPException(
                status_code=400,
                detail=f"{field_name} contains suspicious content"
            )

    return input_str.strip()

def sanitize_query(query: str) -> str:
    """清理搜索查询"""
    # 限制特殊字符
    query = re.sub(r'[^\w\s\u4e00-\u9fff.,?!:;()\-"\']', '', query)
    # 限制长度
    return query[:500]
EOF

# 在API端点中使用
# backend/api/v1/gateway.py
from common.input_validation import validate_user_input, sanitize_query

@router.post("/gateway/query")
async def gateway_query(request: GatewayQueryRequest):
    # 验证输入
    validated_question = validate_user_input(
        request.question,
        max_length=500,
        field_name="question"
    )

    # 清理查询
    clean_question = sanitize_query(validated_question)

    # 处理请求...

优先级P1-B:添加安全响应头(2小时)

# 创建安全头部中间件
cat > backend/middleware/security_headers.py << 'EOF'
"""安全响应头中间件"""

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """添加安全响应头"""

    async def dispatch(self, request: Request, call_next) -> Response:
        response = await call_next(request)

        # 添加安全头部
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

        return response
EOF

# 在 main.py 中启用
# from middleware.security_headers import SecurityHeadersMiddleware
# app.add_middleware(SecurityHeadersMiddleware)

优先级P1-C:实施日志脱敏(3小时)

# 创建敏感数据过滤器
cat > backend/common/sensitive_data_filter.py << 'EOF'
"""敏感数据过滤器"""

import re
from typing import Any, Dict

# 敏感字段列表
SENSITIVE_FIELDS = [
    'password', 'passwd', 'pwd',
    'api_key', 'apikey', 'api-key',
    'secret', 'token', 'authorization',
    'credit_card', 'ssn', 'social_security',
]

# 敏感数据模式
SENSITIVE_PATTERNS = [
    (r'Bearer\s+[A-Za-z0-9\-._~+/]+=*', 'Bearer *****'),
    (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '***@***.***'),
    (r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '****-****-****-****'),
]

def filter_sensitive_data(data: Any) -> Any:
    """过滤敏感数据

    Args:
        data: 要过滤的数据

    Returns:
        过滤后的数据
    """
    if isinstance(data, str):
        return _filter_string(data)
    elif isinstance(data, dict):
        return _filter_dict(data)
    elif isinstance(data, list):
        return [_filter_sensitive_data(item) for item in data]
    else:
        return data

def _filter_string(text: str) -> str:
    """过滤字符串中的敏感信息"""
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text

def _filter_dict(data: Dict) -> Dict:
    """过滤字典中的敏感字段"""
    filtered = {}
    for key, value in data.items():
        # 检查是否是敏感字段
        if any(sensitive in key.lower() for sensitive in SENSITIVE_FIELDS):
            filtered[key] = '*****'
        else:
            filtered[key] = filter_sensitive_data(value)
    return filtered
EOF

优先级P1-D:完善错误处理(3小时)

# 创建统一的错误处理模块
cat > backend/common/error_handler.py << 'EOF'
"""统一错误处理"""

import logging
from typing import Union
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

class ApplicationError(Exception):
    """应用基础异常"""

    def __init__(
        self,
        message: str,
        status_code: int = 500,
        error_code: str = "APPLICATION_ERROR"
    ):
        self.message = message
        self.status_code = status_code
        self.error_code = error_code
        super().__init__(self.message)

async def application_error_handler(request: Request, exc: ApplicationError) -> JSONResponse:
    """处理应用错误"""
    logger.error(
        f"Application error: {exc.error_code} - {exc.message}",
        exc_info=True
    )

    return JSONResponse(
        status_code=exc.status_code,
        content={
            "status": "error",
            "error": {
                "code": exc.error_code,
                "message": exc.message
            }
        }
    )

async def generic_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """处理未捕获的异常"""
    logger.error(
        f"Unhandled exception: {type(exc).__name__} - {str(exc)}",
        exc_info=True
    )

    return JSONResponse(
        status_code=500,
        content={
            "status": "error",
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "An unexpected error occurred"
            }
        }
    )
EOF

第3阶段:长期加固(48-72小时)🛡️

优先级P2-A:实施CSRF保护

# 安装依赖
pip install fastapi-csrf-protect

# 配置CSRF保护
cat > backend/middleware/csrf.py << 'EOF'
"""CSRF保护中间件"""

from fastapi_csrf_protect import CsrfProtect
from fastapi import Request

@CsrfProtect.load_config
def get_csrf_config():
    return CsrfProtectSecretConfig(secret="your-secret-key")

# 在需要保护的端点上使用
@router.post("/gateway/query")
@CsrfProtect.protect
async def gateway_query(request: Request, csrf_protect: CsrfProtect = Depends()):
    # 处理请求...
    pass
EOF

优先级P2-B:实施RBAC权限控制

# 创建权限检查装饰器
cat > backend/auth/permission_decorators.py << 'EOF'
"""权限检查装饰器"""

from functools import wraps
from typing import Callable
from fastapi import HTTPException, Depends
from auth.jwt import get_current_user
from auth.rbac import Permission

def require_permission(permission: Permission):
    """要求特定权限的装饰器"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, current_user = Depends(get_current_user), **kwargs):
            # 检查用户是否有所需权限
            if permission not in current_user.permissions:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission required: {permission.value}"
                )
            return await func(*args, current_user=current_user, **kwargs)
        return wrapper
    return decorator

# 使用示例
from auth.permission_decorators import require_permission
from auth.rbac import Permission

@router.post("/documents")
@require_permission(Permission.DOCUMENT_WRITE)
async def create_document(document: DocumentCreate):
    # 创建文档...
    pass
EOF

📋 验证清单

安全加固验证

# 1. 验证认证已启用
curl -X POST http://localhost:8000/api/v1/gateway/query \
  -H "Content-Type: application/json" \
  -d '{"question":"test"}'
# 预期: 401 Unauthorized

# 2. 验证硬编码密码已删除
grep -r "zhineng123" /home/ai/zhineng-knowledge-system/backend
# 预期: 无结果

# 3. 验证CORS配置
curl -X OPTIONS http://localhost:8000/api/v1/gateway/query \
  -H "Origin: http://evil.com"
# 预期: CORS错误

# 4. 验证速率限制
for i in {1..100}; do
  curl -X POST http://localhost:8000/api/v1/gateway/query \
    -H "Content-Type: application/json" \
    -d '{"question":"test"}' &
done
wait
# 预期: 429 Too Many Requests

# 5. 验证容器资源限制
docker inspect zhineng-api | grep -A 10 "Memory"
# 预期: 显示内存限制

资源监控验证

# 1. 验证监控脚本运行
crontab -l | grep -E "emergency_memory|monitor_disk"
# 预期: 显示多个定时任务

# 2. 验证容器资源限制
docker stats --no-stream
# 预期: 所有容器都有资源限制

# 3. 验证日志文件
ls -lh /home/ai/zhineng-knowledge-system/logs/
# 预期: 有监控日志文件

🎯 预期成果

安全指标

指标 修复前 修复后 改善
认证覆盖率 0% 100% ✅ +100%
硬编码密码 1个 0个 ✅ -100%
CORS配置 危险 安全 ✅ 修复
裸异常处理 1处 0处 ✅ -100%
速率限制 部分 全局 ✅ 完整
输入验证 ✅ 加强

资源指标

指标 修复前 修复后 改善
容器资源限制 0% 100% ✅ +100%
监控频率 每周 每10分钟 ✅ +1008倍
僵尸进程清理 手动 自动 ✅ 自动化
告警响应时间 7天 <10分钟 ✅ -99.9%

📞 应急联系

  • 安全事件: 立即停止服务,保留日志
  • 资源危机: 运行 emergency_memory_recovery.sh
  • 服务崩溃: 检查日志 logs/emergency_recovery.log

🔚 总结

关键要点: 1. ✅ 安全漏洞可被利用来放大资源问题 2. ✅ 资源限制可减轻安全攻击的影响 3. ✅ 安全与资源管理必须同步进行

立即行动: 1. 修复7个高危安全漏洞 2. 为所有容器添加资源限制 3. 加强监控和告警

长期目标: 1. 建立安全开发流程 2. 实施DevSecOps 3. 定期安全审计


文档版本: 1.0 最后更新: 2026-03-31 下次审查: 加固完成后