跳转至

认知研究数据库快速入门

文档编号: LR-DB-QUICKSTART-001 版本: v1.0 创建时间: 2026-04-12


一、快速开始

1.1 数据库状态检查

cd /home/ai/lingresearch

# 运行健康检查
python3 scripts/health_check.py

# 查看健康报告
cat database/health_report.json | jq

1.2 备份数据库

# 手动备份
bash scripts/backup_database.sh

# 查看备份文件
ls -lh database/backups/

1.3 基本查询

# 进入数据库
sqlite3 database/cognitive_research.db

# 查看所有表
.tables

# 查看认知事件
SELECT * FROM cognitive_events ORDER BY detected_at_ms DESC LIMIT 10;

# 退出
.quit

二、常用查询

2.1 认知事件查询

-- 查看所有严重事件
SELECT
    id, event_type, severity, title,
    datetime(detected_at_ms/1000, 'unixepoch') as time
FROM cognitive_events
WHERE severity = 'critical'
ORDER BY detected_at_ms DESC;

-- 按类型统计事件
SELECT
    event_type, severity, COUNT(*) as count
FROM cognitive_events
GROUP BY event_type, severity
ORDER BY count DESC;

-- 最近7天的趋势
SELECT
    date(detected_at_ms/1000, 'unixepoch') as date,
    event_type, COUNT(*) as count
FROM cognitive_events
WHERE detected_at_ms > strftime('%s', 'now', '-7 days') * 1000
GROUP BY date, event_type
ORDER BY date DESC;

2.2 Agent 行为分析

-- 查看各项目的统计
SELECT
    p.name as project,
    COUNT(DISTINCT s.id) as sessions,
    COUNT(DISTINCT s.agent) as agents,
    COUNT(m.id) as messages,
    COUNT(ce.id) as events
FROM projects p
JOIN sessions s ON s.project_id = p.id
LEFT JOIN messages m ON m.session_id = s.id
LEFT JOIN cognitive_events ce ON ce.session_id = s.id
GROUP BY p.id
ORDER BY sessions DESC;

-- 查看某个项目的所有事件
SELECT
    ce.*,
    s.agent
FROM cognitive_events ce
JOIN sessions s ON ce.session_id = s.id
JOIN projects p ON s.project_id = p.id
WHERE p.name = 'LingYi'
ORDER BY ce.detected_at_ms DESC;

2.3 会话和消息分析

-- 查看最近的活动会话
SELECT
    s.id,
    s.agent,
    p.name as project,
    datetime(s.created_at_ms/1000, 'unixepoch') as created,
    COUNT(m.id) as messages
FROM sessions s
JOIN projects p ON s.project_id = p.id
LEFT JOIN messages m ON m.session_id = s.id
GROUP BY s.id
ORDER BY s.created_at_ms DESC
LIMIT 20;

-- 查看推理部分统计
SELECT
    s.agent,
    COUNT(rp.id) as reasoning_parts,
    AVG(rp.duration_ms) as avg_duration
FROM reasoning_parts rp
JOIN sessions s ON rp.session_id = s.id
GROUP BY s.agent
ORDER BY reasoning_parts DESC;

三、Python 接口

3.1 基本查询

import sqlite3
from pathlib import Path

DB_PATH = "database/cognitive_research.db"

def query_database(query, params=None, db_path=None):
    """Run an SQL query and return all result rows.

    Args:
        query: SQL text, optionally containing ``?`` placeholders.
        params: Optional sequence of values bound to the placeholders
            (parameterized queries avoid SQL injection).
        db_path: Optional database file to query; defaults to the
            module-level DB_PATH, so existing callers are unaffected.

    Returns:
        list[tuple]: every row produced by the query.
    """
    if db_path is None:
        db_path = DB_PATH
    # The connection context manager commits (or rolls back on error);
    # cursors are cheap and need no explicit close here.
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        return cursor.fetchall()

# 示例:获取所有严重事件
events = query_database("""
    SELECT id, event_type, title
    FROM cognitive_events
    WHERE severity = 'critical'
""")
for event in events:
    print(event)

3.2 插入认知事件

import sqlite3
import json
from datetime import datetime

def insert_cognitive_event(event_data, db_path=None):
    """Insert one row into ``cognitive_events`` and return its row id.

    Required keys in *event_data*: ``event_type``, ``title`` and
    ``detected_at_ms`` — a KeyError is raised if any is missing. All
    other fields fall back to neutral defaults; ``metadata`` is
    JSON-serialized before storage.

    Args:
        event_data: dict describing the event (see keys above).
        db_path: Optional database file; defaults to the module-level
            DB_PATH, so existing callers are unaffected.

    Returns:
        int: the auto-assigned primary key of the inserted row.
    """
    if db_path is None:
        db_path = DB_PATH
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()

        # Parameterized INSERT — values are bound, never string-formatted.
        cursor.execute("""
            INSERT INTO cognitive_events (
                session_id, event_type, event_subtype, title, description,
                detected_at_ms, severity, confidence, metadata, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            event_data.get('session_id'),
            event_data['event_type'],
            event_data.get('event_subtype', ''),
            event_data['title'],
            event_data.get('description', ''),
            event_data['detected_at_ms'],
            event_data.get('severity', 'info'),
            event_data.get('confidence', 0.0),
            json.dumps(event_data.get('metadata', {})),
            event_data.get('tags', '')
        ))

        conn.commit()
        return cursor.lastrowid

# 示例:插入新事件
event_id = insert_cognitive_event({
    'session_id': 'session_123',
    'event_type': 'hallucination',
    'title': '架构文档虚构',
    'description': 'LingClaude创建了虚构的架构文档',
    'detected_at_ms': int(datetime.now().timestamp() * 1000),
    'severity': 'critical',
    'confidence': 0.95,
    'metadata': {
        'evidence_file': 'LINGAI_STACK_ARCHITECTURE.md',
        'fake_paths': ['path1', 'path2']
    }
})
print(f"事件ID: {event_id}")

四、数据分析

4.1 导出为 CSV

import sqlite3
import csv

def export_to_csv(query, filename, db_path=None):
    """Run a query and write its full result set to a CSV file.

    Args:
        query: SQL SELECT statement to execute.
        filename: destination CSV path (overwritten if it exists).
        db_path: Optional database file; defaults to the module-level
            DB_PATH, so existing callers are unaffected.
    """
    if db_path is None:
        db_path = DB_PATH
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        cursor.execute(query)

        # newline='' lets the csv module control line endings (csv docs).
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # Header row comes from the cursor's column descriptions.
            writer.writerow([desc[0] for desc in cursor.description])
            writer.writerows(cursor.fetchall())

    # Bug fix: the original printed the literal "(unknown)" instead of
    # interpolating the destination path.
    print(f"✅ 导出到: {filename}")

# 示例:导出所有认知事件
export_to_csv(
    "SELECT * FROM cognitive_events ORDER BY detected_at_ms DESC",
    "database/cognitive_events.csv"
)

4.2 生成统计报告

import sqlite3
import json
from datetime import datetime

def generate_statistics_report(db_path=None, output_path="database/statistics_report.json"):
    """Aggregate per-project, per-event-type and per-agent counts.

    Writes the report as JSON to *output_path* and also returns it.

    Args:
        db_path: Optional database file; defaults to the module-level
            DB_PATH, so existing callers are unaffected.
        output_path: Destination for the JSON report (same default path
            as the original hard-coded location).

    Returns:
        dict with 'timestamp', 'projects', 'events' and 'agents' keys.
    """
    if db_path is None:
        db_path = DB_PATH

    report = {
        'timestamp': datetime.now().isoformat(),
        'projects': {},
        'events': {},
        'agents': {}
    }

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Sessions per project (LEFT JOIN keeps projects with no sessions).
        # Column qualified as p.name to avoid any ambiguity in the join.
        cursor.execute(
            "SELECT p.name, COUNT(DISTINCT s.id) FROM projects p "
            "LEFT JOIN sessions s ON s.project_id = p.id GROUP BY p.id"
        )
        for name, count in cursor.fetchall():
            report['projects'][name] = count

        # Events per type.
        cursor.execute("SELECT event_type, COUNT(*) FROM cognitive_events GROUP BY event_type")
        for event_type, count in cursor.fetchall():
            report['events'][event_type] = count

        # Sessions per agent.
        cursor.execute("SELECT agent, COUNT(*) FROM sessions GROUP BY agent")
        for agent, count in cursor.fetchall():
            report['agents'][agent] = count
    finally:
        # Bug fix: the original leaked the connection if a query raised.
        conn.close()

    # Write the report after the connection is released.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print("✅ 统计报告已生成")
    return report

# 运行
report = generate_statistics_report()
print(json.dumps(report, indent=2, ensure_ascii=False))

五、维护任务

5.1 定期备份(手动)

# 每天运行一次
bash scripts/backup_database.sh

5.2 数据库优化

# 进入数据库
sqlite3 database/cognitive_research.db

# 压缩数据库
VACUUM;

# 更新统计信息
ANALYZE;

# 检查完整性
PRAGMA integrity_check;

# 退出
.quit

5.3 健康检查

# 定期运行健康检查
python3 scripts/health_check.py

# 查看详细报告
cat database/health_report.json

六、故障排查

6.1 数据库锁定

# 检查是否有进程占用数据库
lsof database/cognitive_research.db

# 如果没有结果,说明没有锁定
# 如果有结果,检查进程并决定是否终止

6.2 查询慢

# 分析查询计划
import sqlite3

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# 分析查询计划
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM cognitive_events WHERE event_type = 'hallucination'")
for row in cursor.fetchall():
    print(row)

# 如果没有使用索引,考虑添加索引
conn.close()

6.3 数据恢复

# 从备份恢复
# 1. 停止所有访问数据库的程序
# 2. 找到最近的备份
ls -lt database/backups/

# 3. 解压备份
gunzip -k database/backups/cognitive_research_YYYYMMDD_HHMMSS.db.gz

# 4. 替换当前数据库
mv database/cognitive_research.db database/cognitive_research.db.backup
mv database/backups/cognitive_research_YYYYMMDD_HHMMSS.db database/cognitive_research.db

# 5. 验证
sqlite3 database/cognitive_research.db "PRAGMA integrity_check"

七、最佳实践

7.1 查询优化

# ✅ 使用索引
cursor.execute("""
    SELECT * FROM cognitive_events
    WHERE event_type = ? AND detected_at_ms > ?
""", ('hallucination', timestamp))

# ❌ 避免全表扫描
cursor.execute("SELECT * FROM cognitive_events WHERE title LIKE '%关键词%'")

# ✅ 限制返回数量
cursor.execute("SELECT * FROM messages ORDER BY created_at_ms DESC LIMIT 100")

7.2 事务管理

# ✅ 使用事务
with sqlite3.connect(DB_PATH) as conn:
    conn.execute("BEGIN IMMEDIATE")
    try:
        # 多个操作
        cursor = conn.cursor()
        cursor.execute("INSERT INTO ...")
        cursor.execute("UPDATE ...")
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"❌ 操作失败: {e}")

7.3 错误处理

# ✅ 完整的错误处理
def safe_query(query, params=None, retries=3, db_path=None):
    """Execute a query, retrying briefly while the database is locked.

    Args:
        query: SQL text, optionally containing ``?`` placeholders.
        params: Optional sequence bound to the placeholders.
        retries: Maximum number of attempts before a lock error
            propagates (returns None if retries < 1, as before).
        db_path: Optional database file; defaults to the module-level
            DB_PATH, so existing callers are unaffected.

    Returns:
        list[tuple]: all result rows.

    Raises:
        sqlite3.OperationalError: re-raised once retries are exhausted,
            or immediately for errors other than "database is locked".
    """
    # Hoisted out of the except branch so it runs once, not per retry;
    # kept function-local so the documentation snippet stays self-contained.
    import time

    if db_path is None:
        db_path = DB_PATH
    for attempt in range(retries):
        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
                return cursor.fetchall()
        except sqlite3.OperationalError as e:
            # Only lock contention is retriable; back off briefly and retry.
            if "database is locked" in str(e) and attempt < retries - 1:
                time.sleep(0.1)
                continue
            raise

八、常用命令速查

# 健康检查
python3 scripts/health_check.py

# 备份数据库
bash scripts/backup_database.sh

# 进入数据库
sqlite3 database/cognitive_research.db

# 查看表结构
.schema cognitive_events

# 查看索引
.indexes

# 导出数据
sqlite3 database/cognitive_research.db ".dump" > backup.sql

# 压缩数据库
sqlite3 database/cognitive_research.db "VACUUM"

# 检查完整性
sqlite3 database/cognitive_research.db "PRAGMA integrity_check"

# 查看大小
ls -lh database/cognitive_research.db

九、相关文档

  • 认知研究数据库策略: docs/COGNITIVE_RESEARCH_DATABASE_STRATEGY.md
  • 增量导入脚本: scripts/incremental_import.py
  • 导入状态文件: database/import_state.json

最后更新: 2026-04-12