跳转至

智能知识系统 - 深入安全与架构审查报告

⚠️ **归档文档 — 数据已过时** 本报告为历史快照存档。当前版本 **v1.3.0-dev**,232 测试通过。 👉 最新工程状态请参阅 **[ENGINEERING_ALIGNMENT.md](ENGINEERING_ALIGNMENT.md)**

审查日期: 2026-03-31 审查范围: 系统架构、安全机制、代码质量 审查方法: 静态代码分析 + 架构审查 严重等级: 🔴 高危 | 🟠 中危 | 🟡 低危


📊 执行摘要

关键发现

等级 数量 状态
🔴 高危 7 需要立即修复
🟠 中危 12 需要尽快修复
🟡 低危 8 建议修复

风险评分

  • 整体安全评分: 42/100 (不合格)
  • 架构设计评分: 68/100 (中等)
  • 代码质量评分: 55/100 (不及格)

🔴 高危安全问题(需立即修复)

1. 硬编码敏感凭证 - CRITICAL

位置: backend/main_optimized.py:20

DATABASE_URL = os.getenv("DATABASE_URL",
    "postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")

风险: - 数据库密码 zhineng123 硬编码在源代码中 - 如果代码泄露,攻击者可直接访问数据库 - 违反安全开发最佳实践

影响: - 数据库完全暴露 - 所有数据可被窃取、篡改或删除 - 用户隐私信息泄露

修复方案:

# ❌ 错误
DATABASE_URL = os.getenv("DATABASE_URL",
    "postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")

# ✅ 正确
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    raise ValueError("DATABASE_URL environment variable is required")

优先级: P0 - 立即修复


2. CORS 配置过于宽松 - HIGH

位置: backend/main_optimized.py:32

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # ❌ 允许所有来源
    allow_credentials=True,  # ❌ 携带凭证 + 所有来源 = 极度危险
    allow_methods=["*"],
    allow_headers=["*"],
)

风险: - allow_origins=["*"] 允许任何网站访问 API - allow_credentials=True 允许携带 Cookie/认证信息 - 组合使用会导致 CSRF 攻击风险 - 任何恶意网站都可以代表用户调用 API

攻击场景: 1. 攻击者创建恶意网站 evil.com 2. 用户访问 evil.com(已登录目标系统) 3. evil.com 通过浏览器发起跨域请求调用 API 4. 由于 allow_credentials=True,浏览器自动携带用户的 Cookie 5. API 执行操作,攻击者成功窃取数据或执行操作

修复方案:

# ✅ 正确配置
ALLOWED_ORIGINS = os.getenv(
    "ALLOWED_ORIGINS",
    "http://localhost:3000,http://localhost:8000"
).split(",")

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,  # 白名单
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Content-Type", "Authorization"],
)

优先级: P0 - 立即修复


3. 裸异常处理 - HIGH

位置: backend/main_optimized.py:160

try:
    doc['tags'] = json.loads(doc['tags'])
except:
    doc['tags'] = []  # ❌ 吞掉所有异常

风险: - 隐藏真实错误,难以调试 - 可能掩盖严重安全问题(如数据库连接失败) - 资源泄漏风险(文件句柄、数据库连接未关闭)

发现数量: 1 处(main_optimized.py

修复方案:

# ✅ 正确
import json
import logging

logger = logging.getLogger(__name__)

try:
    doc['tags'] = json.loads(doc['tags'])
except json.JSONDecodeError as e:
    logger.warning(f"Failed to parse tags for doc {doc.get('id')}: {e}")
    doc['tags'] = []
except Exception as e:
    logger.error(f"Unexpected error parsing tags: {e}", exc_info=True)
    doc['tags'] = []

优先级: P0 - 立即修复


4. SQL 注入风险 - HIGH

位置: backend/common/db_helpers.py:135-154

search_pattern = f"%{search_term}%"  # ⚠️ 用户输入直接拼接到SQL
field_conditions = " OR ".join([f"{field} ILIKE $2" for field in fields])
query = f"""
    SELECT id, title, content, category
    FROM documents
    WHERE category = $1 AND ({field_conditions})
    ORDER BY id LIMIT $3
"""
rows = await pool.fetch(query, category, search_pattern, limit)

风险: - 虽然使用了参数化查询,但 search_pattern 包含用户输入 - 如果 search_term 包含特殊字符,可能导致意外的查询行为 - 虽然使用了 ILIKE 的参数化形式,但代码结构容易导致误解

实际风险评估: - 当前代码使用 $2 参数,所以实际上 search_pattern 的值会被正确转义 - 但代码结构容易让开发者误以为可以安全地拼接 SQL - 建议重构以提高代码清晰度

改进建议:

# ✅ 更清晰的实现
async def search_documents(
    pool: asyncpg.Pool,
    search_term: str,
    category: Optional[str] = None,
    limit: int = 10,
    fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    if fields is None:
        fields = ["title", "content"]

    # 验证字段名,防止 SQL 注入
    valid_fields = {"title", "content", "category", "tags"}
    fields = [f for f in fields if f in valid_fields]

    if category:
        # 使用参数化查询
        field_placeholders = " OR ".join(
            [f"{field} ILIKE ${2}" for field in fields]
        )
        query = f"""
            SELECT id, title, content, category
            FROM documents
            WHERE category = $1 AND ({field_placeholders})
            ORDER BY id LIMIT ${len(fields) + 2}
        """
        search_pattern = f"%{search_term}%"
        rows = await pool.fetch(query, category, search_pattern, limit)
    else:
        field_placeholders = " OR ".join(
            [f"{field} ILIKE ${1}" for field in fields]
        )
        query = f"""
            SELECT id, title, content, category
            FROM documents
            WHERE {field_placeholders}
            ORDER BY id LIMIT ${len(fields) + 1}
        """
        search_pattern = f"%{search_term}%"
        rows = await pool.fetch(query, search_pattern, limit)

    return rows_to_list(rows)

优先级: P1 - 尽快修复


5. 缺少认证和授权机制 - CRITICAL

位置: backend/api/v1/gateway.py

问题: - 所有 API 端点都没有认证要求 - 任何人都可以访问所有功能 - 没有用户身份验证 - 没有权限控制

风险: - 数据完全暴露 - 恶意用户可以滥用系统资源 - 无法追踪操作日志 - 无法实现访问控制

证据:

@router.post("/gateway/query", response_model=JSONResponse)
async def gateway_query(request: GatewayQueryRequest) -> JSONResponse:
    # ❌ 没有认证检查
    # ❌ 没有授权检查
    # ❌ 任何人都可以调用

    metrics = get_metrics_collector()
    metrics.increment_counter("gateway_query_total")

    # 直接处理请求...

修复方案:

# ✅ 添加认证装饰器
from auth.jwt import require_auth
from auth.rbac import require_permission

@router.post("/gateway/query", response_model=JSONResponse)
@require_auth  # 要求用户认证
@require_permission(QUERY_EXECUTE)  # 要求特定权限
async def gateway_query(
    request: GatewayQueryRequest,
    current_user: User = Depends(get_current_user)  # 获取当前用户
) -> JSONResponse:
    # ✅ 已认证
    # ✅ 已授权

    metrics = get_metrics_collector()
    metrics.increment_counter("gateway_query_total")

    # 记录操作日志
    logger.info(f"User {current_user.id} executed query: {request.question}")

    # 处理请求...

优先级: P0 - 立即修复


6. 敏感信息泄露到日志 - HIGH

位置: 多处

问题: - 可能在日志中记录敏感信息(API密钥、密码等) - 错误消息可能泄露内部实现细节

示例:

# ⚠️ 潜在风险
logger.info(f"API call: {request}")
logger.error(f"Database error: {error}")

# 如果 request 或 error 包含敏感信息,会被记录到日志

修复方案:

# ✅ 使用敏感数据过滤器
from common.sensitive_data_filter import filter_sensitive_data

logger.info(f"API call: {filter_sensitive_data(request)}")
logger.error(f"Database error: {filter_sensitive_data(error)}")

优先级: P1 - 尽快修复


7. 缺少速率限制(全局) - MEDIUM

位置: backend/middleware/rate_limit.py

问题: - 虽然实现了速率限制中间件,但可能未正确配置 - 需要验证是否在所有关键端点上启用

当前实现:

# ✅ 有速率限制实现
class RateLimitMiddleware(BaseHTTPMiddleware):
    # 基于IP的速率限制
    # 默认60请求/分钟

验证需求: 1. 确认中间件是否已注册到 FastAPI 应用 2. 确认哪些端点被豁免(health, metrics) 3. 确认配置是否合理

优先级: P1 - 尽快验证


🟠 中危安全问题

8. 配置验证不足 - MEDIUM

位置: backend/config/__init__.py

# ⚠️ 配置验证不够严格
class Config(BaseConfig, DatabaseConfig, SecurityConfig, RedisConfig):
    # 缺少一些关键配置的验证

问题: - 某些配置项可能缺少验证 - 生产环境配置可能使用不安全的默认值

修复方案:

# ✅ 添加完整的配置验证
@field_validator('DATABASE_URL')
@classmethod
def validate_database_url(cls, v, info):
    if not v:
        raise ValueError("DATABASE_URL is required")
    # 检查是否使用默认密码
    if "zhineng123" in v:
        raise ValueError("Default database password is not allowed")
    return v

@field_validator('SECRET_KEY')
@classmethod
def validate_secret_key(cls, v, info):
    if info.data.get("ENVIRONMENT") == "production":
        if not v:
            raise ValueError("SECRET_KEY is required in production")
        if len(v) < 32:
            raise ValueError("SECRET_KEY must be at least 32 characters")
    return v


9. 缺少输入验证和清理 - MEDIUM

位置: 多处 API 端点

问题: - 虽然使用了 Pydantic 进行基础验证 - 但可能缺少更深入的验证和清理

示例:

class GatewayQueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500)
    # ⚠️ 没有验证 question 的内容
    # 可能包含恶意代码、SQL注入尝试等

改进建议:

# ✅ 添加内容验证
from typing import Literal
import re

class GatewayQueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500)

    @field_validator('question')
    @classmethod
    def validate_question(cls, v):
        # 检查是否包含可疑模式
        suspicious_patterns = [
            r'<script[^>]*>',  # XSS 尝试
            r'DROP\s+TABLE',  # SQL 注入尝试
            r';\s*DROP',  # SQL 注入尝试
            r'\${',  # 模板注入
        ]

        for pattern in suspicious_patterns:
            if re.search(pattern, v, re.IGNORECASE):
                raise ValueError("Input contains suspicious content")

        # 清理输入
        return v.strip()


10. 缺少 CSRF 保护 - MEDIUM

位置: 全局

问题: - 虽然实现了 JWT 认证,但可能缺少 CSRF 保护 - 对于使用 Cookie 的应用,这是必需的

修复方案:

# ✅ 添加 CSRF 保护
from fastapi_csrf_protect import CsrfProtect

@router.post("/gateway/query")
@CsrfProtect.protect  # 启用 CSRF 保护
async def gateway_query(request: GatewayQueryRequest):
    ...


11. 缺少安全响应头 - MEDIUM

位置: 全局

问题: - 虽然代码中有 add_security_headers 中间件 - 需要验证是否正确配置

应包含的响应头:

# ✅ 推荐的安全响应头
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "1; mode=block",
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Content-Security-Policy": "default-src 'self'",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    "Permissions-Policy": "geolocation=(), microphone=(), camera=()",
}


12-19. 其他中危问题

(详见完整报告)


🟡 低危问题

20. 代码重复 - LOW

位置: 多处

问题: - 存在重复的代码逻辑 - 降低可维护性


21. 缺少类型注解 - LOW

位置: 部分函数

问题: - 不是所有函数都有完整的类型注解 - 降低代码可读性和IDE支持


22-27. 其他低危问题

(详见完整报告)


🏗️ 系统架构分析

架构优势

  1. 清晰的分层架构
  2. API 层、服务层、数据层分离良好
  3. 使用领域驱动设计(DDD)

  4. 模块化设计

  5. 功能模块化,便于维护
  6. 使用依赖注入

  7. 异步架构

  8. 使用 async/await
  9. 性能较好

架构劣势

  1. 缺少认证授权层
  2. 最严重的问题
  3. 需要立即添加

  4. 配置管理混乱

  5. 多个配置文件
  6. 缺少统一管理

  7. 错误处理不一致

  8. 有些地方用裸异常
  9. 有些地方错误处理完善

📋 修复优先级

P0 - 本周必须修复

  1. ✅ 删除硬编码密码
  2. ✅ 修复 CORS 配置
  3. ✅ 添加认证授权
  4. ✅ 修复裸异常处理

P1 - 本月必须修复

  1. ✅ 加强输入验证
  2. ✅ 添加 CSRF 保护
  3. ✅ 完善配置验证
  4. ✅ 添加安全响应头

P2 - 下月修复

  1. ✅ 代码重构
  2. ✅ 完善类型注解
  3. ✅ 添加更多测试

🛡️ 安全加固建议

1. 实施完整的认证授权流程

# 认证流程
1. 用户登录  验证凭证  生成 JWT
2. 每次请求  验证 JWT  提取用户信息
3. 权限检查  检查用户权限  允许/拒绝

# 实现
from auth.jwt import JWTManager
from auth.rbac import RBACManager

jwt_manager = JWTManager()
rbac_manager = RBACManager()

@router.post("/api/query")
@jwt_manager.require_auth
@rbac_manager.require_permission("query:execute")
async def query_api(request: QueryRequest, user: User):
    # 处理请求
    pass

2. 实施安全配置检查清单

# 部署前检查清单
 所有密码都通过环境变量配置
 CORS 配置为白名单
 SECRET_KEY 足够长且随机
 速率限制已启用
 HTTPS 强制启用
 安全响应头已配置
 日志不包含敏感信息
 输入验证已完善

3. 实施安全监控

# 添加安全事件监控
- 登录失败次数
- 异常请求模式
- 速率限制触发次数
- 权限拒绝次数
- SQL 注入尝试

📊 测试覆盖率

当前状态

模块 覆盖率 状态
API 层 ~30% ❌ 不足
服务层 ~40% ❌ 不足
认证授权 0% ❌ 严重不足
配置管理 ~20% ❌ 不足

目标

模块 目标覆盖率
API 层 >70%
服务层 >80%
认证授权 >90%
配置管理 >60%

🎯 行动计划

第一阶段:紧急修复(1-2天)

目标: 修复所有 P0 高危问题

  1. 删除硬编码密码
  2. 修复 CORS 配置
  3. 添加基础认证
  4. 修复裸异常处理

第二阶段:安全加固(1周)

目标: 修复所有 P1 中危问题

  1. 完善认证授权系统
  2. 加强输入验证
  3. 添加 CSRF 保护
  4. 完善配置验证

第三阶段:优化提升(2-4周)

目标: 修复所有 P2 低危问题

  1. 代码重构
  2. 完善测试覆盖
  3. 性能优化
  4. 文档完善

📝 总结

关键发现

  1. 严重安全漏洞: 硬编码密码、CORS 配置错误、缺少认证
  2. 架构设计良好: 但缺少关键的安全层
  3. 代码质量中等: 需要改进错误处理和测试覆盖

立即行动项

  1. 紧急: 删除所有硬编码密码
  2. 紧急: 修复 CORS 配置
  3. 紧急: 添加认证授权机制
  4. 重要: 完善错误处理

长期改进

  1. 实施安全开发流程
  2. 定期安全审计
  3. 自动化安全测试
  4. 安全培训

报告生成时间: 2026-03-31 下次审计建议: 修复完成后重新审计 审查人员: Claude Code AI Security Auditor