
Rate Limiting

Handle API rate limits effectively

All embedding providers enforce rate limits. Catsu provides automatic retry logic with exponential backoff to handle rate limits gracefully.

Automatic Retry Logic

Catsu automatically retries rate-limited requests:

import catsu

client = catsu.Client(max_retries=5)

# Automatically retries up to 5 times if rate limited
response = client.embed(model="voyage-3", input="Text")

How Retries Work

When a rate limit is hit:

  1. Catsu receives a RateLimitError from the provider
  2. It extracts retry_after from the response headers
  3. It waits the specified time (or falls back to exponential backoff)
  4. It retries the request
  5. It repeats this up to max_retries times (sketched below)
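
Conceptually, the retry loop looks like the sketch below. This is an illustration of the behavior described above, not Catsu's actual internals:

import time

from catsu.exceptions import RateLimitError

def embed_with_retries(client, model, input, max_retries=5):
    for attempt in range(max_retries + 1):
        try:
            return client.embed(model=model, input=input)
        except RateLimitError as e:
            if attempt == max_retries:
                raise  # retries exhausted
            # Prefer the provider's hint; fall back to exponential backoff
            time.sleep(e.retry_after or 2 ** attempt)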

Manual Rate Limit Handling

from catsu.exceptions import RateLimitError
import time

try:
    response = client.embed(model="voyage-3", input="Text")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")

    # Wait and retry (fall back to a short delay if no hint was provided)
    time.sleep(e.retry_after or 1)
    response = client.embed(model="voyage-3", input="Text")

Rate Limit Best Practices

Configure Appropriate Retries

# For batch processing (more retries)
batch_client = catsu.Client(max_retries=10, timeout=120)

# For interactive use (fewer retries, fail fast)
interactive_client = catsu.Client(max_retries=2, timeout=15)

Respect Provider Limits

Different providers have different rate limits:

  • Some limit requests per second
  • Others limit tokens per minute
  • Some have both

Check your provider's documentation for specific limits.
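
If your provider limits requests per second, a simple client-side throttle can space out calls so you rarely hit the limit at all. This is a minimal sketch rather than a built-in Catsu feature, and the requests_per_second value is an assumption to replace with your provider's documented limit:

import time

import catsu

class ThrottledEmbedder:
    """Spaces out embed calls to stay under a requests-per-second limit."""

    def __init__(self, client, requests_per_second=2):
        self.client = client
        self.min_interval = 1.0 / requests_per_second
        self._last_call = 0.0

    def embed(self, model, input):
        # Sleep just long enough to honor the minimum interval
        elapsed = time.monotonic() - self._last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_call = time.monotonic()
        return self.client.embed(model=model, input=input)

embedder = ThrottledEmbedder(catsu.Client(), requests_per_second=2)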

Implement Backoff for Large Jobs

import time

import catsu
from catsu.exceptions import RateLimitError

client = catsu.Client()

def process_large_dataset(texts, batch_size=50):
    results = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        try:
            response = client.embed(model="voyage-3", input=batch)
            results.extend(response.embeddings)

        except RateLimitError as e:
            print(f"Rate limited at batch {i // batch_size + 1}")
            # Fall back to a conservative delay if no hint was provided
            time.sleep(e.retry_after or 5)

            # Retry this batch once
            response = client.embed(model="voyage-3", input=batch)
            results.extend(response.embeddings)

        # Small delay between batches to smooth out the request rate
        time.sleep(0.1)

    return results

Use Async with Rate Limiting

import asyncio

import catsu
from catsu.exceptions import RateLimitError

async def embed_with_rate_limit(client, text, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return await client.aembed(model="voyage-3", input=text)
        except RateLimitError as e:
            if attempt < max_attempts - 1:
                await asyncio.sleep(e.retry_after or 2 ** attempt)
            else:
                raise

async def main():
    client = catsu.Client()
    texts = ["First text", "Second text"]  # your input texts

    # Process with automatic retry
    responses = await asyncio.gather(*[
        embed_with_rate_limit(client, text)
        for text in texts
    ])

asyncio.run(main())
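
Note that asyncio.gather fires every request at once, which can itself trigger rate limits on large inputs. Bounding concurrency with a semaphore keeps the number of in-flight requests under control; this sketch reuses the embed_with_rate_limit helper above, and the limit of 10 is an assumption to tune for your provider:

async def embed_bounded(client, texts, max_concurrency=10):
    # Allow at most max_concurrency requests in flight at once
    semaphore = asyncio.Semaphore(max_concurrency)

    async def embed_one(text):
        async with semaphore:
            return await embed_with_rate_limit(client, text)

    return await asyncio.gather(*[embed_one(text) for text in texts])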

Monitoring Rate Limits

Track rate limit occurrences:

from catsu.exceptions import RateLimitError

class RateLimitTracker:
    def __init__(self):
        self.rate_limit_count = 0
        self.total_requests = 0

    def embed(self, client, model, input):
        self.total_requests += 1

        try:
            return client.embed(model=model, input=input)
        except RateLimitError:
            self.rate_limit_count += 1
            print(f"Rate limited on {self.rate_limit_count} of {self.total_requests} requests")
            raise
            raise

    def get_rate_limit_percentage(self):
        if self.total_requests == 0:
            return 0
        return (self.rate_limit_count / self.total_requests) * 100

tracker = RateLimitTracker()
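
Usage mirrors a normal embed call, with the client and model passed through:

import catsu

client = catsu.Client()

response = tracker.embed(client, "voyage-3", "Text")
print(f"{tracker.get_rate_limit_percentage():.1f}% of requests rate limited")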

Strategies for High Volume

Distribute Across Providers

import catsu
from catsu.exceptions import RateLimitError

client = catsu.Client()

# One model per provider (Voyage, OpenAI, Cohere)
providers = ["voyage-3", "text-embedding-3-small", "embed-v4.0"]
current_provider = 0

def embed_with_failover(text):
    global current_provider

    for _ in range(len(providers)):
        try:
            model = providers[current_provider]
            return client.embed(model=model, input=text)
        except RateLimitError:
            # Try the next provider's model
            current_provider = (current_provider + 1) % len(providers)

    raise RuntimeError("All providers rate limited")
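
For example:

response = embed_with_failover("Text to embed")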

Implement Queue-Based Processing

import asyncio
from asyncio import Queue

import catsu
from catsu.exceptions import RateLimitError

async def worker(queue, results, client):
    while True:
        text = await queue.get()
        if text is None:
            queue.task_done()
            break

        try:
            response = await client.aembed(model="voyage-3", input=text)
            results.append(response)
        except RateLimitError as e:
            # Put back in the queue after the suggested delay
            await asyncio.sleep(e.retry_after or 1)
            await queue.put(text)

        queue.task_done()

async def process_with_queue(texts, num_workers=5):
    queue = Queue()
    results = []

    # Add texts to the queue
    for text in texts:
        await queue.put(text)

    # Start workers, each with its own client
    workers = [
        asyncio.create_task(worker(queue, results, catsu.Client()))
        for _ in range(num_workers)
    ]

    # Wait until every text has been processed
    await queue.join()

    # Stop workers with a sentinel per worker, then let them finish
    for _ in range(num_workers):
        await queue.put(None)
    await asyncio.gather(*workers)

    return results
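
For example, to embed a list of texts with five workers:

texts = ["First text", "Second text", "Third text"]
results = asyncio.run(process_with_queue(texts, num_workers=5))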

Best Practices

  • Configure max_retries appropriately for your use case
  • Respect retry_after headers from providers
  • Implement exponential backoff for retries
  • Monitor rate limit frequency
  • Consider distributing load across providers
  • Add delays between batches for high-volume processing
  • Use async with controlled concurrency
