智能知识系统 - 深入安全与架构审查报告
审查日期: 2026-03-31 审查范围: 系统架构、安全机制、代码质量 审查方法: 静态代码分析 + 架构审查 严重等级: 🔴 高危 | 🟠 中危 | 🟡 低危
📊 执行摘要
关键发现
| 等级 | 数量 | 状态 |
|---|---|---|
| 🔴 高危 | 7 | 需要立即修复 |
| 🟠 中危 | 12 | 需要尽快修复 |
| 🟡 低危 | 8 | 建议修复 |
风险评分
- 整体安全评分: 42/100 (不合格)
- 架构设计评分: 68/100 (中等)
- 代码质量评分: 55/100 (不及格)
🔴 高危安全问题(需立即修复)
1. 硬编码敏感凭证 - CRITICAL
位置: backend/main_optimized.py:20
DATABASE_URL = os.getenv("DATABASE_URL",
"postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")
风险:
- 数据库密码 zhineng123 硬编码在源代码中
- 如果代码泄露,攻击者可直接访问数据库
- 违反安全开发最佳实践
影响: - 数据库完全暴露 - 所有数据可被窃取、篡改或删除 - 用户隐私信息泄露
修复方案:
# ❌ 错误
DATABASE_URL = os.getenv("DATABASE_URL",
"postgresql://zhineng:zhineng123@localhost:5432/zhineng_kb")
# ✅ 正确
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
raise ValueError("DATABASE_URL environment variable is required")
优先级: P0 - 立即修复
2. CORS 配置过于宽松 - HIGH
位置: backend/main_optimized.py:32
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # ❌ 允许所有来源
allow_credentials=True, # ❌ 携带凭证 + 所有来源 = 极度危险
allow_methods=["*"],
allow_headers=["*"],
)
风险:
- allow_origins=["*"] 允许任何网站访问 API
- allow_credentials=True 允许携带 Cookie/认证信息
- 组合使用会导致 CSRF 攻击风险
- 任何恶意网站都可以代表用户调用 API
攻击场景:
1. 攻击者创建恶意网站 evil.com
2. 用户访问 evil.com(已登录目标系统)
3. evil.com 通过浏览器发起跨域请求调用 API
4. 由于 allow_credentials=True,浏览器自动携带用户的 Cookie
5. API 执行操作,攻击者成功窃取数据或执行操作
修复方案:
# ✅ 正确配置
ALLOWED_ORIGINS = os.getenv(
"ALLOWED_ORIGINS",
"http://localhost:3000,http://localhost:8000"
).split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS, # 白名单
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Content-Type", "Authorization"],
)
优先级: P0 - 立即修复
3. 裸异常处理 - HIGH
位置: backend/main_optimized.py:160
风险: - 隐藏真实错误,难以调试 - 可能掩盖严重安全问题(如数据库连接失败) - 资源泄漏风险(文件句柄、数据库连接未关闭)
发现数量: 1 处(main_optimized.py)
修复方案:
# ✅ 正确
import json
import logging
logger = logging.getLogger(__name__)
try:
doc['tags'] = json.loads(doc['tags'])
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse tags for doc {doc.get('id')}: {e}")
doc['tags'] = []
except Exception as e:
logger.error(f"Unexpected error parsing tags: {e}", exc_info=True)
doc['tags'] = []
优先级: P0 - 立即修复
4. SQL 注入风险 - HIGH
位置: backend/common/db_helpers.py:135-154
search_pattern = f"%{search_term}%" # ⚠️ 用户输入直接拼接到SQL
field_conditions = " OR ".join([f"{field} ILIKE $2" for field in fields])
query = f"""
SELECT id, title, content, category
FROM documents
WHERE category = $1 AND ({field_conditions})
ORDER BY id LIMIT $3
"""
rows = await pool.fetch(query, category, search_pattern, limit)
风险:
- 虽然使用了参数化查询,但 search_pattern 包含用户输入
- 如果 search_term 包含特殊字符,可能导致意外的查询行为
- 虽然使用了 ILIKE 的参数化形式,但代码结构容易导致误解
实际风险评估:
- 当前代码使用 $2 参数,所以实际上 search_pattern 的值会被正确转义
- 但代码结构容易让开发者误以为可以安全地拼接 SQL
- 建议重构以提高代码清晰度
改进建议:
# ✅ 更清晰的实现
async def search_documents(
pool: asyncpg.Pool,
search_term: str,
category: Optional[str] = None,
limit: int = 10,
fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
if fields is None:
fields = ["title", "content"]
# 验证字段名,防止 SQL 注入
valid_fields = {"title", "content", "category", "tags"}
fields = [f for f in fields if f in valid_fields]
if category:
# 使用参数化查询
field_placeholders = " OR ".join(
[f"{field} ILIKE ${2}" for field in fields]
)
query = f"""
SELECT id, title, content, category
FROM documents
WHERE category = $1 AND ({field_placeholders})
ORDER BY id LIMIT ${len(fields) + 2}
"""
search_pattern = f"%{search_term}%"
rows = await pool.fetch(query, category, search_pattern, limit)
else:
field_placeholders = " OR ".join(
[f"{field} ILIKE ${1}" for field in fields]
)
query = f"""
SELECT id, title, content, category
FROM documents
WHERE {field_placeholders}
ORDER BY id LIMIT ${len(fields) + 1}
"""
search_pattern = f"%{search_term}%"
rows = await pool.fetch(query, search_pattern, limit)
return rows_to_list(rows)
优先级: P1 - 尽快修复
5. 缺少认证和授权机制 - CRITICAL
位置: backend/api/v1/gateway.py
问题: - 所有 API 端点都没有认证要求 - 任何人都可以访问所有功能 - 没有用户身份验证 - 没有权限控制
风险: - 数据完全暴露 - 恶意用户可以滥用系统资源 - 无法追踪操作日志 - 无法实现访问控制
证据:
@router.post("/gateway/query", response_model=JSONResponse)
async def gateway_query(request: GatewayQueryRequest) -> JSONResponse:
# ❌ 没有认证检查
# ❌ 没有授权检查
# ❌ 任何人都可以调用
metrics = get_metrics_collector()
metrics.increment_counter("gateway_query_total")
# 直接处理请求...
修复方案:
# ✅ 添加认证装饰器
from auth.jwt import require_auth
from auth.rbac import require_permission
@router.post("/gateway/query", response_model=JSONResponse)
@require_auth # 要求用户认证
@require_permission(QUERY_EXECUTE) # 要求特定权限
async def gateway_query(
request: GatewayQueryRequest,
current_user: User = Depends(get_current_user) # 获取当前用户
) -> JSONResponse:
# ✅ 已认证
# ✅ 已授权
metrics = get_metrics_collector()
metrics.increment_counter("gateway_query_total")
# 记录操作日志
logger.info(f"User {current_user.id} executed query: {request.question}")
# 处理请求...
优先级: P0 - 立即修复
6. 敏感信息泄露到日志 - HIGH
位置: 多处
问题: - 可能在日志中记录敏感信息(API密钥、密码等) - 错误消息可能泄露内部实现细节
示例:
# ⚠️ 潜在风险
logger.info(f"API call: {request}")
logger.error(f"Database error: {error}")
# 如果 request 或 error 包含敏感信息,会被记录到日志
修复方案:
# ✅ 使用敏感数据过滤器
from common.sensitive_data_filter import filter_sensitive_data
logger.info(f"API call: {filter_sensitive_data(request)}")
logger.error(f"Database error: {filter_sensitive_data(error)}")
优先级: P1 - 尽快修复
7. 缺少速率限制(全局) - MEDIUM
位置: backend/middleware/rate_limit.py
问题: - 虽然实现了速率限制中间件,但可能未正确配置 - 需要验证是否在所有关键端点上启用
当前实现:
验证需求: 1. 确认中间件是否已注册到 FastAPI 应用 2. 确认哪些端点被豁免(health, metrics) 3. 确认配置是否合理
优先级: P1 - 尽快验证
🟠 中危安全问题
8. 配置验证不足 - MEDIUM
位置: backend/config/__init__.py
问题: - 某些配置项可能缺少验证 - 生产环境配置可能使用不安全的默认值
修复方案:
# ✅ 添加完整的配置验证
@field_validator('DATABASE_URL')
@classmethod
def validate_database_url(cls, v, info):
if not v:
raise ValueError("DATABASE_URL is required")
# 检查是否使用默认密码
if "zhineng123" in v:
raise ValueError("Default database password is not allowed")
return v
@field_validator('SECRET_KEY')
@classmethod
def validate_secret_key(cls, v, info):
if info.data.get("ENVIRONMENT") == "production":
if not v:
raise ValueError("SECRET_KEY is required in production")
if len(v) < 32:
raise ValueError("SECRET_KEY must be at least 32 characters")
return v
9. 缺少输入验证和清理 - MEDIUM
位置: 多处 API 端点
问题: - 虽然使用了 Pydantic 进行基础验证 - 但可能缺少更深入的验证和清理
示例:
class GatewayQueryRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=500)
# ⚠️ 没有验证 question 的内容
# 可能包含恶意代码、SQL注入尝试等
改进建议:
# ✅ 添加内容验证
from typing import Literal
import re
class GatewayQueryRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=500)
@field_validator('question')
@classmethod
def validate_question(cls, v):
# 检查是否包含可疑模式
suspicious_patterns = [
r'<script[^>]*>', # XSS 尝试
r'DROP\s+TABLE', # SQL 注入尝试
r';\s*DROP', # SQL 注入尝试
r'\${', # 模板注入
]
for pattern in suspicious_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise ValueError("Input contains suspicious content")
# 清理输入
return v.strip()
10. 缺少 CSRF 保护 - MEDIUM
位置: 全局
问题: - 虽然实现了 JWT 认证,但可能缺少 CSRF 保护 - 对于使用 Cookie 的应用,这是必需的
修复方案:
# ✅ 添加 CSRF 保护
from fastapi_csrf_protect import CsrfProtect
@router.post("/gateway/query")
@CsrfProtect.protect # 启用 CSRF 保护
async def gateway_query(request: GatewayQueryRequest):
...
11. 缺少安全响应头 - MEDIUM
位置: 全局
问题:
- 虽然代码中有 add_security_headers 中间件
- 需要验证是否正确配置
应包含的响应头:
# ✅ 推荐的安全响应头
SECURITY_HEADERS = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"Content-Security-Policy": "default-src 'self'",
"Referrer-Policy": "strict-origin-when-cross-origin",
"Permissions-Policy": "geolocation=(), microphone=(), camera=()",
}
12-19. 其他中危问题
(详见完整报告)
🟡 低危问题
20. 代码重复 - LOW
位置: 多处
问题: - 存在重复的代码逻辑 - 降低可维护性
21. 缺少类型注解 - LOW
位置: 部分函数
问题: - 不是所有函数都有完整的类型注解 - 降低代码可读性和IDE支持
22-27. 其他低危问题
(详见完整报告)
🏗️ 系统架构分析
架构优势
- 清晰的分层架构
- API 层、服务层、数据层分离良好
-
使用领域驱动设计(DDD)
-
模块化设计
- 功能模块化,便于维护
-
使用依赖注入
-
异步架构
- 使用 async/await
- 性能较好
架构劣势
- 缺少认证授权层
- 最严重的问题
-
需要立即添加
-
配置管理混乱
- 多个配置文件
-
缺少统一管理
-
错误处理不一致
- 有些地方用裸异常
- 有些地方错误处理完善
📋 修复优先级
P0 - 本周必须修复
- ✅ 删除硬编码密码
- ✅ 修复 CORS 配置
- ✅ 添加认证授权
- ✅ 修复裸异常处理
P1 - 本月必须修复
- ✅ 加强输入验证
- ✅ 添加 CSRF 保护
- ✅ 完善配置验证
- ✅ 添加安全响应头
P2 - 下月修复
- ✅ 代码重构
- ✅ 完善类型注解
- ✅ 添加更多测试
🛡️ 安全加固建议
1. 实施完整的认证授权流程
# 认证流程
1. 用户登录 → 验证凭证 → 生成 JWT
2. 每次请求 → 验证 JWT → 提取用户信息
3. 权限检查 → 检查用户权限 → 允许/拒绝
# 实现
from auth.jwt import JWTManager
from auth.rbac import RBACManager
jwt_manager = JWTManager()
rbac_manager = RBACManager()
@router.post("/api/query")
@jwt_manager.require_auth
@rbac_manager.require_permission("query:execute")
async def query_api(request: QueryRequest, user: User):
# 处理请求
pass
2. 实施安全配置检查清单
# 部署前检查清单
□ 所有密码都通过环境变量配置
□ CORS 配置为白名单
□ SECRET_KEY 足够长且随机
□ 速率限制已启用
□ HTTPS 强制启用
□ 安全响应头已配置
□ 日志不包含敏感信息
□ 输入验证已完善
3. 实施安全监控
📊 测试覆盖率
当前状态
| 模块 | 覆盖率 | 状态 |
|---|---|---|
| API 层 | ~30% | ❌ 不足 |
| 服务层 | ~40% | ❌ 不足 |
| 认证授权 | 0% | ❌ 严重不足 |
| 配置管理 | ~20% | ❌ 不足 |
目标
| 模块 | 目标覆盖率 |
|---|---|
| API 层 | >70% |
| 服务层 | >80% |
| 认证授权 | >90% |
| 配置管理 | >60% |
🎯 行动计划
第一阶段:紧急修复(1-2天)
目标: 修复所有 P0 高危问题
- 删除硬编码密码
- 修复 CORS 配置
- 添加基础认证
- 修复裸异常处理
第二阶段:安全加固(1周)
目标: 修复所有 P1 中危问题
- 完善认证授权系统
- 加强输入验证
- 添加 CSRF 保护
- 完善配置验证
第三阶段:优化提升(2-4周)
目标: 修复所有 P2 低危问题
- 代码重构
- 完善测试覆盖
- 性能优化
- 文档完善
📝 总结
关键发现
- 严重安全漏洞: 硬编码密码、CORS 配置错误、缺少认证
- 架构设计良好: 但缺少关键的安全层
- 代码质量中等: 需要改进错误处理和测试覆盖
立即行动项
- ✅ 紧急: 删除所有硬编码密码
- ✅ 紧急: 修复 CORS 配置
- ✅ 紧急: 添加认证授权机制
- ✅ 重要: 完善错误处理
长期改进
- 实施安全开发流程
- 定期安全审计
- 自动化安全测试
- 安全培训
报告生成时间: 2026-03-31 下次审计建议: 修复完成后重新审计 审查人员: Claude Code AI Security Auditor