认知研究数据库使用策略
文档编号: LR-DB-STRATEGY-001 版本: v1.0 创建时间: 2026-04-12 适用范围: cognitive_research.db 及相关数据
一、数据库概览
1.1 当前状态
| 指标 | 数值 | 说明 |
|---|---|---|
| 数据库大小 | 938MB | 考虑未来增长,建议优化 |
| 项目数 | 13 | 灵字辈全家仓库 |
| 会话数 | 1,727 | 所有项目的交互会话 |
| 消息数 | 144,891 | 所有消息记录 |
| 认知事件 | 9 | 已记录的认知事件 |
| 表数量 | 29 | 包括主表和视图 |
1.2 数据库结构
主表
projects- 项目信息sessions- 会话信息messages- 消息内容reasoning_parts- 推理部分cognitive_events- 认知事件(核心表)
视图和分析表
cognitive_event_stats- 认知事件统计cognitive_event_analysis- 认知事件分析reasoning_stats_by_date- 按日期的推理统计session_heatmap- 会话热力图project_stats- 项目统计- 等 20+ 个分析视图
二、安全策略
2.1 数据备份
策略1:自动每日备份
# Daily backup script: scripts/backup_database.sh
#!/bin/bash
# Hot-backup the SQLite database via the ".backup" command (safe while the
# DB is in use, unlike a plain cp), gzip the copy, and prune old archives.
DB_PATH="/home/ai/lingresearch/database/cognitive_research.db"
BACKUP_DIR="/home/ai/lingresearch/database/backups"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/cognitive_research_${DATE}.db"
# Ensure the backup directory exists
mkdir -p "$BACKUP_DIR"
# Run the backup through sqlite3's online-backup API
sqlite3 "$DB_PATH" ".backup '$BACKUP_FILE'"
# Compress the copy
gzip "$BACKUP_FILE"
# Keep only the last 30 days of backups
find "$BACKUP_DIR" -name "*.db.gz" -mtime +30 -delete
echo "✅ 备份完成: ${BACKUP_FILE}.gz"
策略2:关键操作前手动备份
# Run before destructive operations (DELETE/UPDATE, schema changes, ...)
BACKUP_FILE="database/backups/manual_$(date +%Y%m%d_%H%M%S).db"
sqlite3 database/cognitive_research.db ".backup '$BACKUP_FILE'"
echo "✅ 手动备份: $BACKUP_FILE"
策略3:增量备份(针对大表)
# scripts/incremental_backup.py
import json
import sqlite3
import time
from pathlib import Path


def incremental_backup(
    db_path="database/cognitive_research.db",
    state_file="database/backup_state.json",
    backup_dir="database/backups",
):
    """Back up only the messages created/updated since the last run.

    Reads the last-backup timestamp from ``state_file``, exports every row
    of ``messages`` whose ``created_at_ms`` or ``updated_at_ms`` is newer
    to a timestamped JSON file under ``backup_dir``, then advances the
    stored timestamp. All three parameters are new and optional; defaults
    match the original hard-coded paths.

    Returns the path of the backup file written.
    """
    # BUG FIX: the original used time.time() without importing time.
    state_path = Path(state_file)
    # Timestamp (ms) of the previous backup; 0 means "back up everything".
    if state_path.exists():
        with open(state_path) as f:
            last_backup = json.load(f).get("last_backup_ms", 0)
    else:
        last_backup = 0

    # Export rows that are new or modified since the last backup.
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT * FROM messages
            WHERE created_at_ms > ? OR updated_at_ms > ?
            """,
            (last_backup, last_backup),
        )
        rows = cursor.fetchall()
    finally:
        conn.close()  # the original leaked the connection

    # Write the incremental dump; ensure the target directory exists.
    Path(backup_dir).mkdir(parents=True, exist_ok=True)
    now_ms = int(time.time() * 1000)
    backup_file = f"{backup_dir}/incremental_{now_ms}.json"
    with open(backup_file, "w") as f:
        # default=str: message rows may contain non-JSON types (e.g. bytes)
        json.dump(rows, f, ensure_ascii=False, default=str)

    # Persist the new high-water mark only after the dump succeeded.
    with open(state_path, "w") as f:
        json.dump({"last_backup_ms": now_ms}, f)
    print(f"✅ 增量备份完成: {backup_file}")
    return backup_file
2.2 访问控制
策略1:文件权限管理
# Lock down file permissions on the live database and state files
chmod 600 database/cognitive_research.db # owner read/write only
chmod 600 database/import_state.json # import-state file
chmod 700 database/ # database directory
# Backup archives may be world-readable
chmod 644 database/backups/*.db.gz # backup files readable
策略2:SQLite 安全配置
# Apply the safety-related options when the database is created.
import sqlite3

conn = sqlite3.connect("database/cognitive_research.db")
# Enforce referential integrity, switch to write-ahead logging, fsync on
# every commit, and disable memory-mapped I/O (avoids file-locking issues).
for pragma in (
    "PRAGMA foreign_keys = ON",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = FULL",
    "PRAGMA mmap_size = 0",
):
    conn.execute(pragma)
conn.commit()
策略3:查询审计日志
# Audit every SQL statement executed through this wrapper.
import sqlite3
import time
from datetime import datetime


class AuditedConnection:
    """SQLite connection wrapper that appends every query to an audit log.

    Each ``execute()`` call writes the timestamped SQL text (and bound
    parameters, if any) to the log before running the statement.

    NOTE(review): logged parameters may contain sensitive values — give
    the log file the same 600 permissions as the database itself.
    """

    def __init__(self, db_path, log_path="database/query_audit.log"):
        # ``log_path`` is new and optional; the default matches the
        # original hard-coded location.
        self.conn = sqlite3.connect(db_path)
        try:
            self.log_file = open(log_path, "a", encoding="utf-8")
        except OSError:
            # FIX: don't leak the DB connection when the log can't open.
            self.conn.close()
            raise

    def execute(self, query, params=None):
        """Log ``query`` (and ``params``), execute it, return the cursor."""
        timestamp = datetime.now().isoformat()
        self.log_file.write(f"[{timestamp}] {query}\n")
        if params:
            self.log_file.write(f" Params: {params}\n")
        self.log_file.flush()  # flush so the audit trail survives a crash
        cursor = self.conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        return cursor

    def close(self):
        """Release the log file handle and the database connection."""
        self.log_file.close()
        self.conn.close()

    # Context-manager support: ``with AuditedConnection(...) as ac: ...``
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False
2.3 数据完整性
策略1:事务管理
# All write operations run inside an explicit transaction.
import functools
import sqlite3


def safe_write(operation_func):
    """Decorator: run ``operation_func(conn, *args, **kwargs)`` in a transaction.

    Opens a connection, starts an IMMEDIATE transaction (takes the write
    lock up front), commits on success, rolls back and re-raises on any
    exception. The connection is always closed.
    """
    @functools.wraps(operation_func)
    def wrapper(*args, **kwargs):
        conn = sqlite3.connect("database/cognitive_research.db")
        try:
            conn.execute("BEGIN IMMEDIATE")
            result = operation_func(conn, *args, **kwargs)
            conn.commit()
            return result
        except Exception as e:
            conn.rollback()
            print(f"❌ 操作失败,已回滚: {e}")
            raise
        finally:
            conn.close()
    return wrapper


@safe_write
def insert_cognitive_event(conn, event_data):
    """Insert one cognitive event; returns the new row id.

    ``event_data`` is a 12-tuple matching the column list below.
    """
    # BUG FIX: ``lastrowid`` lives on the cursor, not the connection —
    # the original ``conn.lastrowid`` raised AttributeError.
    cursor = conn.execute("""
        INSERT INTO cognitive_events (
            session_id, message_id, reasoning_part_id,
            event_type, event_subtype, title, description,
            detected_at_ms, severity, confidence, metadata, tags
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, event_data)
    return cursor.lastrowid
策略2:数据验证
# Validate data before insertion.
def validate_cognitive_event(data):
    """Validate a cognitive-event dict before it is inserted.

    Checks required fields, the ``severity`` enum, the ``confidence``
    range (0.0–1.0), and the ``detected_at_ms`` timestamp. Raises
    ``ValueError`` on the first problem; returns ``True`` otherwise.
    """
    required_fields = [
        'session_id', 'event_type', 'title', 'detected_at_ms'
    ]
    # Required fields must be present and truthy.
    for field in required_fields:
        if field not in data or not data[field]:
            raise ValueError(f"缺少必填字段: {field}")

    # severity must be one of the known levels.
    valid_severities = ['info', 'warning', 'critical']
    severity = data.get('severity', 'info')
    if severity not in valid_severities:
        raise ValueError(f"无效的 severity: {severity}")

    confidence = data.get('confidence', 0.0)
    # FIX: a non-numeric confidence previously raised TypeError from the
    # comparison; report it as a clear ValueError instead (bool excluded).
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        raise ValueError(f"confidence 必须在 0-1 之间: {confidence}")
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence 必须在 0-1 之间: {confidence}")

    detected_at = data.get('detected_at_ms')
    # FIX: bool is a subclass of int — exclude it so True can't pass as 1.
    if isinstance(detected_at, bool) or not isinstance(detected_at, int) or detected_at <= 0:
        raise ValueError(f"无效的时间戳: {detected_at}")
    return True
2.4 敏感数据保护
策略1:加密存储
from cryptography.fernet import Fernet
# Generate a key (intended to run only once).
# NOTE(review): as written the key lives only in this process — it is never
# persisted, so anything encrypted here cannot be decrypted after a restart.
# Persist the key (file mode 600) before using this on real data.
key = Fernet.generate_key()
cipher = Fernet(key)
# Encrypt sensitive fields
def encrypt_sensitive_data(data: str) -> str:
    """Encrypt sensitive data; returns the Fernet token as text."""
    encrypted = cipher.encrypt(data.encode())
    return encrypted.decode('latin-1')
def decrypt_sensitive_data(encrypted_data: str) -> str:
    """Decrypt a token produced by encrypt_sensitive_data."""
    decrypted = cipher.decrypt(encrypted_data.encode('latin-1'))
    return decrypted.decode()
# Usage example
metadata = {
    "internal_thought": encrypt_sensitive_data("内部敏感信息"),
    "public_description": "公开描述"
}
策略2:数据脱敏(用于分析)
def anonymize_message(content: str) -> str:
    """Strip PII from a message while keeping cognition-related content."""
    import re

    # Mask personal identifiers: email, 11-digit phone number, URL.
    substitutions = (
        (r'[\w\.-]+@[\w\.-]+\.\w+', '[EMAIL]'),
        (r'\b\d{11}\b', '[PHONE]'),
        (r'http[s]?://\S+', '[URL]'),
    )
    for pattern, replacement in substitutions:
        content = re.sub(pattern, replacement, content)

    # Messages mentioning any cognitive keyword are kept in full.
    cognitive_keywords = ['身份', '漂移', '幻觉', '审计', '认知']
    if any(keyword in content for keyword in cognitive_keywords):
        return content
    return "[REDACTED: 非认知相关消息]"
三、效率策略
3.1 查询优化
策略1:使用预编译语句
import sqlite3


class QueryManager:
    """Registry of commonly-used, parameterized queries.

    BUG FIX: ``sqlite3.Cursor`` has no ``prepare()`` method — the original
    code raised AttributeError on construction. The stdlib driver prepares
    statements implicitly (and caches them per connection), so we store
    the SQL text once and bind parameters at execution time.
    """

    def __init__(self, db_path):
        self.conn = sqlite3.connect(db_path)
        self.queries = {}
        self._prepare_queries()

    def _prepare_queries(self):
        """Register the SQL for the common query shapes."""
        # Events of one type since a timestamp, newest first.
        self.queries['by_type'] = """
            SELECT * FROM cognitive_events
            WHERE event_type = ? AND detected_at_ms >= ?
            ORDER BY detected_at_ms DESC
        """
        # Most recent events at a given severity, capped by LIMIT.
        self.queries['by_severity'] = """
            SELECT * FROM cognitive_events
            WHERE severity = ? ORDER BY detected_at_ms DESC LIMIT ?
        """
        # Event counts grouped by (type, severity).
        self.queries['stats'] = """
            SELECT event_type, severity, COUNT(*) as count
            FROM cognitive_events
            GROUP BY event_type, severity
        """

    def get_events_by_type(self, event_type, since_ms):
        """Return all events of ``event_type`` detected at/after ``since_ms``."""
        cursor = self.conn.execute(self.queries['by_type'], (event_type, since_ms))
        return cursor.fetchall()
策略2:批量插入
def batch_insert_events(events_data, db_path="database/cognitive_research.db"):
    """Insert many cognitive events in one transaction.

    ``events_data`` is a sequence of 11-tuples matching the column list
    below. The batch is committed atomically; on any error it is rolled
    back and the exception re-raised. ``db_path`` is new and optional
    (defaults to the production database).
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        # IMMEDIATE takes the write lock up front so the batch can't
        # stall halfway through on lock acquisition.
        conn.execute("BEGIN IMMEDIATE")
        cursor.executemany("""
            INSERT INTO cognitive_events (
                session_id, message_id, event_type, event_subtype,
                title, description, detected_at_ms, severity,
                confidence, metadata, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, events_data)
        conn.commit()
        print(f"✅ 批量插入 {len(events_data)} 个事件")
    except Exception as e:
        conn.rollback()
        print(f"❌ 批量插入失败: {e}")
        raise
    finally:
        conn.close()
策略3:使用视图简化复杂查询
# Create the commonly-used analysis views.
def create_analytic_views(db_path="database/cognitive_research.db"):
    """Create (if absent) the standard analysis views.

    * ``cognitive_events_trend`` — daily event counts by type/severity
      over the last 30 days.
    * ``agent_behavior_summary`` — per-project session/message/event totals.

    ``db_path`` is new and optional; the default is the production DB.
    """
    conn = sqlite3.connect(db_path)
    try:
        # Cognitive-event trend view (detected_at_ms is epoch milliseconds).
        conn.execute("""
            CREATE VIEW IF NOT EXISTS cognitive_events_trend AS
            SELECT
                DATE(detected_at_ms / 1000, 'unixepoch') as date,
                event_type,
                severity,
                COUNT(*) as count
            FROM cognitive_events
            WHERE detected_at_ms > strftime('%s', 'now', '-30 days') * 1000
            GROUP BY date, event_type, severity
            ORDER BY date DESC
        """)
        # Agent behavior summary view.
        conn.execute("""
            CREATE VIEW IF NOT EXISTS agent_behavior_summary AS
            SELECT
                p.name as project,
                COUNT(DISTINCT s.id) as total_sessions,
                COUNT(DISTINCT s.agent) as agent_count,
                COUNT(m.id) as total_messages,
                COUNT(ce.id) as cognitive_events_count
            FROM sessions s
            JOIN projects p ON s.project_id = p.id
            LEFT JOIN messages m ON m.session_id = s.id
            LEFT JOIN cognitive_events ce ON ce.session_id = s.id
            GROUP BY p.id
        """)
        conn.commit()
    finally:
        # FIX: the original leaked the connection when CREATE VIEW failed.
        conn.close()
3.2 索引优化
策略1:监控索引使用情况
def analyze_index_usage(db_path="database/cognitive_research.db"):
    """Report, per table, whether a trivial scan uses an index, plus row count.

    Prints a short report and (new, backward-compatible — the original
    returned ``None``) returns a list of ``(table_name, uses_index,
    row_count)`` tuples for programmatic use. ``db_path`` is new and
    optional.

    NOTE(review): ``EXPLAIN QUERY PLAN SELECT * ... LIMIT 1`` almost never
    uses an index — this is a rough heuristic, not real usage telemetry.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = cursor.fetchall()
        report = []
        for (table_name,) in tables:
            # Quote the identifier to survive unusual table names.
            cursor.execute(f'EXPLAIN QUERY PLAN SELECT * FROM "{table_name}" LIMIT 1')
            plans = cursor.fetchall()
            uses_index = any("USING INDEX" in str(plan) for plan in plans)
            count = cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[0]
            print(f"表: {table_name}")
            print(f" 使用索引: {'✅' if uses_index else '❌'}")
            print(f" 记录数: {count}")
            report.append((table_name, uses_index, count))
        return report
    finally:
        # FIX: the original leaked the connection on any error.
        conn.close()
策略2:动态索引管理
def optimize_indexes(db_path="database/cognitive_research.db"):
    """Create the composite indexes backing the common query patterns.

    Covers ``WHERE event_type = ? AND severity = ?`` and
    ``WHERE detected_at_ms > ? [AND event_type = ?]`` lookups on
    ``cognitive_events``. Idempotent (IF NOT EXISTS). ``db_path`` is new
    and optional; the default is the production DB.

    (Removed two unused locals from the original: a never-executed
    slow-query SQL string and a query-pattern list.)
    """
    conn = sqlite3.connect(db_path)
    try:
        # Composite index for type + severity filters.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_cognitive_events_type_severity
            ON cognitive_events(event_type, severity)
        """)
        # Composite index for time-range scans, newest first.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_cognitive_events_detected_type
            ON cognitive_events(detected_at_ms DESC, event_type)
        """)
        conn.commit()
    finally:
        # FIX: the original leaked the connection on any error.
        conn.close()
3.3 数据库维护
策略1:定期 VACUUM
#!/bin/bash
# scripts/vacuum_database.sh
# Reclaim free pages (VACUUM) and refresh query-planner statistics
# (ANALYZE). Run during a quiet period: VACUUM rewrites the whole file
# and takes an exclusive lock.
DB_PATH="/home/ai/lingresearch/database/cognitive_research.db"
# VACUUM: rebuild and compact the database file
echo "开始 VACUUM..."
sqlite3 "$DB_PATH" "VACUUM"
# ANALYZE: update the planner's statistics tables
echo "开始 ANALYZE..."
sqlite3 "$DB_PATH" "ANALYZE"
echo "✅ 数据库优化完成"
策略2:定期检查数据库完整性
def check_database_integrity(db_path="database/cognitive_research.db"):
    """Run ``PRAGMA integrity_check`` and ``foreign_key_check``; print a report.

    Returns ``True`` (new, backward-compatible — the original returned
    ``None``) when both checks pass, else ``False``. ``db_path`` is new
    and optional; the default is the production DB.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Structural integrity: returns a single 'ok' row when healthy.
        cursor.execute("PRAGMA integrity_check")
        result = cursor.fetchall()
        intact = result[0][0] == "ok"
        if intact:
            print("✅ 数据库完整性检查通过")
        else:
            print("❌ 数据库完整性检查失败:")
            for row in result:
                print(f" {row[0]}")
        # Referential integrity: empty result means no violations.
        cursor.execute("PRAGMA foreign_key_check")
        violations = cursor.fetchall()
        if not violations:
            print("✅ 外键约束检查通过")
        else:
            print("❌ 外键约束违反:")
            for row in violations:
                print(f" {row}")
        return intact and not violations
    finally:
        # FIX: the original leaked the connection if a PRAGMA raised.
        conn.close()
3.4 性能监控
策略1:查询性能监控
import sqlite3
import time
from functools import wraps


def monitor_query_performance(func):
    """Decorator: time each call; log slow calls (>1s) and all call timings.

    Logging is best-effort: if the log directory is missing or unwritable
    the timing records are dropped instead of masking the real result —
    the original raised FileNotFoundError from the ``finally`` block when
    ``database/`` did not exist.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.time() - start_time
            # Flag slow queries (over 1 second).
            if elapsed > 1.0:
                print(f"⚠️ 慢查询警告: {func.__name__} 耗时 {elapsed:.2f}s")
                try:
                    with open("database/slow_queries.log", "a") as f:
                        f.write(f"{time.time()} {func.__name__} {elapsed:.2f}s\n")
                except OSError:
                    pass  # logging must never break the query itself
            # Record every call's timing.
            try:
                with open("database/query_performance.log", "a") as f:
                    f.write(f"{time.time()} {func.__name__} {elapsed:.4f}s\n")
            except OSError:
                pass
    return wrapper


@monitor_query_performance
def get_all_critical_events(db_path="database/cognitive_research.db"):
    """Return every critical-severity event, newest first.

    ``db_path`` is new and optional; the default is the production DB.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT * FROM cognitive_events
            WHERE severity = 'critical'
            ORDER BY detected_at_ms DESC
        """)
        return cursor.fetchall()
    finally:
        # FIX: the original leaked the connection when the query raised.
        conn.close()
策略2:数据库大小监控
import os
from pathlib import Path


def monitor_database_size():
    """Print the database size and warn when it crosses growth thresholds.

    BUG FIX: the original tested ``> 1000`` before ``elif > 2000``, so the
    2 GB "archive old data" branch was unreachable. Check the larger
    threshold first.
    """
    db_path = Path("database/cognitive_research.db")
    size_mb = db_path.stat().st_size / (1024 * 1024)
    print(f"数据库大小: {size_mb:.2f} MB")
    # Warn in order of severity: archive at 2 GB, VACUUM at 1 GB.
    if size_mb > 2000:
        print("❌ 数据库过大,建议归档旧数据")
    elif size_mb > 1000:
        print("⚠️ 数据库过大,建议执行 VACUUM")
    # Growth-rate check; ``calculate_growth_rate`` is defined elsewhere in
    # the project (not in this document).
    growth_rate = calculate_growth_rate()
    if growth_rate > 10:  # growing faster than 10 MB/day
        print(f"⚠️ 数据库增长过快: {growth_rate:.2f} MB/天")
四、使用流程
4.1 数据导入流程
4.2 认知事件记录流程
# scripts/record_cognitive_event.py
def record_cognitive_event(event_data):
    """End-to-end flow for recording one cognitive event.

    Validates the payload, encrypts its metadata, inserts it inside a
    transaction (skipping exact duplicates), and fires a notification for
    critical events. Returns the new event id, or ``None`` on duplicate.

    NOTE(review): ``safe_write``, ``validate_cognitive_event``,
    ``encrypt_metadata`` and ``notify_critical_event`` are defined
    elsewhere; this document only defines ``encrypt_sensitive_data`` —
    confirm that ``encrypt_metadata`` actually exists.
    """
    # 1. Validate the payload
    validate_cognitive_event(event_data)
    # 2. Encrypt sensitive metadata
    if 'metadata' in event_data:
        event_data['metadata'] = encrypt_metadata(event_data['metadata'])
    # 3. Write to the database (inside a transaction)
    @safe_write
    def insert_event(conn, data):
        # Skip if an identical event already exists
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id FROM cognitive_events
            WHERE message_id = ? AND event_type = ?
            AND detected_at_ms = ?
        """, (data.get('message_id'), data['event_type'], data['detected_at_ms']))
        if cursor.fetchone():
            print("⚠️ 事件已存在,跳过")
            return None
        # Insert the new event
        cursor.execute("""
            INSERT INTO cognitive_events (
                session_id, message_id, reasoning_part_id,
                event_type, event_subtype, title, description,
                detected_at_ms, severity, confidence, metadata, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            data['session_id'], data.get('message_id'), data.get('reasoning_part_id'),
            data['event_type'], data.get('event_subtype'), data['title'],
            data.get('description', ''), data['detected_at_ms'],
            data.get('severity', 'info'), data.get('confidence', 0.0),
            json.dumps(data.get('metadata', {})), data.get('tags', '')
        ))
        return cursor.lastrowid
    # 4. Run the insert
    event_id = insert_event(event_data)
    # 5. Notify on critical events
    if event_id and event_data.get('severity') == 'critical':
        notify_critical_event(event_id, event_data)
    return event_id
4.3 数据分析流程
def analyze_cognitive_patterns():
    """Standard pipeline for analyzing cognitive patterns.

    Loads the last week of events, aggregates them along four axes, runs
    anomaly detection, writes a report, and raises an alert when critical
    anomalies are present. Returns the generated report.
    """
    # Step 1: load the last 7 days of events.
    events = load_recent_events(days=7)

    # Step 2: aggregate along each analysis axis.
    groupers = {
        'by_agent': group_by_agent,
        'by_type': group_by_type,
        'by_severity': group_by_severity,
        'by_time': group_by_time,
    }
    patterns = {axis: grouper(events) for axis, grouper in groupers.items()}

    # Step 3: anomaly detection on the aggregates.
    anomalies = detect_anomalies(patterns)

    # Steps 4–5: build and persist the report.
    report = generate_analysis_report(patterns, anomalies)
    save_analysis_report(report)

    # Step 6: escalate critical anomalies, if any.
    critical = anomalies.get('critical')
    if critical:
        alert_critical_anomalies(critical)
    return report
五、最佳实践
5.1 日常使用
# ✅ Recommended: use the connection as a context manager
with sqlite3.connect("database/cognitive_research.db") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM cognitive_events LIMIT 10")
    results = cursor.fetchall()
# NOTE(review): sqlite3's context manager commits/rolls back the
# transaction — it does NOT close the connection; close it explicitly.
# ❌ Avoid: forgetting to close the connection
conn = sqlite3.connect("database/cognitive_research.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM cognitive_events")
# Connection is never closed!
5.2 错误处理
# ✅ Recommended: complete error handling with retry on lock contention
def query_with_retry(query, params=None, max_retries=3):
    """Run a query, retrying briefly when the database is locked.

    Uses linear backoff (0.1s, 0.2s, ...) between attempts; any other
    OperationalError — or a lock error on the final attempt — is
    re-raised.

    FIX: sqlite3's ``with conn`` only manages the transaction, it does
    not close the connection — the original leaked one connection per
    call. Close explicitly in ``finally``.
    """
    for attempt in range(max_retries):
        try:
            conn = sqlite3.connect("database/cognitive_research.db")
            try:
                with conn:  # transaction scope: commit on success
                    cursor = conn.cursor()
                    if params:
                        cursor.execute(query, params)
                    else:
                        cursor.execute(query)
                    rows = cursor.fetchall()
                return rows
            finally:
                conn.close()
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (attempt + 1))
                continue
            raise
# ❌ Avoid: no error handling and the connection is never closed
def unsafe_query(query):
    """Anti-example: runs ``query`` with no error handling.

    Kept deliberately broken for illustration — it leaks the connection
    and lets any sqlite3 error propagate raw.
    """
    conn = sqlite3.connect("database/cognitive_research.db")
    cursor = conn.cursor()
    cursor.execute(query)
    return cursor.fetchall()
5.3 数据库连接池
import threading
import sqlite3


class DatabaseConnectionPool:
    """A minimal thread-safe SQLite connection pool.

    Connections are created lazily, handed out by ``get_connection`` and
    recycled by ``release_connection`` up to ``max_connections``; surplus
    connections are closed.

    FIX: connections are opened with ``check_same_thread=False`` — the
    whole point of a lock-guarded pool is reuse across threads, which the
    sqlite3 default would reject with ProgrammingError.
    """

    def __init__(self, db_path, max_connections=5):
        self.db_path = db_path
        self.max_connections = max_connections
        self.connections = []          # idle connections ready for reuse
        self.lock = threading.Lock()   # guards the idle list

    def get_connection(self):
        """Return an idle pooled connection, or open a fresh one."""
        with self.lock:
            if self.connections:
                return self.connections.pop()
        # Open outside the lock: connect() can block on the filesystem.
        return sqlite3.connect(self.db_path, check_same_thread=False)

    def release_connection(self, conn):
        """Return ``conn`` to the pool, or close it if the pool is full."""
        with self.lock:
            if len(self.connections) < self.max_connections:
                self.connections.append(conn)
                return
        conn.close()


# Usage example
pool = DatabaseConnectionPool("database/cognitive_research.db")


def safe_query(query):
    """Run a query on a pooled connection; always return it to the pool."""
    conn = pool.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return cursor.fetchall()
    finally:
        pool.release_connection(conn)
六、监控和告警
6.1 健康检查
def health_check():
    """Run the standard database health checks and score the result.

    Each ``check_*`` helper returns a numeric score; the overall score is
    their mean, mapped onto a three-level status label.
    """
    checks = {
        'connection': check_connection(),
        'integrity': check_integrity(),
        'size': check_size(),
        'performance': check_performance(),
        'backup': check_backup(),
    }

    # Mean of the individual check scores.
    health_score = sum(checks.values()) / len(checks)

    # Map the score onto a status label, best threshold first.
    status = "❌ 不健康"
    for cutoff, label in ((0.9, "✅ 健康"), (0.7, "⚠️ 警告")):
        if health_score >= cutoff:
            status = label
            break

    return {
        'status': status,
        'score': health_score,
        'checks': checks,
    }
6.2 告警规则
| 指标 | 阈值 | 告警级别 | 处理建议 |
|---|---|---|---|
| 数据库大小 | > 2GB | Warning | 执行 VACUUM 或归档 |
| 慢查询 | > 5s | Critical | 优化查询或添加索引 |
| 完整性检查 | 失败 | Critical | 立即修复或恢复备份 |
| 备份延迟 | > 48h | Warning | 检查备份任务 |
| 认知事件 | Critical > 5/天 | Critical | 立即审计 |
七、紧急处理
7.1 数据库损坏
# If the database is corrupted, attempt recovery
cd /home/ai/lingresearch
# 1. Keep a copy of the corrupted file before touching it
cp database/cognitive_research.db database/cognitive_research.db.corrupted
# 2. Dump whatever data is still readable to SQL
sqlite3 database/cognitive_research.db.corrupted ".dump" > database/recovery.sql
# 3. Rebuild a fresh database from the dump
sqlite3 database/cognitive_research_restored.db < database/recovery.sql
# 4. Verify the restored database
sqlite3 database/cognitive_research_restored.db "PRAGMA integrity_check"
7.2 数据库锁定
def resolve_locked_database(db_path="database/cognitive_research.db", max_wait=30):
    """Poll the database until a lock clears or ``max_wait`` seconds pass.

    Attempts a trivial query roughly once per second; returns ``True`` as
    soon as one succeeds, ``False`` when the deadline expires. Any
    OperationalError other than "database is locked" is re-raised. Both
    parameters are new and optional (defaults match the original).
    """
    import time
    import sqlite3
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        try:
            conn = sqlite3.connect(db_path, timeout=5)
            try:
                conn.execute("SELECT 1")
            finally:
                # FIX: close even if SELECT raises after connect succeeds.
                conn.close()
            print("✅ 数据库锁定已解除")
            return True
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e):
                time.sleep(1)
                continue
            raise
    print("❌ 无法解除数据库锁定")
    return False
7.3 数据回滚
# Restore data from a backup
cd /home/ai/lingresearch
# 1. Stop every process that accesses the database first
# 2. Find the most recent backup
ls -lt database/backups/ | head -5
# 3. Restore it (substitute the real timestamp)
gunzip database/backups/cognitive_research_YYYYMMDD_HHMMSS.db.gz
cp database/backups/cognitive_research_YYYYMMDD_HHMMSS.db database/cognitive_research.db
# 4. Verify the restore
sqlite3 database/cognitive_research.db "PRAGMA integrity_check"
八、总结
8.1 关键要点
- 安全第一:定期备份、严格权限控制、完整性验证
- 性能优化:使用索引、批量操作、预编译查询
- 流程规范:事务管理、错误处理、日志记录
- 持续监控:健康检查、性能监控、告警机制
8.2 下一步行动
- [ ] 设置每日自动备份
- [ ] 配置查询性能监控
- [ ] 创建健康检查脚本
- [ ] 编写数据分析模板
- [ ] 建立告警机制
文档创建时间: 2026-04-12 文档版本: v1.0 下次更新: 根据实际使用情况调整