导入阿里云听悟数据到灵知系统
目的: 将从阿里云听悟下载的音频和文字导入到灵知系统
📁 准备数据
目录结构
data/from_tingwu/
├── audio/ # 音频文件
│ ├── recording1.mp3
│ └── recording2.mp3
└── transcripts/ # 转录文字
├── recording1.txt
└── recording2.txt
🔄 导入脚本
方式1: 命令行导入
# scripts/import_from_tingwu.py
import sys
import os
import asyncio
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from backend.services.audio_importer import AudioImporter
from backend.database import get_db
async def import_from_tingwu(
audio_dir: str,
transcript_dir: str,
category: str = "teaching"
):
"""从听悟目录导入数据
Args:
audio_dir: 音频文件目录
transcript_dir: 转录文字目录
category: 分类(teaching/course/practice/meeting)
"""
db = get_db()
# 创建导入器
importer = AudioImporter(db)
# 扫描音频文件
audio_path = Path(audio_dir)
transcript_path = Path(transcript_dir)
audio_files = list(audio_path.glob("*.mp3"))
print(f"找到 {len(audio_files)} 个音频文件")
# 导入每个音频
for audio_file in audio_files:
# 对应的转录文件
transcript_file = transcript_path / f"{audio_file.stem}.txt"
if not transcript_file.exists():
print(f"⚠️ 跳过 {audio_file.name}: 缺少转录文件")
continue
print(f"导入: {audio_file.name}")
# 读取转录文字
with open(transcript_file, 'r', encoding='utf-8') as f:
transcript = f.read()
# 导入数据
result = await importer.import_audio_with_transcript(
audio_file=str(audio_file),
transcript=transcript,
title=audio_file.stem,
category=category
)
print(f" ✓ 音频ID: {result['audio_id']}")
print(f" ✓ 分段数: {result['segments_count']}")
print(f"\n导入完成!")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="导入听悟数据")
parser.add_argument("--audio-dir", required=True, help="音频文件目录")
parser.add_argument("--transcript-dir", required=True, help="转录文字目录")
parser.add_argument("--category", default="teaching", help="分类")
args = parser.parse_args()
asyncio.run(import_from_tingwu(
args.audio_dir,
args.transcript_dir,
args.category
))
方式2: Web界面导入
# backend/api/v1/import.py
from fastapi import APIRouter, UploadFile, BackgroundTasks
from typing import List
router = APIRouter(prefix="/import", tags=["import"])
@router.post("/from-tingwu")
async def import_from_tingwu(
audio_files: List[UploadFile],
transcript_files: List[UploadFile],
category: str = "teaching",
background_tasks: BackgroundTasks = None
):
"""从听悟导入数据
Args:
audio_files: 音频文件列表
transcript_files: 转录文字文件列表
category: 分类
"""
results = []
# 匹配音频和转录文件
file_pairs = []
for audio in audio_files:
# 查找对应的转录文件
stem = Path(audio.filename).stem
transcript = next(
(t for t in transcript_files if Path(t.filename).stem == stem),
None
)
if not transcript:
continue
file_pairs.append((audio, transcript))
print(f"找到 {len(file_pairs)} 对文件")
# 导入每一对文件
for audio, transcript in file_pairs:
# 保存文件
audio_path = await _save_audio_file(audio)
transcript_path = await _save_transcript_file(transcript)
# 读取转录文字
with open(transcript_path, 'r', encoding='utf-8') as f:
transcript_text = f.read()
# 创建导入任务
background_tasks.add_task(
_import_audio_with_transcript,
audio_path,
transcript_text,
audio.filename,
category
)
results.append({
"audio_file": audio.filename,
"transcript_file": transcript.filename,
"status": "queued"
})
return {
"total": len(file_pairs),
"results": results
}
async def _import_audio_with_transcript(
audio_path: str,
transcript: str,
title: str,
category: str
):
"""后台导入任务"""
from backend.services.audio_importer import AudioImporter
from backend.database import get_db
db = get_db()
importer = AudioImporter(db)
result = await importer.import_audio_with_transcript(
audio_file=audio_path,
transcript=transcript,
title=title,
category=category
)
print(f"导入完成: {title} (音频ID: {result['audio_id']})")
🎯 导入服务实现
# backend/services/audio_importer.py
from faster_whisper import WhisperModel
from pathlib import Path
import asyncio
class AudioImporter:
"""音频导入器"""
def __init__(self, db):
self.db = db
self.whisper = None # 如果需要重新转写
async def import_audio_with_transcript(
self,
audio_file: str,
transcript: str,
title: str,
category: str = "teaching"
) -> dict:
"""导入音频和转录文字
Args:
audio_file: 音频文件路径
transcript: 转录文字
title: 标题
category: 分类
Returns:
导入结果
"""
# 1. 获取音频时长
duration = await self._get_audio_duration(audio_file)
# 2. 创建音频记录
audio_record = await self.db.insert(audio_files, {
"file_name": Path(audio_file).name,
"file_path": audio_file,
"title": title,
"category": category,
"duration": duration,
"status": "completed", # 已转录
"transcript": transcript,
"language": "zh",
"upload_ip": "127.0.0.1",
"uploaded_by": "import"
})
# 3. 分段处理转录文字
segments = await self._parse_transcript(
transcript,
audio_record["id"]
)
# 4. 保存分段
for seg in segments:
await self.db.insert(audio_segments, {
"audio_file_id": audio_record["id"],
"text": seg["text"],
"start_time": seg["start_time"],
"end_time": seg["end_time"],
"confidence": 1.0 # 听悟的转录通常很准确
})
# 5. 触发向量化(与文字工程流集成)
await self._vectorize_transcript(
audio_record["id"],
segments
)
return {
"audio_id": audio_record["id"],
"title": title,
"segments_count": len(segments),
"duration": duration
}
async def _get_audio_duration(self, audio_path: str) -> float:
"""获取音频时长"""
from pydub import AudioSegment
audio = AudioSegment.from_mp3(audio_path)
return len(audio) / 1000.0 # 转换为秒
async def _parse_transcript(
self,
transcript: str,
audio_id: int
) -> list:
"""解析转录文字为分段
Args:
transcript: 转录文字
audio_id: 音频ID
Returns:
分段列表
"""
segments = []
# 尝试解析SRT格式
if "-->" in transcript:
segments = self._parse_srt(transcript)
else:
# 纯文本,按段落分割
segments = self._parse_plain_text(transcript)
return segments
def _parse_srt(self, srt_content: str) -> list:
"""解析SRT格式"""
segments = []
lines = srt_content.strip().split("\n\n")
for block in lines:
lines_block = block.split("\n")
if len(lines_block) < 3:
continue
# 解析时间戳
timestamp_line = lines_block[1]
if "-->" not in timestamp_line:
continue
start_str, end_str = timestamp_line.split("-->")
start_time = self._parse_srt_time(start_str.strip())
end_time = self._parse_srt_time(end_str.strip())
# 解析文本
text = "\n".join(lines_block[2:])
segments.append({
"start_time": start_time,
"end_time": end_time,
"text": text.strip()
})
return segments
def _parse_srt_time(self, time_str: str) -> float:
"""解析SRT时间戳
格式: 00:01:23,456
"""
time_str = time_str.replace(",", ".")
parts = time_str.split(":")
hours = int(parts[0])
minutes = int(parts[1])
seconds_parts = parts[2].split(".")
seconds = int(seconds_parts[0])
millis = int(seconds_parts[1]) if len(seconds_parts) > 1 else 0
return hours * 3600 + minutes * 60 + seconds + millis / 1000.0
def _parse_plain_text(self, text: str) -> list:
"""解析纯文本(按段落分割)"""
segments = []
paragraphs = text.split("\n\n")
current_time = 0.0
avg_duration = 5.0 # 假设每段5秒
for para in paragraphs:
para = para.strip()
if not para:
continue
# 估算时间(没有实际时间戳)
start_time = current_time
end_time = current_time + avg_duration
# 按句子细分
sentences = para.split("。")
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
segments.append({
"start_time": start_time,
"end_time": end_time,
"text": sentence + "。"
})
current_time = end_time
end_time = current_time + avg_duration
return segments
async def _vectorize_transcript(self, audio_id: int, segments: list):
"""向量化转录文字(与文字工程流集成)
Args:
audio_id: 音频ID
segments: 分段列表
"""
try:
from backend.services.text_processor import TextProcessor
processor = TextProcessor()
# 为每个分段生成向量
for segment in segments:
embedding = await processor.generate_embedding(segment["text"])
# 更新分段
await self.db.execute(
"""UPDATE audio_segments
SET embedding = $1
WHERE text = $2
AND audio_file_id = $3
""",
embedding,
segment["text"],
audio_id
)
print(f"✓ 音频 {audio_id} 向量化完成")
except Exception as e:
print(f"⚠️ 音频 {audio_id} 向量化失败: {e}")
🚀 使用方法
命令行导入
# 准备数据目录
mkdir -p data/from_tingwu/audio
mkdir -p data/from_tingwu/transcripts
# 将下载的文件放入对应目录
# 运行导入脚本
python scripts/import_from_tingwu.py \
--audio-dir data/from_tingwu/audio \
--transcript-dir data/from_tingwu/transcripts \
--category teaching
Web界面导入
# 1. 启动服务
docker-compose up -d
# 2. 访问导入页面
open http://localhost:8001/import
# 3. 上传音频和转录文件
# 4. 点击"导入"按钮
📋 支持的格式
音频格式
- MP3
- WAV
- M4A
- AAC
转录文字格式
- 纯文本(.txt)
- SRT字幕(.srt)
- 带时间戳的文本
⚠️ 注意事项
- 文件名匹配: 音频和转录文件的文件名(不含扩展名)必须相同
- 编码格式: 转录文字文件必须使用UTF-8编码
- 音频时长: 确保音频文件完整,未损坏
- 分类选择: 根据内容选择合适的分类(teaching/course/practice/meeting)
文档状态: ✅ 完成
下一步: 准备数据并运行导入脚本