Catsu Docs

Async Usage

Maximize performance with asynchronous embedding generation

Asynchronous methods enable parallel processing, giving significantly better throughput when making multiple requests.

When to Use Async

Use aembed() when:

  • Processing multiple independent requests
  • Building async applications (FastAPI, async frameworks)
  • Maximizing throughput
  • Working with large datasets

Use embed() when:

  • Single requests
  • Synchronous/script-based workflows
  • Simpler code is preferred

Basic Async

import asyncio
import catsu

async def main():
    client = catsu.Client()

    response = await client.aembed(
        model="voyage-3",
        input="Async embedding"
    )

    print(response.embeddings)

asyncio.run(main())

Parallel Requests with asyncio.gather()

Process multiple requests concurrently:

import asyncio
import catsu

async def main():
    client = catsu.Client()

    # All requests run in parallel
    responses = await asyncio.gather(
        client.aembed(model="voyage-3", input="Text 1"),
        client.aembed(model="voyage-3", input="Text 2"),
        client.aembed(model="voyage-3", input="Text 3"),
    )

    for i, response in enumerate(responses):
        print(f"Response {i+1}: {len(response.embeddings[0])} dimensions")

asyncio.run(main())

Performance Comparison

import asyncio
import catsu
import time

texts = ["Sample text"] * 10

# Synchronous (sequential)
def sync_process():
    client = catsu.Client()
    start = time.time()

    for text in texts:
        response = client.embed(model="voyage-3", input=text)

    return time.time() - start

# Asynchronous (parallel)
async def async_process():
    client = catsu.Client()
    start = time.time()

    tasks = [client.aembed(model="voyage-3", input=text) for text in texts]
    await asyncio.gather(*tasks)

    return time.time() - start

sync_time = sync_process()
async_time = asyncio.run(async_process())

print(f"Sync: {sync_time:.2f}s")
print(f"Async: {async_time:.2f}s")
print(f"Speedup: {sync_time / async_time:.1f}x")

Combining Async with Batching

Maximum efficiency with both async and batching:

import asyncio
import catsu

async def process_large_dataset(texts, batch_size=50):
    client = catsu.Client()

    # Split into batches
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]

    # Process all batches in parallel
    responses = await asyncio.gather(*[
        client.aembed(model="voyage-3", input=batch)
        for batch in batches
    ])

    # Flatten results
    all_embeddings = []
    for response in responses:
        all_embeddings.extend(response.embeddings)

    return all_embeddings

# Process 1000 texts efficiently
texts = [f"Document {i}" for i in range(1000)]
embeddings = asyncio.run(process_large_dataset(texts, batch_size=50))

FastAPI Integration

from fastapi import FastAPI
import catsu

app = FastAPI()
client = catsu.Client()

@app.post("/embed")
async def create_embedding(text: str):
    response = await client.aembed(
        model="voyage-3",
        input=text
    )

    return {
        "embedding": response.embeddings[0],
        "cost": response.usage.cost
    }

@app.post("/embed-batch")
async def create_embeddings(texts: list[str]):
    response = await client.aembed(
        model="voyage-3",
        input=texts
    )

    return {
        "embeddings": response.embeddings,
        "total_cost": response.usage.cost
    }

Error Handling with Async

import asyncio
import catsu
from catsu.exceptions import CatsuError

async def safe_aembed(client, text):
    try:
        return await client.aembed(model="voyage-3", input=text)
    except CatsuError as e:
        print(f"Error: {e}")
        return None

async def main():
    client = catsu.Client()

    # safe_aembed catches errors itself, so no exceptions reach gather()
    results = await asyncio.gather(
        safe_aembed(client, "Text 1"),
        safe_aembed(client, "Text 2"),
        safe_aembed(client, "Invalid text..."),
    )

    successful = [r for r in results if r is not None]
    print(f"Successful: {len(successful)}/{len(results)}")

asyncio.run(main())
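Alternatively, pass return_exceptions=True to asyncio.gather() so that a raised exception is returned as a value instead of cancelling the other requests. A minimal sketch of the pattern, with a stub coroutine standing in for client.aembed() (the stub and its failure condition are illustrative, not part of Catsu):

```python
import asyncio

async def fake_embed(text):
    # Stand-in for client.aembed(); fails on empty input
    if not text:
        raise ValueError("empty input")
    return [0.0, 0.0, 0.0]  # pretend embedding vector

async def main():
    # return_exceptions=True turns raised exceptions into return values,
    # so one failure does not cancel the other requests
    return await asyncio.gather(
        fake_embed("Text 1"),
        fake_embed(""),       # this one raises
        fake_embed("Text 3"),
        return_exceptions=True,
    )

results = asyncio.run(main())
successful = [r for r in results if not isinstance(r, Exception)]
print(f"Successful: {len(successful)}/{len(results)}")  # Successful: 2/3
```

This avoids a wrapper like safe_aembed(), at the cost of checking isinstance(r, Exception) when consuming the results.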

Context Managers with Async

async def main():
    async with catsu.Client() as client:
        response = await client.aembed(
            model="voyage-3",
            input="Async with automatic cleanup"
        )
        print(response.embeddings)

asyncio.run(main())

Best Practices

  • Use asyncio.gather() for parallel requests
  • Combine async with batching for maximum efficiency
  • Set appropriate timeouts for async operations
  • Use async context managers for cleanup
  • Handle exceptions gracefully in async code
  • Consider rate limits when running many parallel requests
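The last two bullets can be sketched together: an asyncio.Semaphore caps how many requests are in flight at once (respecting rate limits), while asyncio.wait_for() bounds each request with a timeout. The helper below is a hypothetical pattern, not a Catsu API; the concurrency limit, timeout, and stub coroutine are illustrative, with the stub standing in for client.aembed():

```python
import asyncio

async def fake_embed(text):
    # Stand-in for client.aembed()
    await asyncio.sleep(0.01)
    return [0.0, 0.0, 0.0]

async def bounded_embed(sem, text, timeout=30.0):
    # Semaphore caps concurrent requests; wait_for bounds each one
    async with sem:
        return await asyncio.wait_for(fake_embed(text), timeout=timeout)

async def main(texts, max_concurrency=10):
    sem = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(*(bounded_embed(sem, t) for t in texts))

embeddings = asyncio.run(main([f"Text {i}" for i in range(25)]))
print(len(embeddings))  # 25
```

At most max_concurrency requests run at a time; the rest wait on the semaphore, so a large dataset never floods the provider all at once.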
