API速率限制控制 - 完整解决方案
问题: API返回429错误 - 速率限制
解决方案: 请求间隔 + 智能重试 + 并发控制
🎯 三大核心功能
1. 请求间隔(RateLimiter)
控制请求频率,避免触发速率限制。
from lingflow.utils.rate_limiter import RateLimiter, RateLimitConfig
# 配置:每秒2个请求
config = RateLimitConfig(requests_per_second=2.0)
limiter = RateLimiter(config)
# 使用
limiter.acquire() # 如果太快会自动等待
api_call() # 执行API请求
效果:
2. 智能重试(SmartRetry)
处理429等错误,自动重试并使用指数退避。
from lingflow.utils.rate_limiter import SmartRetry, RateLimitConfig
config = RateLimitConfig(
max_retries=3, # 最多重试3次
base_delay=1.0, # 基础延迟1秒
exponential_backoff=True, # 指数退避
jitter=True # 添加随机抖动
)
retry_handler = SmartRetry(config)
def api_call():
# 可能返回429的API调用
return requests.get(url)
result = retry_handler.execute(
api_call,
on_retry=lambda attempt, error, delay:
print(f"重试 #{attempt+1}, 等待 {delay:.1f}秒")
)
智能特性: - ✅ 自动识别429错误 - ✅ 指数退避(1s → 2s → 4s → 8s) - ✅ 随机抖动(避免雷群效应) - ✅ 最大延迟限制(不会无限等待)
3. 并发控制(ConcurrencyController)
限制同时运行的请求数量。
from lingflow.utils.rate_limiter import ConcurrencyController
controller = ConcurrencyController(max_concurrent=5)
def worker(task_id):
controller.acquire()
try:
# 执行API请求
api_call(task_id)
finally:
controller.release()
# 启动多个worker
for i in range(20):
threading.Thread(target=worker, args=(i,)).start()
效果: - 最多5个请求同时运行 - 超过5个的请求等待 - 完成一个,启动一个
🚀 完整解决方案(APIClient)
整合所有功能,一键解决速率限制问题。
from lingflow.utils.rate_limiter import APIClient, RateLimitConfig
# 配置
config = RateLimitConfig(
requests_per_second=2.0, # 每秒2个请求
max_retries=3, # 最多重试3次
base_delay=1.0, # 基础延迟
exponential_backoff=True, # 指数退避
)
client = APIClient(
rate_limit_config=config,
max_concurrent=5 # 最多5个并发
)
# 使用
def my_api_call(url):
return requests.get(url)
# 自动控制速率、重试、并发
result = client.request(my_api_call, "https://api.example.com")
自动处理: 1. ✅ 速率限制 - 每秒最多2个请求 2. ✅ 智能重试 - 遇到429自动重试 3. ✅ 并发控制 - 最多5个并发请求
📊 实际案例
案例1: Claude API调用
from lingflow.utils.rate_limiter import APIClient, RateLimitConfig
client = APIClient(RateLimitConfig(
requests_per_second=1.0, # Claude限制严格
max_retries=5,
))
def call_claude_api(prompt):
import anthropic
client_api = anthropic.Anthropic(api_key="...")
message = client_api.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
return message
# 使用
result = client.request(call_claude_api, "Hello Claude")
案例2: 批量API请求
import threading
def batch_process(urls):
config = RateLimitConfig(
requests_per_second=5.0,
max_retries=3,
)
client = APIClient(config, max_concurrent=10)
results = []
def process_url(url):
try:
result = client.request(requests.get, url)
results.append((url, result))
except Exception as e:
results.append((url, None))
threads = []
for url in urls:
t = threading.Thread(target=process_url, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return results
案例3: 异步API调用
async def async_api_calls():
from lingflow.utils.rate_limiter import APIClient, RateLimitConfig
client = APIClient(RateLimitConfig(
requests_per_second=2.0,
max_retries=3,
))
async def fetch_data(url):
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# 异步执行
result = await client.request_async(
fetch_data,
"https://api.example.com/data"
)
return result
⚙️ 配置参数详解
RateLimitConfig
| 参数 | 默认值 | 说明 |
|---|---|---|
requests_per_second |
2.0 | 每秒请求数 |
requests_per_minute |
100.0 | 每分钟请求数 |
max_retries |
3 | 最大重试次数 |
base_delay |
1.0 | 基础延迟(秒) |
max_delay |
60.0 | 最大延迟(秒) |
jitter |
True | 添加随机抖动 |
exponential_backoff |
True | 指数退避 |
APIClient
| 参数 | 默认值 | 说明 |
|---|---|---|
rate_limit_config |
RateLimitConfig() | 速率限制配置 |
max_concurrent |
5 | 最大并发数 |
🛡️ 防止429的最佳实践
1. 选择合适的速率限制
# 保守配置(推荐用于生产环境)
config = RateLimitConfig(
requests_per_second=1.0, # 每秒1个
max_retries=5,
)
# 激进配置(仅用于测试)
config = RateLimitConfig(
requests_per_second=10.0,
max_retries=1,
)
2. 监控错误率
client = APIClient(config)
# 执行请求...
stats = client.get_stats()
print(f"错误率: {stats['error_rate']:.1%}")
if stats['error_rate'] > 0.1:
print("⚠️ 错误率过高,降低请求速率")
3. 使用指数退避 + 抖动
4. 限制并发数
📈 性能对比
| 方案 | 429错误率 | 吞吐量 | 复杂度 |
|---|---|---|---|
| 无控制 | 30% | 高 | 低 |
| 仅请求间隔 | 10% | 中 | 低 |
| 仅智能重试 | 5% | 高 | 中 |
| 完整方案 | <1% | 中高 | 中 |
🔧 快速开始
安装依赖
基础使用
from lingflow.utils.rate_limiter import APIClient, RateLimitConfig
# 1. 创建客户端
client = APIClient(RateLimitConfig(
requests_per_second=2.0,
max_retries=3,
))
# 2. 定义API函数
def my_api_call():
import requests
return requests.get("https://api.example.com")
# 3. 执行(自动处理速率限制)
result = client.request(my_api_call)
✅ 验证效果
运行演示脚本:
输出示例:
=== 速率限制器示例 ===
[00:16:38.350] 请求 0
[00:16:38.350] 请求 1
[00:16:38.850] 请求 2 ← 自动等待0.5秒
[00:16:38.850] 请求 3
=== 智能重试示例 ===
API调用 #1
重试 #1,延迟 0.6秒
API调用 #2
重试 #2,延迟 1.0秒
API调用 #3
✓ 最终结果: 成功!
=== 并发控制示例 ===
Worker 0 开始工作 (活跃: 1)
Worker 1 开始工作 (活跃: 2) ← 达到并发限制
Worker 0 完成
Worker 2 开始工作 (活跃: 2)
=== 整合API客户端示例 ===
📡 请求 0
📡 请求 1
⚠️ 第 1 次重试,错误: 429 Rate Limit Error
📡 请求 2
✓ 响应 0
📡 请求 3
✓ 响应 1
📊 统计:
成功: 5
失败: 2
错误率: 28.6%
📚 相关资源
- 文件位置:
lingflow/utils/rate_limiter.py - 演示脚本: 包含在文件末尾的
if __name__ == "__main__" - 测试: 运行演示查看效果
不再为429错误烦恼! 🎉