API Reference¶
Complete reference for the AsyncMultiCollectionQueryClient and related functions.
AsyncMultiCollectionQueryClient¶
Async client for querying multi-collection ChromaDB indexes with model-based filtering.
Constructor¶
AsyncMultiCollectionQueryClient(
    config_path: Path,
    *,
    client_type: str = "http",
    http_host: Optional[str] = None,
    http_port: Optional[int] = None,
    http_ssl: bool = False,
    http_headers: Optional[Mapping[str, str]] = None,
    cloud_api_key: Optional[str] = None,
    cloud_tenant: Optional[str] = None,
    cloud_database: Optional[str] = None,
    embedding_function: Optional[EmbeddingFunction] = None,
)
Parameters:
- config_path (Path): Path to the query config JSON file generated by generate-query-config
- client_type (str): ChromaDB client type, "http" for self-hosted or "cloud" for ChromaDB Cloud
- http_host (str, optional): Hostname for HTTP client (e.g., "localhost:8000")
- http_port (int, optional): Port for HTTP client (default: 8000)
- http_ssl (bool): Whether to use SSL for HTTP client (default: False)
- http_headers (Mapping[str, str], optional): Custom headers for HTTP client
- cloud_api_key (str, optional): API key for ChromaDB Cloud
- cloud_tenant (str, optional): Tenant ID for ChromaDB Cloud
- cloud_database (str, optional): Database name for ChromaDB Cloud
- embedding_function (EmbeddingFunction, optional): Custom embedding function
Usage:
# HTTP client (self-hosted)
client = AsyncMultiCollectionQueryClient(
    config_path=Path("query_config.json"),
    client_type="http",
    http_host="localhost:8000",
)
# Cloud client
client = AsyncMultiCollectionQueryClient(
    config_path=Path("query_config.json"),
    client_type="cloud",
    cloud_api_key="your-api-key",
)
Methods¶
query()¶
Query across multiple collections with automatic fan-out and result merging.
async def query(
    *,
    query_embeddings: Optional[Sequence[Sequence[float]]] = None,
    query_texts: Optional[Sequence[str]] = None,
    n_results: int = 10,
    where: Optional[Mapping[str, Any]] = None,
    where_document: Optional[WhereDocument] = None,
    include: Optional[Include] = None,
    models: Optional[Sequence[str]] = None,
) -> QueryResult
Parameters:
- query_embeddings (Sequence[Sequence[float]], optional): Query embeddings (provide either this or query_texts)
- query_texts (Sequence[str], optional): Query texts to embed (provide either this or query_embeddings)
- n_results (int): Maximum number of results to return (default: 10)
- where (Mapping[str, Any], optional): ChromaDB metadata filter
- where_document (WhereDocument, optional): ChromaDB document content filter
- include (Include, optional): Fields to include in results (default: ["metadatas", "documents", "distances"])
- models (Sequence[str], optional): Model names to query. If None, queries all collections
Returns:
- QueryResult: Dictionary containing:
  - ids: List of document IDs per query
  - distances: List of distance scores per query
  - documents: List of document texts per query
  - metadatas: List of metadata dicts per query
  - embeddings: Optional embeddings if included
Example:
# Query with texts
results = await client.query(
    query_texts=["SAP authorization"],
    n_results=10,
    models=["Table", "Field"],
)
# Query with embeddings
results = await client.query(
    query_embeddings=[[0.1, 0.2, 0.3, ...]],
    n_results=10,
    models=None,
)
# Query with filters (multiple conditions are combined with $and)
results = await client.query(
    query_texts=["customer data"],
    n_results=10,
    where={"$and": [{"has_sem": True}, {"schema_version": {"$gte": 2}}]},
    models=["Table"],
)
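The fan-out and result merging that query() describes can be pictured with a small, self-contained sketch. It is not the client's actual implementation: FAKE_COLLECTIONS, query_one, and fan_out_query are hypothetical names, and the in-memory data stands in for real ChromaDB collections. The sketch queries each collection concurrently, pools the hits, and keeps the n_results closest by distance.

```python
import asyncio
from typing import Any, Dict, List, Tuple

# Hypothetical in-memory stand-in for per-collection hits:
# each entry is (document_id, distance) for a single query.
FAKE_COLLECTIONS: Dict[str, List[Tuple[str, float]]] = {
    "partition_00001": [("a", 0.10), ("b", 0.40)],
    "partition_00002": [("c", 0.25), ("d", 0.05)],
}


async def query_one(collection: str, n_results: int) -> List[Tuple[str, float]]:
    """Pretend to query one collection and return its closest hits."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    hits = sorted(FAKE_COLLECTIONS[collection], key=lambda hit: hit[1])
    return hits[:n_results]


async def fan_out_query(collections: List[str], n_results: int) -> Dict[str, Any]:
    """Query all collections concurrently, then keep the globally closest hits."""
    per_collection = await asyncio.gather(
        *(query_one(name, n_results) for name in collections)
    )
    merged = sorted(
        (hit for hits in per_collection for hit in hits),
        key=lambda hit: hit[1],
    )[:n_results]
    # Mirror the "one list per query" layout of the result dictionary.
    return {
        "ids": [[doc_id for doc_id, _ in merged]],
        "distances": [[distance for _, distance in merged]],
    }


result = asyncio.run(
    fan_out_query(["partition_00001", "partition_00002"], n_results=3)
)
print(result["ids"])  # [['d', 'a', 'c']]
```

Each collection is asked for n_results hits because, in the worst case, all of the globally closest documents live in a single collection.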
get()¶
Retrieve documents by ID or filter from multiple collections.
async def get(
    *,
    ids: Optional[Sequence[str]] = None,
    where: Optional[Mapping[str, Any]] = None,
    where_document: Optional[WhereDocument] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    include: Optional[Include] = None,
    models: Optional[Sequence[str]] = None,
) -> Dict[str, Any]
Parameters:
- ids (Sequence[str], optional): Document IDs to retrieve
- where (Mapping[str, Any], optional): Metadata filter
- where_document (WhereDocument, optional): Document content filter
- limit (int, optional): Maximum number of documents to return
- offset (int, optional): Number of documents to skip
- include (Include, optional): Fields to include (default: ["metadatas", "documents"])
- models (Sequence[str], optional): Model names to query
Returns:
- Dictionary containing ids, documents, metadatas, and any other fields requested via include
Example:
# Get by IDs
docs = await client.get(
    ids=["doc_123", "doc_456"],
    models=["Table"],
)
# Get with filter
docs = await client.get(
    where={"has_sem": True},
    limit=100,
    models=None,
)
count()¶
Count total documents across collections.
async def count(
    *,
    where: Optional[Mapping[str, Any]] = None,
    models: Optional[Sequence[str]] = None,
) -> int
Parameters:
- where (Mapping[str, Any], optional): Metadata filter
- models (Sequence[str], optional): Model names to count
Returns:
- Total document count across queried collections
Example:
# Count all documents
total = await client.count(models=None)
# Count specific model
table_count = await client.count(models=["Table"])
# Count with filter
sem_count = await client.count(
    where={"has_sem": True},
    models=None,
)
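A total across collections is a sum of per-collection counts. The following is a minimal sketch of that idea, assuming each collection can be counted independently; FAKE_COUNTS, count_one, and count_all are hypothetical names, not part of the library.

```python
import asyncio
from typing import Dict, List

# Hypothetical per-collection document counts.
FAKE_COUNTS: Dict[str, int] = {
    "partition_00001": 120,
    "partition_00002": 80,
    "partition_00003": 50,
}


async def count_one(collection: str) -> int:
    """Pretend to ask one collection for its document count."""
    await asyncio.sleep(0)  # stand-in for a network round trip
    return FAKE_COUNTS[collection]


async def count_all(collections: List[str]) -> int:
    """Count every collection concurrently and add the results."""
    # dict.fromkeys drops duplicate names so no collection is counted twice.
    unique = list(dict.fromkeys(collections))
    counts = await asyncio.gather(*(count_one(name) for name in unique))
    return sum(counts)


total = asyncio.run(count_all(list(FAKE_COUNTS)))
print(total)  # 250
```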
connect() / close()¶
Manually manage the connection lifecycle. In most cases, use the async context manager instead, which calls both methods for you.
async def connect() -> None
async def close() -> None
Example:
# Manual lifecycle management
client = AsyncMultiCollectionQueryClient(config_path=Path("query_config.json"))
await client.connect()
try:
    results = await client.query(query_texts=["search"], n_results=10)
finally:
    await client.close()
# Recommended: use context manager
async with AsyncMultiCollectionQueryClient(config_path=Path("query_config.json")) as client:
    results = await client.query(query_texts=["search"], n_results=10)
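To show why the context manager is equivalent to the try/finally form, here is a sketch of how an async context manager typically maps onto connect() and close(). LifecycleDemo is a hypothetical stand-in that only records calls; how the real client implements these hooks is an assumption.

```python
import asyncio
from typing import List


class LifecycleDemo:
    """Hypothetical stand-in that records lifecycle calls instead of connecting."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "LifecycleDemo":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body raises, like a finally block.
        await self.close()


async def main() -> List[str]:
    demo = LifecycleDemo()
    try:
        async with demo:
            demo.events.append("query")
            raise RuntimeError("simulated query failure")
    except RuntimeError:
        demo.events.append("error handled")
    return demo.events


events = asyncio.run(main())
print(events)  # ['connect', 'query', 'close', 'error handled']
```

The recorded order shows that close() runs before the exception reaches the caller, so the connection is released even when a query fails.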
Query Config Functions¶
generate_query_config()¶
Generate query configuration from indexed partitions.
def generate_query_config(
    partition_out_dir: Path,
    *,
    output_path: Optional[Path] = None,
    collection_prefix: Optional[str] = None,
) -> Dict[str, Any]
Parameters:
- partition_out_dir (Path): Directory containing partition subdirectories with resume state files
- output_path (Path, optional): Path to write query config JSON (if not provided, returns dict only)
- collection_prefix (str, optional): Optional prefix added to collection names
Returns:
- Query configuration dictionary
Example:
from indexer.vectorize_lib import generate_query_config
from pathlib import Path
config = generate_query_config(
    partition_out_dir=Path("build/vector"),
    output_path=Path("query_config.json"),
    collection_prefix="ecc-prod",
)
print(f"Generated config with {config['metadata']['total_models']} models")
load_query_config()¶
Load and validate query configuration from JSON file.
def load_query_config(config_path: Path) -> Dict[str, Any]
Parameters:
- config_path (Path): Path to query config JSON file
Returns:
- Query configuration dictionary
Raises:
- ValueError: If the file doesn't exist, contains invalid JSON, or is missing required keys
Example:
from indexer.vectorize_lib import load_query_config
from pathlib import Path
config = load_query_config(Path("query_config.json"))
print(f"Loaded config with {len(config['model_to_collections'])} models")
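The three failure modes listed under Raises can be sketched as follows. This is an illustration, not the library's code: load_config_sketch is a hypothetical name, and REQUIRED_KEYS is an assumption based only on the two keys that appear in the examples on this page ("model_to_collections" and "metadata").

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Assumed required keys; the real loader may check a different set.
REQUIRED_KEYS = ("model_to_collections", "metadata")


def load_config_sketch(config_path: Path) -> Dict[str, Any]:
    """Load a query config and report every failure as ValueError."""
    if not config_path.is_file():
        raise ValueError(f"Query config not found: {config_path}")
    try:
        config = json.loads(config_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {config_path}: {exc}") from exc
    if not isinstance(config, dict):
        raise ValueError(f"Expected a JSON object in {config_path}")
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"Missing required keys: {missing}")
    return config


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "query_config.json"
    good.write_text(
        json.dumps(
            {
                "model_to_collections": {"Table": ["partition_00001"]},
                "metadata": {"total_models": 1},
            }
        ),
        encoding="utf-8",
    )
    loaded = load_config_sketch(good)

    bad = Path(tmp) / "broken.json"
    bad.write_text("{not json", encoding="utf-8")
    try:
        load_config_sketch(bad)
        bad_error = None
    except ValueError as exc:
        bad_error = str(exc)
```

Mapping every failure onto ValueError lets callers handle a bad config with a single except clause.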
get_collections_for_models()¶
Get list of collections to query for given model names.
def get_collections_for_models(
    query_config: Dict[str, Any],
    model_names: Optional[List[str]] = None,
) -> List[str]
Parameters:
- query_config (Dict[str, Any]): Query configuration dictionary
- model_names (List[str], optional): Model names to query. If None, returns all collections
Returns:
- List of collection names
Example:
from indexer.vectorize_lib import load_query_config, get_collections_for_models
from pathlib import Path
config = load_query_config(Path("query_config.json"))
# Get collections for specific models
collections = get_collections_for_models(config, ["Table", "Field"])
print(f"Collections: {collections}") # ["partition_00001", "partition_00002", ...]
# Get all collections
all_collections = get_collections_for_models(config, None)
print(f"All collections: {all_collections}")
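The lookup from model names to collections can be sketched as below, assuming "model_to_collections" maps each model name to a list of collection names (the shape is inferred from the examples, not confirmed). collections_for_models_sketch is a hypothetical name, and silently skipping unknown model names is a choice of this sketch; the real function may behave differently.

```python
from typing import Any, Dict, List, Optional


def collections_for_models_sketch(
    query_config: Dict[str, Any],
    model_names: Optional[List[str]] = None,
) -> List[str]:
    """Return the de-duplicated collections for the given models, in first-seen order."""
    mapping = query_config["model_to_collections"]
    selected = list(mapping) if model_names is None else model_names
    seen: Dict[str, None] = {}  # dict keys keep insertion order and drop duplicates
    for model in selected:
        # Assumption: unknown model names contribute no collections.
        for collection in mapping.get(model, []):
            seen.setdefault(collection, None)
    return list(seen)


example_config = {
    "model_to_collections": {
        "Table": ["partition_00001", "partition_00002"],
        "Field": ["partition_00002", "partition_00003"],
    }
}

print(collections_for_models_sketch(example_config, ["Field"]))
# ['partition_00002', 'partition_00003']
```

De-duplication matters when two models share a collection: without it, the shared collection would be queried twice and its documents could appear twice in merged results.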
ChromaDB Metadata Filters¶
The where parameter supports ChromaDB's metadata filtering operators:
# Equality
where={"model_name": "Table"}
# Comparison
where={"schema_version": {"$gte": 2}}
# In/Not In
where={"model_name": {"$in": ["Table", "Field"]}}
# Logical operators
where={
    "$and": [
        {"has_sem": True},
        {"schema_version": {"$gte": 2}},
    ]
}
Supported operators:
- $eq: Equal
- $ne: Not equal
- $gt: Greater than
- $gte: Greater than or equal
- $lt: Less than
- $lte: Less than or equal
- $in: In list
- $nin: Not in list
- $and: Logical AND
- $or: Logical OR
See ChromaDB metadata filtering documentation for more details.
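To make the operator semantics concrete, the following sketch evaluates a where filter against a single metadata dictionary in plain Python. It is a local illustration only, not ChromaDB's implementation: matches and COMPARATORS are hypothetical names, a missing field never matches, and several keys in one dictionary are treated as an implicit AND, both simplifications of this sketch.

```python
import operator
from typing import Any, Callable, Dict, Mapping

# One Python predicate per comparison operator listed above.
COMPARATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
    "$in": lambda value, options: value in options,
    "$nin": lambda value, options: value not in options,
}


def matches(metadata: Mapping[str, Any], where: Mapping[str, Any]) -> bool:
    """Return True if the metadata satisfies every condition in the filter."""
    for key, condition in where.items():
        if key == "$and":
            if not all(matches(metadata, clause) for clause in condition):
                return False
        elif key == "$or":
            if not any(matches(metadata, clause) for clause in condition):
                return False
        elif isinstance(condition, dict):
            # Form: {"field": {"$op": operand}}
            if key not in metadata:
                return False  # simplification: a missing field never matches
            for op, operand in condition.items():
                if not COMPARATORS[op](metadata[key], operand):
                    return False
        else:
            # A bare value is shorthand for $eq.
            if key not in metadata or metadata[key] != condition:
                return False
    return True


doc_metadata = {"model_name": "Table", "has_sem": True, "schema_version": 3}

print(matches(doc_metadata, {"schema_version": {"$gte": 2}}))  # True
print(matches(doc_metadata, {"model_name": {"$in": ["Field"]}}))  # False
```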