API Reference

Complete reference for the AsyncMultiCollectionQueryClient and related functions.

AsyncMultiCollectionQueryClient

Async client for querying multi-collection ChromaDB indexes with model-based filtering.

Constructor

AsyncMultiCollectionQueryClient(
    config_path: Path,
    *,
    client_type: str = "http",
    http_host: Optional[str] = None,
    http_port: Optional[int] = None,
    http_ssl: bool = False,
    http_headers: Optional[Mapping[str, str]] = None,
    cloud_api_key: Optional[str] = None,
    cloud_tenant: Optional[str] = None,
    cloud_database: Optional[str] = None,
    embedding_function: Optional[EmbeddingFunction] = None,
)

Parameters:

  • config_path (Path): Path to query config JSON file generated by generate-query-config
  • client_type (str): ChromaDB client type - "http" for self-hosted or "cloud" for ChromaDB Cloud
  • http_host (str, optional): Hostname for HTTP client (e.g., "localhost"); the port is passed separately via http_port
  • http_port (int, optional): Port for HTTP client (8000 is used when omitted)
  • http_ssl (bool): Whether to use SSL for HTTP client (default: False)
  • http_headers (Mapping[str, str], optional): Custom headers for HTTP client
  • cloud_api_key (str, optional): API key for ChromaDB Cloud
  • cloud_tenant (str, optional): Tenant ID for ChromaDB Cloud
  • cloud_database (str, optional): Database name for ChromaDB Cloud
  • embedding_function (EmbeddingFunction, optional): Custom embedding function

Usage:

# HTTP client (self-hosted)
client = AsyncMultiCollectionQueryClient(
    config_path=Path("query_config.json"),
    client_type="http",
    http_host="localhost",
    http_port=8000,
)

# Cloud client
client = AsyncMultiCollectionQueryClient(
    config_path=Path("query_config.json"),
    client_type="cloud",
    cloud_api_key="your-api-key",
)
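
The choice between the "http" and "cloud" argument groups can be driven by deployment settings. The following is a minimal sketch, not part of the library: the helper name client_kwargs_from_env and every CHROMA_* variable name are hypothetical, and only the returned keys correspond to constructor parameters listed above.

```python
from pathlib import Path
from typing import Any, Dict, Mapping


def client_kwargs_from_env(env: Mapping[str, str]) -> Dict[str, Any]:
    """Build constructor keyword arguments from environment-style settings.

    The CHROMA_* names are hypothetical and chosen only for this example.
    """
    kwargs: Dict[str, Any] = {
        "config_path": Path(env.get("CHROMA_QUERY_CONFIG", "query_config.json")),
    }
    if env.get("CHROMA_CLOUD_API_KEY"):
        # An API key is present, so select the cloud argument group.
        kwargs["client_type"] = "cloud"
        kwargs["cloud_api_key"] = env["CHROMA_CLOUD_API_KEY"]
        # Tenant and database are optional in the signature; pass them only when set.
        if env.get("CHROMA_TENANT"):
            kwargs["cloud_tenant"] = env["CHROMA_TENANT"]
        if env.get("CHROMA_DATABASE"):
            kwargs["cloud_database"] = env["CHROMA_DATABASE"]
    else:
        # No API key, so fall back to the self-hosted HTTP argument group.
        kwargs["client_type"] = "http"
        kwargs["http_host"] = env.get("CHROMA_HOST", "localhost")
        kwargs["http_port"] = int(env.get("CHROMA_PORT", "8000"))
        kwargs["http_ssl"] = env.get("CHROMA_SSL", "false").lower() == "true"
    return kwargs
```

The result could then be unpacked into the constructor, for example AsyncMultiCollectionQueryClient(**client_kwargs_from_env(os.environ)).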

Methods

query()

Query across multiple collections with automatic fan-out and result merging.

async def query(
    *,
    query_embeddings: Optional[Sequence[Sequence[float]]] = None,
    query_texts: Optional[Sequence[str]] = None,
    n_results: int = 10,
    where: Optional[Mapping[str, Any]] = None,
    where_document: Optional[WhereDocument] = None,
    include: Optional[Include] = None,
    models: Optional[Sequence[str]] = None,
) -> QueryResult

Parameters:

  • query_embeddings (Sequence[Sequence[float]], optional): Query embeddings (provide either this or query_texts)
  • query_texts (Sequence[str], optional): Query texts to embed (provide either this or query_embeddings)
  • n_results (int): Maximum number of results to return (default: 10)
  • where (Mapping[str, Any], optional): ChromaDB metadata filter
  • where_document (WhereDocument, optional): ChromaDB document content filter
  • include (Include, optional): Fields to include in results (default: ["metadatas", "documents", "distances"])
  • models (Sequence[str], optional): Model names to query. If None, queries all collections

Returns:

  • QueryResult: Dictionary containing:
      • ids: One list of document IDs per query
      • distances: One list of distance scores per query
      • documents: One list of document texts per query
      • metadatas: One list of metadata dicts per query
      • embeddings: One list of embeddings per query, present only when requested via include
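
Because every field holds one inner list per query, reading a result means walking two levels of nesting. The sketch below flattens a result into one record per hit. The helper name iter_hits is hypothetical, and it assumes the fields are index-aligned (the hit at position k of ids belongs with position k of distances, documents, and metadatas), which matches ChromaDB's usual result layout.

```python
from typing import Any, Dict, Iterator, Mapping

# Maps each plural result field to the singular key used in a flat hit record.
_FIELDS = (("distances", "distance"), ("documents", "document"), ("metadatas", "metadata"))


def iter_hits(result: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per hit from a nested, per-query result mapping."""
    for query_index, ids in enumerate(result["ids"]):
        for rank, doc_id in enumerate(ids):
            hit: Dict[str, Any] = {"query_index": query_index, "rank": rank, "id": doc_id}
            # Fields excluded via `include` may be absent or None, so guard each one.
            for field, name in _FIELDS:
                column = result.get(field)
                if column is not None:
                    hit[name] = column[query_index][rank]
            yield hit


# Hand-written sample shaped like a result for two queries.
sample = {
    "ids": [["t1", "t2"], ["f1"]],
    "distances": [[0.12, 0.30], [0.05]],
    "documents": [["doc t1", "doc t2"], ["doc f1"]],
    "metadatas": None,  # as if "metadatas" had not been requested
}
hits = list(iter_hits(sample))
```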

Example:

# Query with texts
results = await client.query(
    query_texts=["SAP authorization"],
    n_results=10,
    models=["Table", "Field"],
)

# Query with embeddings
results = await client.query(
    query_embeddings=[[0.1, 0.2, 0.3, ...]],
    n_results=10,
    models=None,
)

# Query with filters
results = await client.query(
    query_texts=["customer data"],
    n_results=10,
    where={"has_sem": True, "schema_version": {"$gte": 2}},
    models=["Table"],
)
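
The fan-out and merging that query() is described as performing can be pictured as follows. This is a sketch under stated assumptions, not the client's actual implementation: it assumes each collection returns a ChromaDB-style result, that smaller distances mean closer matches, and that the merged result keeps the n_results closest hits per query. The function name merge_query_results is hypothetical.

```python
import heapq
from typing import Any, Dict, List, Mapping, Sequence

_FIELDS = ("ids", "distances", "documents", "metadatas")


def merge_query_results(
    per_collection: Sequence[Mapping[str, Any]],
    n_results: int,
) -> Dict[str, List[List[Any]]]:
    """Merge per-collection results, keeping the n closest hits for each query."""
    merged: Dict[str, List[List[Any]]] = {field: [] for field in _FIELDS}
    if not per_collection:
        return merged
    n_queries = len(per_collection[0]["ids"])
    for q in range(n_queries):
        # Pool the hits for query q from every collection as aligned tuples.
        candidates = []
        for result in per_collection:
            candidates.extend(zip(*(result[field][q] for field in _FIELDS)))
        # Index 1 of each tuple is the distance; keep the smallest n_results.
        best = heapq.nsmallest(n_results, candidates, key=lambda hit: hit[1])
        for position, field in enumerate(_FIELDS):
            merged[field].append([hit[position] for hit in best])
    return merged
```

Each collection is asked for up to n_results hits in this picture, since any of them could hold all of the closest matches.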

get()

Retrieve documents by ID or filter from multiple collections.

async def get(
    *,
    ids: Optional[Sequence[str]] = None,
    where: Optional[Mapping[str, Any]] = None,
    where_document: Optional[WhereDocument] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    include: Optional[Include] = None,
    models: Optional[Sequence[str]] = None,
) -> Dict[str, Any]

Parameters:

  • ids (Sequence[str], optional): Document IDs to retrieve
  • where (Mapping[str, Any], optional): Metadata filter
  • where_document (WhereDocument, optional): Document content filter
  • limit (int, optional): Maximum number of documents to return
  • offset (int, optional): Number of documents to skip
  • include (Include, optional): Fields to include (default: ["metadatas", "documents"])
  • models (Sequence[str], optional): Model names to query. If None, queries all collections

Returns:

  • Dictionary containing ids plus the fields requested via include (documents and metadatas by default)

Example:

# Get by IDs
docs = await client.get(
    ids=["doc_123", "doc_456"],
    models=["Table"],
)

# Get with filter
docs = await client.get(
    where={"has_sem": True},
    limit=100,
    models=None,
)

count()

Count total documents across collections.

async def count(
    *,
    where: Optional[Mapping[str, Any]] = None,
    models: Optional[Sequence[str]] = None,
) -> int

Parameters:

  • where (Mapping[str, Any], optional): Metadata filter
  • models (Sequence[str], optional): Model names to count. If None, counts all collections

Returns:

  • Total document count across queried collections

Example:

# Count all documents
total = await client.count(models=None)

# Count specific model
table_count = await client.count(models=["Table"])

# Count with filter
sem_count = await client.count(
    where={"has_sem": True},
    models=None,
)

connect() / close()

Manage the connection lifecycle manually. In most cases, prefer the async context manager, which handles connecting and closing.

async def connect() -> None
async def close() -> None

Example:

# Manual lifecycle management
client = AsyncMultiCollectionQueryClient(config_path=Path("query_config.json"))
await client.connect()
try:
    results = await client.query(query_texts=["search"], n_results=10)
finally:
    await client.close()

# Recommended: use context manager
async with AsyncMultiCollectionQueryClient(config_path=Path("query_config.json")) as client:
    results = await client.query(query_texts=["search"], n_results=10)
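
The two forms above are equivalent if the context manager simply wraps connect() and close(). The sketch below shows that pattern on a stand-in class; it is an assumption about how the client behaves, not a copy of its code, and LifecycleDemo is a hypothetical name.

```python
import asyncio
from typing import List


class LifecycleDemo:
    """Stand-in showing how `async with` can map onto connect() and close()."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "LifecycleDemo":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body raises; returning None lets the error propagate.
        await self.close()


async def run_demo() -> List[str]:
    demo = LifecycleDemo()
    try:
        async with demo:
            demo.events.append("work")
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass
    return demo.events
```

The benefit over the manual form is that close() cannot be forgotten, including when the body raises.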

Query Config Functions

generate_query_config()

Generate query configuration from indexed partitions.

def generate_query_config(
    partition_out_dir: Path,
    *,
    output_path: Optional[Path] = None,
    collection_prefix: Optional[str] = None,
) -> Dict[str, Any]

Parameters:

  • partition_out_dir (Path): Directory containing partition subdirectories with resume state files
  • output_path (Path, optional): Path to write query config JSON (if not provided, returns dict only)
  • collection_prefix (str, optional): Prefix added to collection names

Returns:

  • Query configuration dictionary

Example:

from indexer.vectorize_lib import generate_query_config
from pathlib import Path

config = generate_query_config(
    partition_out_dir=Path("build/vector"),
    output_path=Path("query_config.json"),
    collection_prefix="ecc-prod",
)

print(f"Generated config with {config['metadata']['total_models']} models")

load_query_config()

Load and validate query configuration from JSON file.

def load_query_config(config_path: Path) -> Dict[str, Any]

Parameters:

  • config_path (Path): Path to query config JSON file

Returns:

  • Query configuration dictionary

Raises:

  • ValueError: If the file doesn't exist, contains invalid JSON, or is missing required keys

Example:

from indexer.vectorize_lib import load_query_config
from pathlib import Path

config = load_query_config(Path("query_config.json"))
print(f"Loaded config with {len(config['model_to_collections'])} models")
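
The three failure cases listed under Raises can be pictured with the following sketch. It is not the library's implementation: the function name load_config_sketch is hypothetical, and the set of required keys is an assumption based on the two keys that appear in the examples on this page ("model_to_collections" and "metadata").

```python
import json
from pathlib import Path
from typing import Any, Dict

# Assumed for illustration; the real set of required keys is not listed here.
REQUIRED_KEYS = ("model_to_collections", "metadata")


def load_config_sketch(config_path: Path) -> Dict[str, Any]:
    """Load a query config, reporting every failure mode as ValueError."""
    if not config_path.is_file():
        raise ValueError(f"Query config not found: {config_path}")
    try:
        config = json.loads(config_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {config_path}: {exc}") from exc
    if not isinstance(config, dict):
        raise ValueError(f"Expected a JSON object in {config_path}")
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"Query config is missing required keys: {missing}")
    return config
```

Mapping all failures to one exception type lets callers handle a bad config with a single except ValueError clause.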

get_collections_for_models()

Get list of collections to query for given model names.

def get_collections_for_models(
    query_config: Dict[str, Any],
    model_names: Optional[List[str]] = None,
) -> List[str]

Parameters:

  • query_config (Dict[str, Any]): Query configuration dictionary
  • model_names (List[str], optional): Model names to query. If None, returns all collections

Returns:

  • List of collection names

Example:

from indexer.vectorize_lib import load_query_config, get_collections_for_models
from pathlib import Path

config = load_query_config(Path("query_config.json"))

# Get collections for specific models
collections = get_collections_for_models(config, ["Table", "Field"])
print(f"Collections: {collections}")  # ["partition_00001", "partition_00002", ...]

# Get all collections
all_collections = get_collections_for_models(config, None)
print(f"All collections: {all_collections}")
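
The lookup can be sketched as a union over a model-to-collections mapping. The sketch assumes that config["model_to_collections"] maps each model name to a list of collection names, that a collection shared by several models should appear once, and that unknown model names are skipped; the real function may differ on any of these points, for example by raising on unknown models. The name collections_for_models_sketch is hypothetical.

```python
from typing import Any, Dict, List, Optional


def collections_for_models_sketch(
    query_config: Dict[str, Any],
    model_names: Optional[List[str]] = None,
) -> List[str]:
    """Return the de-duplicated collections for the given models, in first-seen order."""
    mapping = query_config["model_to_collections"]
    selected = list(mapping) if model_names is None else model_names
    # A dict keeps insertion order, so it serves as an ordered set here.
    seen: Dict[str, None] = {}
    for model in selected:
        for collection in mapping.get(model, []):
            seen.setdefault(collection, None)
    return list(seen)


# Hand-written sample config with one collection shared by both models.
sample_config = {
    "model_to_collections": {
        "Table": ["partition_00001", "partition_00002"],
        "Field": ["partition_00002", "partition_00003"],
    }
}
```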

ChromaDB Metadata Filters

The where parameter supports ChromaDB's metadata filtering operators:

# Equality
where={"model_name": "Table"}

# Comparison
where={"schema_version": {"$gte": 2}}

# In/Not In
where={"model_name": {"$in": ["Table", "Field"]}}

# Logical operators
where={
    "$and": [
        {"has_sem": True},
        {"schema_version": {"$gte": 2}}
    ]
}

Supported operators:

  • $eq: Equal
  • $ne: Not equal
  • $gt: Greater than
  • $gte: Greater than or equal
  • $lt: Less than
  • $lte: Less than or equal
  • $in: In list
  • $nin: Not in list
  • $and: Logical AND
  • $or: Logical OR
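
To make the operator semantics concrete, the sketch below evaluates a where filter against a single metadata dict in memory. It is an illustration only and not how ChromaDB evaluates filters: the function name matches is hypothetical, and treating a missing metadata key as a non-match for every operator is a simplification that may differ from ChromaDB's behavior.

```python
import operator
from typing import Any, Callable, Dict, Mapping

_COMPARATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
    "$in": lambda value, options: value in options,
    "$nin": lambda value, options: value not in options,
}


def matches(metadata: Mapping[str, Any], where: Mapping[str, Any]) -> bool:
    """Return True if the metadata dict satisfies the where filter."""
    for key, condition in where.items():
        if key == "$and":
            if not all(matches(metadata, sub) for sub in condition):
                return False
        elif key == "$or":
            if not any(matches(metadata, sub) for sub in condition):
                return False
        else:
            if key not in metadata:
                return False  # simplification: a missing key never matches
            value = metadata[key]
            if isinstance(condition, Mapping):
                # Operator form, e.g. {"$gte": 2}; every operator must hold.
                if not all(_COMPARATORS[op](value, operand) for op, operand in condition.items()):
                    return False
            elif value != condition:
                # Bare value form, e.g. {"model_name": "Table"}, means equality.
                return False
    return True


row = {"model_name": "Table", "has_sem": True, "schema_version": 3}
```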

See ChromaDB metadata filtering documentation for more details.