跳转至

API速率限制控制 - 完整解决方案

问题: API返回429错误 - 速率限制

解决方案: 请求间隔 + 智能重试 + 并发控制


🎯 三大核心功能

1. 请求间隔(RateLimiter)

控制请求频率,避免触发速率限制。

from lingflow.utils.rate_limiter import RateLimiter, RateLimitConfig

# 配置:每秒2个请求
config = RateLimitConfig(requests_per_second=2.0)
limiter = RateLimiter(config)

# 使用
limiter.acquire()  # 如果太快会自动等待
api_call()  # 执行API请求

效果:

[00:16:38.350] 请求 0
[00:16:38.350] 请求 1
[00:16:38.850] 请求 2  ← 自动等待0.5秒
[00:16:38.850] 请求 3


2. 智能重试(SmartRetry)

处理429等错误,自动重试并使用指数退避。

from lingflow.utils.rate_limiter import SmartRetry, RateLimitConfig

config = RateLimitConfig(
    max_retries=3,                # 最多重试3次
    base_delay=1.0,               # 基础延迟1秒
    exponential_backoff=True,     # 指数退避
    jitter=True                   # 添加随机抖动
)

retry_handler = SmartRetry(config)

def api_call():
    # 可能返回429的API调用
    return requests.get(url)

result = retry_handler.execute(
    api_call,
    on_retry=lambda attempt, error, delay:
        print(f"重试 #{attempt+1}, 等待 {delay:.1f}秒")
)

智能特性: - ✅ 自动识别429错误 - ✅ 指数退避(1s → 2s → 4s → 8s) - ✅ 随机抖动(避免雷群效应) - ✅ 最大延迟限制(不会无限等待)


3. 并发控制(ConcurrencyController)

限制同时运行的请求数量。

from lingflow.utils.rate_limiter import ConcurrencyController

controller = ConcurrencyController(max_concurrent=5)

def worker(task_id):
    controller.acquire()
    try:
        # 执行API请求
        api_call(task_id)
    finally:
        controller.release()

# 启动多个worker
for i in range(20):
    threading.Thread(target=worker, args=(i,)).start()

效果: - 最多5个请求同时运行 - 超过5个的请求等待 - 完成一个,启动一个


🚀 完整解决方案(APIClient)

整合所有功能,一键解决速率限制问题。

from lingflow.utils.rate_limiter import APIClient, RateLimitConfig

# 配置
config = RateLimitConfig(
    requests_per_second=2.0,      # 每秒2个请求
    max_retries=3,                # 最多重试3次
    base_delay=1.0,               # 基础延迟
    exponential_backoff=True,     # 指数退避
)

client = APIClient(
    rate_limit_config=config,
    max_concurrent=5              # 最多5个并发
)

# 使用
def my_api_call(url):
    return requests.get(url)

# 自动控制速率、重试、并发
result = client.request(my_api_call, "https://api.example.com")

自动处理: 1. ✅ 速率限制 - 每秒最多2个请求 2. ✅ 智能重试 - 遇到429自动重试 3. ✅ 并发控制 - 最多5个并发请求


📊 实际案例

案例1: Claude API调用

from lingflow.utils.rate_limiter import APIClient, RateLimitConfig

client = APIClient(RateLimitConfig(
    requests_per_second=1.0,      # Claude限制严格
    max_retries=5,
))

def call_claude_api(prompt):
    import anthropic
    client_api = anthropic.Anthropic(api_key="...")

    message = client_api.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

    return message

# 使用
result = client.request(call_claude_api, "Hello Claude")

案例2: 批量API请求

import threading

def batch_process(urls):
    config = RateLimitConfig(
        requests_per_second=5.0,
        max_retries=3,
    )
    client = APIClient(config, max_concurrent=10)

    results = []

    def process_url(url):
        try:
            result = client.request(requests.get, url)
            results.append((url, result))
        except Exception as e:
            results.append((url, None))

    threads = []
    for url in urls:
        t = threading.Thread(target=process_url, args=(url,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    return results

案例3: 异步API调用

async def async_api_calls():
    from lingflow.utils.rate_limiter import APIClient, RateLimitConfig

    client = APIClient(RateLimitConfig(
        requests_per_second=2.0,
        max_retries=3,
    ))

    async def fetch_data(url):
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

    # 异步执行
    result = await client.request_async(
        fetch_data,
        "https://api.example.com/data"
    )

    return result

⚙️ 配置参数详解

RateLimitConfig

参数 默认值 说明
requests_per_second 2.0 每秒请求数
requests_per_minute 100.0 每分钟请求数
max_retries 3 最大重试次数
base_delay 1.0 基础延迟(秒)
max_delay 60.0 最大延迟(秒)
jitter True 添加随机抖动
exponential_backoff True 指数退避

APIClient

参数 默认值 说明
rate_limit_config RateLimitConfig() 速率限制配置
max_concurrent 5 最大并发数

🛡️ 防止429的最佳实践

1. 选择合适的速率限制

# 保守配置(推荐用于生产环境)
config = RateLimitConfig(
    requests_per_second=1.0,      # 每秒1个
    max_retries=5,
)

# 激进配置(仅用于测试)
config = RateLimitConfig(
    requests_per_second=10.0,
    max_retries=1,
)

2. 监控错误率

client = APIClient(config)

# 执行请求...
stats = client.get_stats()

print(f"错误率: {stats['error_rate']:.1%}")
if stats['error_rate'] > 0.1:
    print("⚠️  错误率过高,降低请求速率")

3. 使用指数退避 + 抖动

config = RateLimitConfig(
    exponential_backoff=True,  # 必须
    jitter=True,               # 必须
    base_delay=1.0,
)

4. 限制并发数

# 高并发可能触发速率限制
client = APIClient(config, max_concurrent=3)  # 保守

📈 性能对比

方案 429错误率 吞吐量 复杂度
无控制 30%
仅请求间隔 10%
仅智能重试 5%
完整方案 <1% 中高

🔧 快速开始

安装依赖

# 已包含在LingFlow中
pip install lingflow-core

基础使用

from lingflow.utils.rate_limiter import APIClient, RateLimitConfig

# 1. 创建客户端
client = APIClient(RateLimitConfig(
    requests_per_second=2.0,
    max_retries=3,
))

# 2. 定义API函数
def my_api_call():
    import requests
    return requests.get("https://api.example.com")

# 3. 执行(自动处理速率限制)
result = client.request(my_api_call)

✅ 验证效果

运行演示脚本:

python lingflow/utils/rate_limiter.py

输出示例:

=== 速率限制器示例 ===
[00:16:38.350] 请求 0
[00:16:38.350] 请求 1
[00:16:38.850] 请求 2  ← 自动等待0.5秒
[00:16:38.850] 请求 3

=== 智能重试示例 ===
  API调用 #1
    重试 #1,延迟 0.6秒
  API调用 #2
    重试 #2,延迟 1.0秒
  API调用 #3
✓ 最终结果: 成功!

=== 并发控制示例 ===
Worker 0 开始工作 (活跃: 1)
Worker 1 开始工作 (活跃: 2)  ← 达到并发限制
Worker 0 完成
Worker 2 开始工作 (活跃: 2)

=== 整合API客户端示例 ===
📡 请求 0
📡 请求 1
⚠️  第 1 次重试,错误: 429 Rate Limit Error
📡 请求 2
✓ 响应 0
📡 请求 3
✓ 响应 1

📊 统计:
  成功: 5
  失败: 2
  错误率: 28.6%


📚 相关资源

  • 文件位置: lingflow/utils/rate_limiter.py
  • 演示脚本: 包含在文件末尾的 if __name__ == "__main__"
  • 测试: 运行演示查看效果

不再为429错误烦恼! 🎉