aembed() Method
Generate embeddings asynchronously for better performance
The aembed() method is the asynchronous version of embed(), enabling parallel requests and better performance.
Signature
async def aembed(
self,
*,
model: str,
input: Union[str, List[str]],
provider: Optional[str] = None,
**kwargs
) -> EmbedResponse
Parameters and return values are identical to embed().
When to Use Async
Use aembed() when:
- Processing multiple independent requests
- Building async applications (FastAPI, async web scrapers, etc.)
- Needing better performance and resource utilization
- Working with large batches across multiple API calls
Basic Async Usage
import asyncio
import catsu
async def main():
client = catsu.Client()
response = await client.aembed(
model="voyage-3",
input="Hello, async world!"
)
print(response.embeddings[0][:5])
asyncio.run(main())
Parallel Requests with asyncio.gather()
Process multiple requests concurrently:
import asyncio
import catsu
async def main():
client = catsu.Client()
# Process 3 requests in parallel
responses = await asyncio.gather(
client.aembed(model="voyage-3", input="Query 1"),
client.aembed(model="voyage-3", input="Query 2"),
client.aembed(model="voyage-3", input="Query 3"),
)
# Access results
for i, response in enumerate(responses):
print(f"Query {i+1}: {response.usage.cost:.6f} USD")
total_cost = sum(r.usage.cost for r in responses)
print(f"Total cost: ${total_cost:.6f}")
asyncio.run(main())
Async Context Manager
import asyncio
import catsu
async def main():
async with catsu.Client() as client:
response = await client.aembed(
model="voyage-3",
input="Text with automatic cleanup"
)
print(response.embeddings)
asyncio.run(main())
Performance Comparison
import asyncio
import time
import catsu
# Synchronous (sequential)
def sync_process():
client = catsu.Client()
start = time.time()
for i in range(10):
response = client.embed(model="voyage-3", input=f"Text {i}")
elapsed = time.time() - start
print(f"Sync: {elapsed:.2f}s")
# Asynchronous (parallel)
async def async_process():
client = catsu.Client()
start = time.time()
tasks = [
client.aembed(model="voyage-3", input=f"Text {i}")
for i in range(10)
]
responses = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"Async: {elapsed:.2f}s") # Much faster!
# Run comparison
sync_process()
asyncio.run(async_process())
FastAPI Integration
from fastapi import FastAPI
import catsu
app = FastAPI()
client = catsu.Client()
@app.post("/embed")
async def create_embedding(text: str):
response = await client.aembed(
model="voyage-3",
input=text
)
return {
"embedding": response.embeddings[0],
"dimensions": response.dimensions,
"cost": response.usage.cost
}
Error Handling with Async
import asyncio
import catsu
from catsu.exceptions import CatsuError
async def safe_embed(client, text):
try:
return await client.aembed(model="voyage-3", input=text)
except CatsuError as e:
print(f"Error embedding '{text}': {e}")
return None
async def main():
client = catsu.Client()
# Process multiple texts, handling errors gracefully
results = await asyncio.gather(
safe_embed(client, "Text 1"),
safe_embed(client, "Text 2"),
safe_embed(client, "Invalid " * 10000), # Might fail
return_exceptions=False # safe, since safe_embed() already catches errors; use return_exceptions=True if tasks can raise
)
successful = [r for r in results if r is not None]
print(f"Successfully processed {len(successful)}/{len(results)} texts")
asyncio.run(main())
Best Practices
- Use asyncio.gather() for parallel requests
- Set an appropriate timeout for batch operations
- Use async context managers for automatic cleanup
- Handle exceptions gracefully with try/except
- Consider rate limits when processing large batches
Next Steps
- Context Managers - Automatic resource cleanup
- Best Practices: Async Usage - Optimize async performance
- Best Practices: Batch Processing - Efficient batch strategies