跳转至

找书查书功能 - 实现指南

⚠️ **归档文档 — 数据已过时** 本报告为历史快照存档。当前版本 **v1.3.0-dev**,232 测试通过。 👉 最新工程状态请参阅 **[ENGINEERING_ALIGNMENT.md](ENGINEERING_ALIGNMENT.md)**

实施阶段: 阶段1 - 基础功能 | 预计时间: 1-2周


📁 文件结构

backend/
├── api/v2/
│   ├── __init__.py
│   ├── books.py              # 书籍API
│   ├── search.py             # 搜索API
│   └── sources.py            # 数据源API
├── models/
│   ├── book.py               # 书籍模型
│   └── source.py             # 数据源模型
├── services/
│   ├── book_search.py        # 搜索服务
│   └── source_manager.py     # 数据源管理
└── schemas/
    ├── book.py               # 书籍Schema
    └── search.py             # 搜索Schema

frontend/
├── src/
│   ├── pages/
│   │   ├── Books.tsx         # 书籍列表
│   │   ├── BookDetail.tsx    # 书籍详情
│   │   └── BookReader.tsx    # 阅读器
│   └── components/
│       ├── SearchBox.tsx     # 搜索框
│       └── BookCard.tsx      # 书籍卡片

🔧 实现步骤

步骤 1: 数据库初始化

# Run the SQL initialization script (fed to psql inside the container via stdin)
docker exec -i zhineng-postgres psql -U zhineng -d zhineng_kb < scripts/init_book_search_db.sql

# Verify that the "books" table was created
docker exec zhineng-postgres psql -U zhineng -d zhineng_kb -c "\dt books"

步骤 2: 创建数据模型

# backend/models/book.py
from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import JSONB, VECTOR
from datetime import datetime

from backend.core.database import Base

class Book(Base):
    """ORM model for the ``books`` table.

    Holds bibliographic metadata, a link to the data source the record came
    from, content statistics, an embedding vector and usage counters.
    """
    __tablename__ = "books"

    # Bibliographic metadata
    id = Column(Integer, primary_key=True)
    title = Column(String(500), nullable=False)
    title_alternative = Column(String(500))
    subtitle = Column(String(500))
    author = Column(String(200))
    author_alt = Column(String(200))
    translator = Column(String(200))
    category = Column(String(50))  # e.g. Qigong / Chinese medicine / Confucianism (stored as 气功/中医/儒家)
    dynasty = Column(String(50))
    year = Column(String(50))  # kept as a string, not an integer
    language = Column(String(10), default='zh')

    # Provenance: which data source the record came from
    source_id = Column(Integer, ForeignKey('data_sources.id'))
    source_uid = Column(String(200))  # presumably the record's identifier within the source — confirm
    source_url = Column(String(500))

    # Content and derived data
    description = Column(Text)
    toc = Column(JSONB)  # table of contents stored as JSON; structure is not defined here
    has_content = Column(Boolean, default=False)
    total_pages = Column(Integer, default=0)
    total_chars = Column(Integer, default=0)
    # NOTE(review): VECTOR is imported from sqlalchemy.dialects.postgresql above;
    # stock SQLAlchemy does not appear to export that name (the pgvector package
    # ships its own SQLAlchemy type) — confirm the import. 1024 is presumably the
    # embedding dimension.
    embedding = Column(VECTOR(1024))

    # Usage counters
    view_count = Column(Integer, default=0)
    bookmark_count = Column(Integer, default=0)

    # Timestamps; datetime.utcnow yields naive (tz-less) UTC values
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    # Chapters are owned by the book: the cascade removes them together with it.
    chapters = relationship("BookChapter", back_populates="book", cascade="all, delete-orphan")
    source = relationship("DataSource", back_populates="books")


class BookChapter(Base):
    """ORM model for the ``book_chapters`` table: one chapter of a book.

    Chapters can be nested through ``parent_id`` and carry their own text.
    """
    __tablename__ = "book_chapters"

    id = Column(Integer, primary_key=True)
    # Owning book; ON DELETE CASCADE removes the chapter rows at the database level.
    book_id = Column(Integer, ForeignKey('books.id', ondelete='CASCADE'))
    chapter_num = Column(Integer, nullable=False)
    title = Column(String(500))
    level = Column(Integer, default=1)  # presumably the nesting depth, 1 = top level — confirm
    parent_id = Column(Integer, ForeignKey('book_chapters.id'))  # self-reference to the parent chapter

    content = Column(Text)
    char_count = Column(Integer, default=0)
    order_position = Column(Integer, default=0)  # presumably the display order within the book — confirm

    # Timestamps; datetime.utcnow yields naive (tz-less) UTC values
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    book = relationship("Book", back_populates="chapters")
    # remote_side=[id] makes this the many-to-one side of the self-referential link.
    parent = relationship("BookChapter", remote_side=[id])

步骤 3: 创建搜索服务

# backend/services/book_search.py
from typing import List, Optional
import asyncpg
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_, and_
from sqlalchemy.sql import text

from backend.models.book import Book, BookChapter
from backend.services.retrieval.vector import VectorRetriever

class BookSearchService:
    """Book search service.

    Bundles three lookups over the book catalogue: full-text metadata search,
    substring search inside chapter text, and embedding-based recommendations.
    """

    # Maximum number of characters of chapter text returned as a preview.
    PREVIEW_CHARS = 200

    def __init__(self, db_session: AsyncSession, db_pool: asyncpg.Pool):
        """Keep the ORM session (used for every ORM query) and the asyncpg
        pool (handed to ``VectorRetriever``)."""
        self.db = db_session
        self.pool = db_pool

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards so ``value`` is matched literally.

        Backslash is the escape character, so it is doubled first; callers
        must pass ``escape="\\"`` to ``like()`` / ``ilike()``.
        """
        return (
            value.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    async def search_metadata(
        self,
        query: str,
        category: Optional[str] = None,
        dynasty: Optional[str] = None,
        author: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ) -> dict:
        """Search book metadata (title, author, description).

        Args:
            query: Free text matched against ``textsearchable_index_col``;
                blank means "no text filter".
            category: Exact category filter.
            dynasty: Exact dynasty filter.
            author: Case-insensitive substring filter on the author.
            page: 1-based page number (values below 1 are treated as 1).
            size: Page size (negative values are treated as 0).

        Returns:
            ``{"total", "page", "size", "results"}`` where ``total`` counts
            every book matching the same filters as ``results``.
        """
        # Function-scope import: ``func`` is not among this module's imports.
        from sqlalchemy import func

        query = (query or "").strip()
        page = max(page, 1)
        size = max(size, 0)

        # One filter list feeds both the page query and the count query so
        # the reported total always describes the same result set.
        filters = [Book.has_content.is_(True)]
        params = {}
        if query:
            # plainto_tsquery accepts arbitrary user text; to_tsquery would
            # raise a syntax error on input such as "foo bar".
            filters.append(
                text("textsearchable_index_col @@ plainto_tsquery('chinese', :query)")
            )
            params["query"] = query
        if category:
            filters.append(Book.category == category)
        if dynasty:
            filters.append(Book.dynasty == dynasty)
        if author:
            filters.append(
                Book.author.ilike(f"%{self._escape_like(author)}%", escape="\\")
            )

        stmt = select(Book).where(*filters)
        if query:
            stmt = stmt.order_by(
                text(
                    "ts_rank(textsearchable_index_col, "
                    "plainto_tsquery('chinese', :query)) DESC"
                )
            )
        # Tie-breaker so LIMIT/OFFSET pages are stable between requests.
        stmt = stmt.order_by(Book.id).limit(size).offset((page - 1) * size)

        books = (await self.db.execute(stmt, params)).scalars().all()

        count_stmt = select(func.count(Book.id)).where(*filters)
        total = (await self.db.execute(count_stmt, params)).scalar_one()

        return {
            "total": total,
            "page": page,
            "size": size,
            "results": [self._book_to_dict(book) for book in books]
        }

    async def search_content(
        self,
        query: str,
        category: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ) -> dict:
        """Search chapter text for a case-insensitive substring.

        Args:
            query: Text to look for; LIKE wildcards in it are matched literally.
            category: Optional exact filter on the owning book's category.
            page: 1-based page number (values below 1 are treated as 1).
            size: Page size (negative values are treated as 0).

        Returns:
            ``{"total", "page", "size", "results"}`` where ``total`` is the
            number of matching chapters across all pages.
        """
        # Function-scope import: ``func`` is not among this module's imports.
        from sqlalchemy import func

        page = max(page, 1)
        size = max(size, 0)

        filters = [
            BookChapter.content.ilike(f"%{self._escape_like(query)}%", escape="\\")
        ]
        if category:
            filters.append(Book.category == category)

        # The book title is selected alongside each chapter: touching
        # ``chapter.book`` later would trigger a lazy load, which is not
        # possible implicitly under AsyncSession.
        stmt = (
            select(BookChapter, Book.title)
            .join(BookChapter.book)
            .where(*filters)
            .order_by(BookChapter.book_id, BookChapter.order_position, BookChapter.id)
            .limit(size)
            .offset((page - 1) * size)
        )
        rows = (await self.db.execute(stmt)).all()

        count_stmt = (
            select(func.count(BookChapter.id))
            .join(BookChapter.book)
            .where(*filters)
        )
        total = (await self.db.execute(count_stmt)).scalar_one()

        return {
            "total": total,
            "page": page,
            "size": size,
            "results": [
                self._chapter_to_dict(chapter, book_title)
                for chapter, book_title in rows
            ]
        }

    async def search_similar(
        self,
        book_id: int,
        top_k: int = 10
    ) -> List[dict]:
        """Recommend books whose embedding is close to that of ``book_id``.

        Returns an empty list when the book does not exist or has no
        embedding; otherwise at most ``top_k`` hits, excluding the book itself.
        """
        # Look the book up first so no retriever is opened when there is
        # nothing to search with.
        book = await self.db.get(Book, book_id)
        # ``is None`` rather than truthiness: an array-like embedding would
        # make ``not book.embedding`` ambiguous.
        if book is None or book.embedding is None:
            return []

        async with VectorRetriever(self.pool) as retriever:
            results = await retriever.search_by_vector(
                book.embedding,
                top_k=top_k + 1  # one extra: the book itself is expected among the hits
            )

        # Drop the book itself; assumes each hit is a mapping with an 'id' key.
        return [r for r in results if r['id'] != book_id][:top_k]

    def _book_to_dict(self, book: Book) -> dict:
        """Serialize the fields of ``book`` that search results expose."""
        return {
            "id": book.id,
            "title": book.title,
            "author": book.author,
            "category": book.category,
            "dynasty": book.dynasty,
            "description": book.description,
            "has_content": book.has_content,
            "view_count": book.view_count
        }

    def _chapter_to_dict(
        self,
        chapter: BookChapter,
        book_title: Optional[str] = None
    ) -> dict:
        """Serialize a chapter hit with a short content preview.

        Args:
            chapter: The chapter to serialize.
            book_title: Title of the owning book if already known; when
                omitted it is read from ``chapter.book``.

        Returns:
            A dict whose ``content`` holds at most ``PREVIEW_CHARS``
            characters, followed by ``...`` only if text was cut off.
        """
        if book_title is None:
            book_title = chapter.book.title

        text_body = chapter.content or ""  # content is nullable
        preview = text_body[:self.PREVIEW_CHARS]
        if len(text_body) > self.PREVIEW_CHARS:
            preview += "..."

        return {
            "id": chapter.id,
            "book_id": chapter.book_id,
            "book_title": book_title,
            "title": chapter.title,
            "content": preview,  # preview only
        }

步骤 4: 创建API端点

# backend/api/v2/books.py
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncSession

from backend.core.database import get_db
from backend.models.book import Book
from backend.services.book_search import BookSearchService
from backend.schemas.book import BookResponse, BookListResponse
# NOTE(review): the handlers below call get_db_pool(), which this snippet never
# imports; import it from wherever the asyncpg pool accessor is defined
# (presumably backend.core.database — confirm).

# Router for the book endpoints: every route registered on it is served under
# the "/books" prefix and tagged "books".
router = APIRouter(prefix="/books", tags=["books"])

@router.get("/search", response_model=BookListResponse)
async def search_books(
    q: str = Query("", max_length=200),
    category: Optional[str] = Query(None),
    dynasty: Optional[str] = Query(None),
    author: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Search books by free text with optional category/dynasty/author filters.

    ``q`` is limited to 200 characters, ``page`` starts at 1 and ``size`` is
    capped at 100. The filters default to ``None`` (not applied), which is
    why they are annotated ``Optional[str]``.
    """
    service = BookSearchService(db, get_db_pool())
    # Keyword arguments keep this call correct even if the service signature
    # gains or reorders parameters.
    return await service.search_metadata(
        query=q,
        category=category,
        dynasty=dynasty,
        author=author,
        page=page,
        size=size,
    )

@router.get("/{book_id}", response_model=BookResponse)
async def get_book(
    book_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return one book and count the view.

    Raises:
        HTTPException: 404 when no book has the given id.
    """
    book = await db.get(Book, book_id)
    if book is None:
        raise HTTPException(status_code=404, detail="Book not found")

    # Bump the view counter inside the UPDATE itself
    # (SET view_count = coalesce(view_count, 0) + 1). A Python-side
    # ``book.view_count += 1`` is a read-modify-write that loses increments
    # under concurrent requests and fails when the column is NULL.
    book.view_count = func.coalesce(Book.view_count, 0) + 1
    await db.commit()

    # Reload explicitly: after the commit the attributes may be expired, and
    # AsyncSession cannot refresh them implicitly during serialization.
    await db.refresh(book)

    return book

@router.get("/{book_id}/related")
async def get_related_books(
    book_id: int,
    top_k: int = Query(10, ge=1, le=50),
    db: AsyncSession = Depends(get_db)
):
    """Return up to ``top_k`` (1-50, default 10) books related to ``book_id``."""
    recommender = BookSearchService(db, get_db_pool())
    related = await recommender.search_similar(book_id, top_k)
    return related

步骤 5: 前端实现

// frontend/src/pages/Books.tsx
import React, { useState } from 'react';
import { SearchBox } from '../components/SearchBox';
import { BookCard } from '../components/BookCard';
import { searchBooks } from '../api/books';

/** One search hit; mirrors the fields emitted by the backend's book serializer. */
interface BookSummary {
  id: number;
  title: string;
  author: string | null;
  category: string | null;
  dynasty: string | null;
  description: string | null;
  has_content: boolean;
  view_count: number;
}

/**
 * Pull the list of books out of whatever `searchBooks` resolved to.
 *
 * Accepts a bare array, a `{ results: [...] }` page object (the shape the
 * search endpoint returns), or either of those wrapped in an HTTP-client
 * response (`{ data: ... }`). Anything else yields an empty list, so the
 * grid never calls `.map` on a non-array.
 */
const extractBooks = (response: unknown): BookSummary[] => {
  const payload = (response as { data?: unknown } | null)?.data ?? response;
  if (Array.isArray(payload)) {
    return payload as BookSummary[];
  }
  const results = (payload as { results?: unknown } | null)?.results;
  return Array.isArray(results) ? (results as BookSummary[]) : [];
};

/** Book search page: a search box with filters above a grid of result cards. */
export const Books: React.FC = () => {
  // Typed explicitly: a bare `useState([])` infers `never[]`, which makes
  // `book.id` a type error.
  const [books, setBooks] = useState<BookSummary[]>([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [filters, setFilters] = useState({
    category: '',
    dynasty: '',
    author: ''
  });

  /** Run a search for the first page using the current filters. */
  const handleSearch = async (query: string) => {
    setLoading(true);
    setError(null);
    try {
      const response = await searchBooks({
        q: query,
        ...filters,
        page: 1,
        size: 20
      });
      setBooks(extractBooks(response));
    } catch (err) {
      console.error('Search failed:', err);
      // Surface the failure instead of leaving the user with a silent no-op.
      setError('搜索失败,请稍后重试');
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="books-page">
      <SearchBox
        placeholder="搜索书名、作者、关键词..."
        onSearch={handleSearch}
        filters={filters}
        onFilterChange={setFilters}
      />

      {error && <div className="error">{error}</div>}

      <div className="books-grid">
        {books.map(book => (
          <BookCard
            key={book.id}
            book={book}
            // NOTE(review): assigning window.location.href reloads the whole
            // page; if the app uses a client-side router, navigate through it.
            onClick={() => window.location.href = `/books/${book.id}`}
          />
        ))}
      </div>

      {loading && <div className="loading">搜索中...</div>}
    </div>
  );
};

📝 TODO 任务列表

后端任务

  • [ ] 创建数据模型(models/book.py, models/source.py)
  • [ ] 实现搜索服务(services/book_search.py)
  • [ ] 创建API路由(api/v2/books.py)
  • [ ] 添加Pydantic schemas(schemas/book.py)
  • [ ] 编写单元测试
  • [ ] 添加API文档

前端任务

  • [ ] 创建搜索页面(pages/Books.tsx)
  • [ ] 创建书籍详情页(pages/BookDetail.tsx)
  • [ ] 创建搜索框组件(components/SearchBox.tsx)
  • [ ] 创建书籍卡片组件(components/BookCard.tsx)
  • [ ] 添加筛选器UI
  • [ ] 实现无限滚动分页

数据任务

  • [ ] 导入现有教材数据
  • [ ] 配置Elasticsearch索引
  • [ ] 生成书籍向量嵌入
  • [ ] 创建示例数据

🧪 测试

API测试

# Search books. curl sends the URL as written, so the non-ASCII query values
# are passed through -G/--data-urlencode to get them percent-encoded.
curl -G "http://localhost:8000/api/v2/books/search" \
  --data-urlencode "q=周易" \
  --data-urlencode "category=儒家"

# Fetch book details
curl "http://localhost:8000/api/v2/books/1"

# Fetch related books
curl "http://localhost:8000/api/v2/books/1/related?top_k=10"

性能测试

# Load test with ab (Apache Bench). ab does not URL-encode its argument, so
# the query is percent-encoded up front (%E5%91%A8%E6%98%93 is "周易" in UTF-8).
ab -n 1000 -c 10 "http://localhost:8000/api/v2/books/search?q=%E5%91%A8%E6%98%93"

# Target: >100 req/s

📊 验收标准

功能验收

  • [ ] 可以按标题、作者搜索书籍
  • [ ] 可以按分类、朝代筛选
  • [ ] 全文搜索返回正确的高亮结果
  • [ ] 相关推荐书籍有语义相关性
  • [ ] 前端UI响应式设计

性能验收

  • [ ] 搜索响应时间 < 200ms
  • [ ] 支持100+并发请求
  • [ ] 全文搜索响应时间 < 500ms

下一步: 开始实施步骤1(数据库初始化,创建数据库表),随后进行步骤2(创建数据模型)