# API Operations Reference

Guide for operating CocoIndex flows programmatically using Python APIs.

## Overview

CocoIndex flows can be operated through Python APIs, providing programmatic control over setup, updates, and queries. This is useful for embedding flows in applications, automating workflows, or building custom tools.

## Basic Setup

### Initialization

```python
from dotenv import load_dotenv
import cocoindex

# Load environment variables
load_dotenv()

# Initialize CocoIndex
cocoindex.init()
```

### Flow Definition

```python
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Flow definition
    pass
```

The decorator returns a `cocoindex.Flow` object that can be used for operations.

## Flow Operations

### Setup Flow

Create persistent backends (tables, collections, etc.) for the flow.

```python
# Basic setup
my_flow.setup()

# With progress output
my_flow.setup(report_to_stdout=True)

# Async version
await my_flow.setup_async(report_to_stdout=True)
```

**When to use:**

- Before the first update
- After modifying the flow structure
- After dropping a flow, to recreate its resources

### Setup All Flows

```python
# Set up all flows at once
cocoindex.setup_all_flows(report_to_stdout=True)
```

### Drop Flow

Remove all persistent backends owned by the flow.

```python
# Drop flow
my_flow.drop()

# With progress output
my_flow.drop(report_to_stdout=True)

# Async version
await my_flow.drop_async(report_to_stdout=True)
```

**Note:** After dropping, the `Flow` object is still valid and can be set up again.

### Drop All Flows

```python
# Drop all flows
cocoindex.drop_all_flows(report_to_stdout=True)
```

### Close Flow

Remove the flow from the current process's memory (this doesn't affect persistent data).

```python
my_flow.close()
# After this, my_flow is invalid and should not be used
```

## Update Operations

### One-Time Update

Build or update target data based on current source data.

```python
# Basic update
stats = my_flow.update()
print(f"Processed {stats.total_rows} rows")

# With reexport (force reprocessing targets even if source data is unchanged)
stats = my_flow.update(reexport_targets=True)

# Async version
stats = await my_flow.update_async()
stats = await my_flow.update_async(reexport_targets=True)
```

**Returns:** Statistics about the processed data.

**Note:** Multiple calls to `update()` can run simultaneously; CocoIndex automatically combines them efficiently.

### Live Update

Continuously monitor source changes and update targets.

```python
import cocoindex

# Create live updater
updater = cocoindex.FlowLiveUpdater(
    my_flow,
    cocoindex.FlowLiveUpdaterOptions(
        live_mode=True,          # Enable live updates
        print_stats=True,        # Print progress
        reexport_targets=False,  # If True, reexport targets on the first update only
    )
)

# Start the updater
updater.start()

# Your application logic here
# (updater runs in background threads)

# Wait for completion
updater.wait()

# Print final stats
print(updater.update_stats())
```

#### As Context Manager

```python
with cocoindex.FlowLiveUpdater(my_flow) as updater:
    # Updater starts automatically
    # Your application logic here
    pass
# Updater aborts and waits automatically

# Async version
async with cocoindex.FlowLiveUpdater(my_flow) as updater:
    # Your application logic
    pass
```

#### Monitoring Status Updates

```python
updater = cocoindex.FlowLiveUpdater(my_flow)
updater.start()

while True:
    # Block until the next status update
    updates = updater.next_status_updates()

    # Check which sources were updated
    for source in updates.updated_sources:
        print(f"Source {source} has new data")
        # Trigger downstream operations

    # Check if the updater stopped
    if not updates.active_sources:
        print("All sources stopped")
        break

# Async version
while True:
    updates = await updater.next_status_updates_async()
    # ... same logic
```

#### Control Methods

```python
# Start the updater
updater.start()
await updater.start_async()

# Abort the updater
updater.abort()

# Wait for completion
updater.wait()
await updater.wait_async()

# Get current stats
stats = updater.update_stats()
```

## Evaluate Flow

Run transformations without updating targets (useful for testing).

```python
# Evaluate and dump results
my_flow.evaluate_and_dump(
    cocoindex.EvaluateAndDumpOptions(
        output_dir="./eval_output",
        use_cache=True  # Use the existing cache (but don't update it)
    )
)
```

**Use cases:**

- Testing flow logic
- Debugging transformations
- Inspecting intermediate data
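
After a dump finishes, the results live as plain files under `output_dir`. A small helper for listing them — a sketch in plain Python that makes no assumption about the layout beyond "files under the directory":

```python
from pathlib import Path

def list_eval_outputs(output_dir: str) -> list[str]:
    """Return the relative paths of all files under an eval output directory."""
    root = Path(output_dir)
    return sorted(
        str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()
    )
```

For example, `list_eval_outputs("./eval_output")` after the dump above shows what was produced, which is handy in tests.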

## Query Operations

### Transform Flows

Transform flows enable reusable transformation logic shared between indexing and querying.

```python
from numpy.typing import NDArray
import numpy as np

# Define transform flow
@cocoindex.transform_flow()
def text_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[NDArray[np.float32]]:
    """Convert text to an embedding vector."""
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )

# Use in an indexing flow
@cocoindex.flow_def(name="TextEmbedding")
def text_embedding_flow(flow_builder, data_scope):
    # ... setup source ...
    with data_scope["documents"].row() as doc:
        doc["embedding"] = text_to_embedding(doc["content"])
    # ... collect and export ...

# Use for querying (evaluate with a concrete input)
query_embedding = text_to_embedding.eval("search query text")
# query_embedding is now a numpy array
```
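
Since `eval` returns a plain numpy array, you can work with it directly — for example, comparing two embeddings by cosine similarity. A minimal sketch; the toy vectors below stand in for real `text_to_embedding.eval` outputs:

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two 1-D vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy vectors; in practice these would come from text_to_embedding.eval(...)
v1 = np.array([1.0, 0.0, 1.0], dtype=np.float32)
v2 = np.array([1.0, 1.0, 0.0], dtype=np.float32)
print(round(cosine_similarity(v1, v1), 2))  # identical vectors → 1.0
print(round(cosine_similarity(v1, v2), 2))  # → 0.5
```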

### Query Handlers

Attach query logic to flows for easy query execution.

```python
import functools
import os

from psycopg_pool import ConnectionPool
from pgvector.psycopg import register_vector

@functools.cache
def connection_pool():
    return ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])

# Register query handler
@my_flow.query_handler(
    result_fields=cocoindex.QueryHandlerResultFields(
        embedding=["embedding"],  # Field name(s) containing embeddings
        score="score",            # Field name for the similarity score
    )
)
def search(query: str) -> cocoindex.QueryOutput:
    """Search for documents matching the query."""

    # Get the table name for this flow's export
    table_name = cocoindex.utils.get_target_default_name(my_flow, "doc_embeddings")

    # Compute the query embedding using the transform flow
    query_vector = text_to_embedding.eval(query)

    # Execute the query (build results while the cursor is still open)
    with connection_pool().connection() as conn:
        register_vector(conn)
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT filename, text, embedding, embedding <=> %s AS distance
                FROM {table_name}
                ORDER BY distance
                LIMIT 10
                """,
                (query_vector,),
            )
            return cocoindex.QueryOutput(
                query_info=cocoindex.QueryInfo(
                    embedding=query_vector,
                    similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
                ),
                results=[
                    {
                        "filename": row[0],
                        "text": row[1],
                        "embedding": row[2],
                        "score": 1.0 - row[3],  # Convert cosine distance to similarity
                    }
                    for row in cur.fetchall()
                ],
            )

# Call the query handler
results = search("machine learning algorithms")
for result in results.results:
    print(f"[{result['score']:.3f}] {result['filename']}: {result['text']}")
```
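
The `1.0 - row[3]` step above works because pgvector's `<=>` operator returns cosine *distance*, not similarity. As a standalone sanity check (plain Python, no database needed):

```python
def distance_to_similarity(distance: float) -> float:
    """Map cosine distance (0 = identical, 1 = orthogonal, 2 = opposite)
    back to cosine similarity in [-1, 1]."""
    return 1.0 - distance

print(distance_to_similarity(0.0))  # identical vectors → 1.0
print(distance_to_similarity(1.0))  # orthogonal vectors → 0.0
print(distance_to_similarity(2.0))  # opposite vectors → -1.0
```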

### Query with Qdrant

```python
import functools

from qdrant_client import QdrantClient

@functools.cache
def get_qdrant_client():
    return QdrantClient(url="http://localhost:6334", prefer_grpc=True)

@my_flow.query_handler(
    result_fields=cocoindex.QueryHandlerResultFields(
        embedding=["embedding"],
        score="score",
    )
)
def search_qdrant(query: str) -> cocoindex.QueryOutput:
    client = get_qdrant_client()

    # Get the query embedding
    query_embedding = text_to_embedding.eval(query)

    # Search Qdrant (with_vectors=True so result.vector is populated)
    search_results = client.search(
        collection_name="my_collection",
        query_vector=("text_embedding", query_embedding),
        with_vectors=True,
        limit=10,
    )

    return cocoindex.QueryOutput(
        query_info=cocoindex.QueryInfo(
            embedding=query_embedding,
            similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
        ),
        results=[
            {
                "text": result.payload["text"],
                "embedding": result.vector,
                "score": result.score,
            }
            for result in search_results
        ],
    )
```

## Application Integration Patterns

### Pattern 1: Simple Application with Update

```python
from dotenv import load_dotenv
import cocoindex

# Initialize
load_dotenv()
cocoindex.init()

# Define flow
@cocoindex.flow_def(name="MyApp")
def my_app_flow(flow_builder, data_scope):
    # ... flow definition ...
    pass

def main():
    # Ensure the flow is set up and its data is fresh
    stats = my_app_flow.update()
    print(f"Updated index: {stats}")

    # Run application logic
    while True:
        query = input("Search: ")
        if not query:
            break
        results = search(query)
        for result in results.results:
            print(f"  {result['score']:.3f}: {result['text']}")

if __name__ == "__main__":
    main()
```

### Pattern 2: Web Application with Live Updates

```python
from fastapi import FastAPI
import cocoindex
from dotenv import load_dotenv

load_dotenv()
cocoindex.init()

@cocoindex.flow_def(name="WebAppFlow")
def web_app_flow(flow_builder, data_scope):
    # ... flow definition ...
    pass

# Create FastAPI app
app = FastAPI()

# Global updater
updater = None

@app.on_event("startup")
async def startup():
    global updater
    # Start the live updater in the background
    updater = cocoindex.FlowLiveUpdater(
        web_app_flow,
        cocoindex.FlowLiveUpdaterOptions(live_mode=True, print_stats=True)
    )
    await updater.start_async()
    print("Live updater started")

@app.on_event("shutdown")
async def shutdown():
    if updater:
        updater.abort()
        await updater.wait_async()
        print("Live updater stopped")

@app.get("/search")
async def search_endpoint(q: str):
    results = search(q)
    return {
        "query": q,
        "results": results.results,
    }
```

### Pattern 3: Batch Processing

```python
import cocoindex
from dotenv import load_dotenv

load_dotenv()
cocoindex.init()

@cocoindex.flow_def(name="BatchProcessor")
def batch_flow(flow_builder, data_scope):
    # ... flow definition ...
    pass

def process_batch():
    """Run as a scheduled job (cron, etc.)."""
    # Set up if needed (a no-op if already set up)
    batch_flow.setup()

    # Run the update
    stats = batch_flow.update()

    # Log results
    print(f"Batch completed: {stats.total_rows} rows processed")

    return stats

if __name__ == "__main__":
    process_batch()
```

### Pattern 4: React to Updates

```python
import cocoindex

@cocoindex.flow_def(name="ReactiveFlow")
def reactive_flow(flow_builder, data_scope):
    # ... flow definition ...
    pass

async def run_with_reactions():
    """Monitor updates and trigger downstream actions."""
    async with cocoindex.FlowLiveUpdater(reactive_flow) as updater:
        while True:
            updates = await updater.next_status_updates_async()

            # React to specific source updates
            if "products" in updates.updated_sources:
                await rebuild_product_index()

            if "customers" in updates.updated_sources:
                await refresh_customer_cache()

            # Exit when the updater stops
            if not updates.active_sources:
                break

async def rebuild_product_index():
    print("Rebuilding product index...")
    # Custom logic

async def refresh_customer_cache():
    print("Refreshing customer cache...")
    # Custom logic
```

## Error Handling

### Handling Update Errors

```python
try:
    stats = my_flow.update()
except cocoindex.CocoIndexError as e:
    print(f"Update failed: {e}")
    # Handle the error (log, retry, alert, etc.)
```
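
For transient failures (e.g. a briefly unreachable database), wrapping the update in a retry loop is a common pattern. A sketch with exponential backoff — `update_fn` is a placeholder for `my_flow.update`, and the bare `Exception` should be narrowed to the library's error type in real code:

```python
import time

def update_with_retry(update_fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Call update_fn, retrying with exponential backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return update_fn()
        except Exception as e:  # narrow to cocoindex.CocoIndexError in practice
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({e}); retrying in {delay:.1f}s")
            time.sleep(delay)
```

For example, `update_with_retry(my_flow.update)` makes up to two more attempts before re-raising the last error.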

### Graceful Shutdown

```python
import signal
import sys

updater = None

def signal_handler(sig, frame):
    print("Shutting down gracefully...")
    if updater:
        updater.abort()
        updater.wait()
    print("Shutdown complete")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

updater = cocoindex.FlowLiveUpdater(my_flow)
updater.start()
updater.wait()
```

## Best Practices

1. **Always call `cocoindex.init()`** - Initialize before using any CocoIndex APIs
2. **Load environment variables** - Use dotenv or similar to load configuration
3. **Use context managers** - For live updaters, to ensure cleanup
4. **Cache expensive resources** - Use `@functools.cache` for database pools and clients
5. **Handle signals** - Gracefully shut down live updaters on SIGINT/SIGTERM
6. **Separate concerns** - Keep flow definitions, queries, and application logic separate
7. **Use transform flows** - Share logic between indexing and querying
8. **Monitor update stats** - Log and track processing statistics
9. **Test with evaluate** - Use `evaluate_and_dump` to test before running updates
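
Practice 4 works because `functools.cache` memoizes the constructor, so the pool or client is built once per process and reused on every call. A minimal illustration with a stand-in resource:

```python
import functools

@functools.cache
def expensive_client() -> dict:
    # Stands in for building a connection pool or API client.
    print("building client")  # printed only on the first call
    return {"connected": True}

c1 = expensive_client()
c2 = expensive_client()
print(c1 is c2)  # → True: the same cached object is returned
```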