# Rate Limiting

Handle API rate limits effectively.
All embedding providers enforce rate limits. Catsu provides automatic retry logic with exponential backoff to handle rate limits gracefully.
## Automatic Retry Logic
Catsu automatically retries rate-limited requests:

```python
import catsu

client = catsu.Client(max_retries=5)

# Automatically retries up to 5 times if rate limited
response = client.embed(model="voyage-3", input="Text")
```

### How Retries Work
When a rate limit is hit, Catsu does the following (sketched in code below):

1. Receives a `RateLimitError` from the provider
2. Extracts `retry_after` from the response headers
3. Waits the specified time (or falls back to exponential backoff)
4. Retries the request
5. Repeats up to `max_retries` times
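Conceptually, the retry loop looks like the sketch below. This is a simplified illustration, not Catsu's actual internals; in particular, the `2 ** attempt` backoff schedule is an assumption:

```python
import time

from catsu.exceptions import RateLimitError

def embed_with_retries(client, max_retries=5, **kwargs):
    """Simplified sketch of retry-with-backoff; not Catsu's real source."""
    for attempt in range(max_retries + 1):
        try:
            return client.embed(**kwargs)
        except RateLimitError as e:
            if attempt == max_retries:
                raise  # Retries exhausted; surface the error to the caller
            # Prefer the provider's retry_after hint, else back off exponentially
            time.sleep(e.retry_after or 2 ** attempt)
```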
## Manual Rate Limit Handling

To handle rate limits yourself, catch `RateLimitError` directly:
```python
import time

from catsu.exceptions import RateLimitError

try:
    response = client.embed(model="voyage-3", input="Text")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
    # Wait, then retry once; fall back to 1s if no retry_after was provided
    time.sleep(e.retry_after or 1)
    response = client.embed(model="voyage-3", input="Text")
```

## Rate Limit Best Practices
### Configure Appropriate Retries
```python
# For batch processing: more retries, longer timeout
batch_client = catsu.Client(max_retries=10, timeout=120)

# For interactive use: fewer retries, fail fast
interactive_client = catsu.Client(max_retries=2, timeout=15)
```

### Respect Provider Limits
Different providers enforce different kinds of limits:

- Some limit requests per second
- Others limit tokens per minute
- Some enforce both

Check your provider's documentation for specific limits.
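Beyond reacting to errors, you can throttle proactively when you know the cap. Here is a minimal sketch of a client-side requests-per-second limiter; the `RequestThrottle` class and the 5 req/s cap are illustrative, not part of Catsu's API:

```python
import time

class RequestThrottle:
    """Client-side pacing: allow at most max_per_second calls per second."""

    def __init__(self, max_per_second: float):
        self.min_interval = 1.0 / max_per_second
        self.last_call = 0.0

    def wait(self):
        # Sleep just long enough to keep the request rate under the cap
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call = time.monotonic()

throttle = RequestThrottle(max_per_second=5)

for text in ["first", "second", "third"]:
    throttle.wait()
    response = client.embed(model="voyage-3", input=text)
```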
### Implement Backoff for Large Jobs
```python
import time

from catsu.exceptions import RateLimitError

def process_large_dataset(texts, batch_size=50):
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        try:
            response = client.embed(model="voyage-3", input=batch)
            results.extend(response.embeddings)
        except RateLimitError as e:
            print(f"Rate limited at batch {i // batch_size + 1}")
            time.sleep(e.retry_after or 1)
            # Retry this batch once after waiting
            response = client.embed(model="voyage-3", input=batch)
            results.extend(response.embeddings)
        # Small delay between batches
        time.sleep(0.1)
    return results
```

### Use Async with Rate Limiting
```python
import asyncio

import catsu
from catsu.exceptions import RateLimitError

async def embed_with_rate_limit(client, text, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return await client.aembed(model="voyage-3", input=text)
        except RateLimitError as e:
            if attempt < max_attempts - 1:
                # Honor retry_after if present, else back off exponentially
                await asyncio.sleep(e.retry_after or 2 ** attempt)
            else:
                raise

async def main():
    client = catsu.Client()
    texts = ["First document", "Second document"]  # your dataset

    # Process with automatic retry
    responses = await asyncio.gather(*[
        embed_with_rate_limit(client, text)
        for text in texts
    ])
asyncio.run(main())
```
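One caution with the pattern above: an unbounded `asyncio.gather` launches every request at once, which can trip the rate limit immediately. A semaphore caps the number of in-flight requests. This sketch reuses `embed_with_rate_limit` from above, and the limit of 5 is an arbitrary example:

```python
import asyncio

async def embed_bounded(client, texts, max_concurrency=5):
    # Allow at most max_concurrency in-flight requests at a time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def embed_one(text):
        async with semaphore:
            return await embed_with_rate_limit(client, text)

    return await asyncio.gather(*[embed_one(text) for text in texts])
```

Call it in place of the bare `gather` in `main()` above.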
## Monitoring Rate Limits

Track rate limit occurrences:
```python
from catsu.exceptions import RateLimitError

class RateLimitTracker:
    def __init__(self):
        self.rate_limit_count = 0
        self.total_requests = 0

    def embed(self, client, model, input):
        self.total_requests += 1
        try:
            return client.embed(model=model, input=input)
        except RateLimitError:
            self.rate_limit_count += 1
            print(f"Rate limited on {self.rate_limit_count} of {self.total_requests} requests")
            raise

    def get_rate_limit_percentage(self):
        if self.total_requests == 0:
            return 0
        return (self.rate_limit_count / self.total_requests) * 100

tracker = RateLimitTracker()
```
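Route requests through the tracker to see how often you are being throttled. A usage sketch; the 5% threshold is an arbitrary example:

```python
response = tracker.embed(client, model="voyage-3", input="Text")

# Periodically check whether you should slow down or add capacity
if tracker.get_rate_limit_percentage() > 5:
    print("Hitting rate limits often; consider smaller batches or more providers")
```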
## Strategies for High Volume

### Distribute Across Providers
```python
import catsu
from catsu.exceptions import RateLimitError

client = catsu.Client()

# Models from different providers
providers = ["voyage-3", "text-embedding-3-small", "embed-v4.0"]
current_provider = 0

def embed_with_failover(text):
    global current_provider
    for _ in range(len(providers)):
        try:
            model = providers[current_provider]
            return client.embed(model=model, input=text)
        except RateLimitError:
            # Try the next provider
            current_provider = (current_provider + 1) % len(providers)
    raise Exception("All providers rate limited")
```

### Implement Queue-Based Processing
```python
import asyncio
from asyncio import Queue

import catsu
from catsu.exceptions import RateLimitError

async def worker(queue, results, client):
    while True:
        text = await queue.get()
        if text is None:
            break  # Sentinel value: shut this worker down
        try:
            response = await client.aembed(model="voyage-3", input=text)
            results.append(response)
        except RateLimitError as e:
            # Put the item back in the queue after a delay
            await asyncio.sleep(e.retry_after or 1)
            await queue.put(text)
        queue.task_done()

async def process_with_queue(texts, num_workers=5):
    queue = Queue()
    results = []

    # Add texts to the queue
    for text in texts:
        await queue.put(text)

    # Start workers
    workers = [
        asyncio.create_task(worker(queue, results, catsu.Client()))
        for _ in range(num_workers)
    ]

    # Wait until every item has been processed
    await queue.join()

    # Stop workers with sentinel values, then let them exit cleanly
    for _ in range(num_workers):
        await queue.put(None)
    await asyncio.gather(*workers)

    return results
```

## Best Practices
- Configure `max_retries` appropriately for your use case
- Respect `retry_after` headers from providers
- Implement exponential backoff for retries
- Monitor rate limit frequency
- Consider distributing load across providers
- Add delays between batches for high-volume processing
- Use async with controlled concurrency
## Next Steps
- Batch Processing - Optimize batch sizes for rate limits
- Async Usage - Handle rate limits in async code
- Error Handling - Handle `RateLimitError` exceptions