railtracks.retrieval
Retrieval subsystem.
The runtime (RetrievalRuntime) orchestrates loading, chunking,
embedding, storage, and retrieval. The railtracks.retrieval.stores.Store
protocol is the storage contract; railtracks.retrieval.stores.VectorStore
is the canonical implementation. The rest of the pipeline is provided
by railtracks.retrieval.loaders, railtracks.retrieval.chunking,
and railtracks.retrieval.embedding.
```python
"""Retrieval subsystem.

The runtime (:class:`RetrievalRuntime`) orchestrates loading, chunking,
embedding, storage, and retrieval. The :class:`~railtracks.retrieval.stores.Store`
protocol is the storage contract; :class:`~railtracks.retrieval.stores.VectorStore`
is the canonical implementation. The rest of the pipeline is provided
by :mod:`railtracks.retrieval.loaders`, :mod:`railtracks.retrieval.chunking`,
and :mod:`railtracks.retrieval.embedding`.
"""

from .embedding.models import EmbeddingFailure
from .errors import EmbeddingModelMismatchError
from .models import (
    Chunk,
    Document,
    DocumentType,
    EmbeddedChunk,
    RetrievalResult,
    RetrievedChunk,
)
from .runtime import (
    BatchIngested,
    DocumentFailed,
    DocumentSkipped,
    IngestionStats,
    RetrievalRuntime,
)
from .stores import Store, StoreEntry, StoreQuery, StoreScope, VectorStore

__all__ = [
    "BatchIngested",
    "Chunk",
    "Document",
    "DocumentFailed",
    "DocumentSkipped",
    "DocumentType",
    "EmbeddedChunk",
    "EmbeddingFailure",
    "EmbeddingModelMismatchError",
    "IngestionStats",
    "RetrievalResult",
    "RetrievalRuntime",
    "RetrievedChunk",
    "Store",
    "StoreEntry",
    "StoreQuery",
    "StoreScope",
    "VectorStore",
]
```
```python
@dataclass
class BatchIngested:
    """A batch of chunks that finished embedding and was written to the store.

    ``batch_index`` is **per-document**: it starts at 0 for each document and
    counts that document's batches (both successful and failed) in order. It
    is not a run-global counter — to track overall progress, count events or
    read ``IngestionStats``.

    ``metrics`` carries the per-batch usage and timing reported by the
    embedder (tokens, dollar cost, latency, vector count). Use it to track
    per-ingest cost without having to wrap the embedder.
    """

    document_id: UUID
    embedded_chunks: list[EmbeddedChunk]
    batch_index: int
    metrics: EmbeddingMetrics | None = None
```
A batch of chunks that finished embedding and was written to the store.
batch_index is per-document: it starts at 0 for each document and
counts that document's batches (both successful and failed) in order. It
is not a run-global counter — to track overall progress, count events or
read IngestionStats.
metrics carries the per-batch usage and timing reported by the
embedder (tokens, dollar cost, latency, vector count). Use it to track
per-ingest cost without having to wrap the embedder.
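To make the per-document numbering concrete, here is a small simulation. FakeBatchEvent is a hypothetical stand-in that keeps only the two fields relevant to numbering, and simulate_events assumes each document's chunks are split into ceil(count / batch_size) batches; neither is part of railtracks.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class FakeBatchEvent:
    # Hypothetical stand-in for BatchIngested / EmbeddingFailure: numbering fields only.
    document_id: UUID
    batch_index: int


def simulate_events(chunk_counts: list[int], batch_size: int) -> list[FakeBatchEvent]:
    """Emit one event per batch; batch_index restarts at 0 for every document."""
    events: list[FakeBatchEvent] = []
    for count in chunk_counts:
        doc_id = uuid4()
        n_batches = -(-count // batch_size)  # ceiling division
        for batch_index in range(n_batches):
            events.append(FakeBatchEvent(doc_id, batch_index))
    return events


events = simulate_events([5, 2], batch_size=2)
print([e.batch_index for e in events])  # [0, 1, 2, 0]
print(len(events))  # 4: overall progress is the event count, not the last batch_index
```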
```python
@dataclass
class Chunk:
    content: str
    document_id: UUID
    id: UUID = field(default_factory=uuid4)
    index: int = 0
    parent_chunk_id: UUID | None = None
    offsets: tuple[int, int] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
```
```python
@dataclass
class Document:
    """A unit of source content produced by a loader.

    Attributes:
        content: The decoded textual content of the document. Always a string;
            binary loaders are responsible for their own decoding.
        type: A :class:`DocumentType` describing the content format. Cloud
            loaders infer it from the object's file extension; structured
            loaders (CSV, SQL) set it explicitly.
        id: Unique identifier. If not provided and ``source`` is set, derived
            deterministically from ``source`` via UUID5 (RFC 4122 URL
            namespace) so the same source yields the same id across processes
            — required for the runtime's upsert (``delete_where`` on
            ``document_id``) to find and clear the prior version when content
            changes. Sourceless documents get a random UUID4 (no stable
            identity → no upsert semantics).
        source: The natural identifier of where this document came from —
            a URI (``s3://bucket/key``, ``gs://bucket/name``, ``https://...``),
            a file path, or a relational id. Cloud loaders always set this;
            user-constructed documents may leave it ``None``.

            Writers that derive a storage key (when no ``key_fn`` is supplied)
            look here first; the cloud writers also strip their own URI prefix
            so that "load from S3, write back to S3" produces a clean key
            rather than a nested URI.
        content_hash: SHA-256 of ``content``. Computed by the runtime at
            ingest time; loaders should leave this ``None``. Used by
            staleness-detection to skip re-embedding unchanged documents.
        metadata: Arbitrary provider-specific or user-attached key-value data.
            Loaders use this to expose details like ``bucket``, ``key``,
            ``page``, ``row_index``, etc.
    """

    content: str
    type: DocumentType = DocumentType.TEXT
    id: UUID = _UNSET_DOCUMENT_ID
    source: str | None = None
    content_hash: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.id == _UNSET_DOCUMENT_ID:
            self.id = (
                uuid5(NAMESPACE_URL, self.source)
                if self.source is not None
                else uuid4()
            )
```
A unit of source content produced by a loader.
Attributes:
- content: The decoded textual content of the document. Always a string; binary loaders are responsible for their own decoding.
- type: A DocumentType describing the content format. Cloud loaders infer it from the object's file extension; structured loaders (CSV, SQL) set it explicitly.
- id: Unique identifier. If not provided and source is set, it is derived deterministically from source via UUID5 (RFC 4122 URL namespace), so the same source yields the same id across processes. The runtime's upsert (delete_where on document_id) relies on this to find and clear the prior version when content changes. Sourceless documents get a random UUID4, so they have no stable identity and no upsert semantics.
- source: The natural identifier of where this document came from: a URI (s3://bucket/key, gs://bucket/name, https://...), a file path, or a relational id. Cloud loaders always set this; user-constructed documents may leave it None.
  Writers that derive a storage key (when no key_fn is supplied) look here first; the cloud writers also strip their own URI prefix so that "load from S3, write back to S3" produces a clean key rather than a nested URI.
- content_hash: SHA-256 of content. Computed by the runtime at ingest time; loaders should leave this None. Used by staleness detection to skip re-embedding unchanged documents.
- metadata: Arbitrary provider-specific or user-attached key-value data. Loaders use this to expose details like bucket, key, page, row_index, etc.
```python
@dataclass
class DocumentFailed:
    """A document that had at least one failed embedding batch.

    Note: successful batches for the same document *are* written to the
    store. ``DocumentFailed`` is an informational signal that the document
    is now partial — callers may want to retry, delete, or accept the
    partial state.
    """

    document_id: UUID
    source: str | None
    errors: list[Exception]
```
A document that had at least one failed embedding batch.
Note: successful batches for the same document are written to the
store. DocumentFailed is an informational signal that the document
is now partial — callers may want to retry, delete, or accept the
partial state.
```python
@dataclass
class DocumentSkipped:
    """A document skipped during ingest because the store already has an
    entry with the same ``source_path`` and ``content_hash``."""

    document_id: UUID
    source: str | None
    reason: str = "unchanged"
```
A document skipped during ingest because the store already has an
entry with the same source_path and content_hash.
```python
class DocumentType(str, Enum):
    TEXT = "text"
    MARKDOWN = "markdown"
    PDF = "pdf"
    CSV = "csv"
    JSON = "json"
```
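Content format of a Document. Because it subclasses str, each member compares equal to its string value (for example, DocumentType.PDF == "pdf").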
```python
@dataclass
class EmbeddedChunk:
    chunk: Chunk
    vector: list[float]
    embedding_model: str
    embedding_version: str | None = None
```
```python
@dataclass
class EmbeddingFailure:
    """A failed batch embedding attempt.

    Attributes:
        chunks: Source chunks that could not be embedded.
        errors: Exceptions raised during embedding.
    """

    chunks: list[Chunk]
    errors: list[Exception]
```
A failed batch embedding attempt.
Attributes:
- chunks: Source chunks that could not be embedded.
- errors: Exceptions raised during embedding.
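When max_tokens is set, oversized chunks become single-chunk EmbeddingFailure values before any provider call (see _split_oversized in the RetrievalRuntime source listing). The sketch below reproduces that partition under two stated substitutions: a whitespace word count stands in for the real tokenizer (the runtime defaults to TiktokenTokenizer), and a local Failure dataclass stands in for EmbeddingFailure.

```python
from dataclasses import dataclass


@dataclass
class Failure:
    # Local stand-in for EmbeddingFailure: the dropped chunks and why.
    chunks: list[str]
    errors: list[Exception]


def count_tokens(text: str) -> int:
    # Assumption: whitespace splitting approximates a tokenizer, for illustration only.
    return len(text.split())


def split_oversized(chunks: list[str], max_tokens: int) -> tuple[list[str], list[Failure]]:
    ok: list[str] = []
    failures: list[Failure] = []
    for chunk in chunks:
        tokens = count_tokens(chunk)
        if tokens > max_tokens:
            err = ValueError(f"chunk has {tokens} tokens (>{max_tokens}); dropped before embedding")
            failures.append(Failure(chunks=[chunk], errors=[err]))  # one failure per chunk
        else:
            ok.append(chunk)
    return ok, failures


ok, failed = split_oversized(["short one", "this chunk is far too long"], max_tokens=3)
print(ok)                   # ['short one']
print(failed[0].errors[0])  # chunk has 6 tokens (>3); dropped before embedding
```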
```python
class EmbeddingModelMismatchError(RuntimeError):
    """Raised when the runtime's embedder model differs from the store's.

    Mixing vectors from different embedding models silently produces
    meaningless similarity scores, so the runtime fails loudly before
    issuing the search.
    """
```
Raised when the runtime's embedder model differs from the store's.
Mixing vectors from different embedding models silently produces meaningless similarity scores, so the runtime fails loudly before issuing the search.
```python
@dataclass
class IngestionStats:
    """Summary of a complete ingest run.

    ``total_metrics`` accumulates per-batch ``EmbeddingMetrics`` (tokens,
    dollar cost, latency, vector count) across every successful batch in
    the run, so callers can read a single total for billing/observability.
    """

    documents_loaded: int = 0
    documents_failed: int = 0
    documents_skipped: int = 0
    chunks_created: int = 0
    chunks_embedded: int = 0
    batches_failed: int = 0
    batch_failures: list[EmbeddingFailure] = field(default_factory=list)
    failed_documents: list[DocumentFailed] = field(default_factory=list)
    total_metrics: EmbeddingMetrics = field(default_factory=EmbeddingMetrics)
```
Summary of a complete ingest run.
total_metrics accumulates per-batch EmbeddingMetrics (tokens,
dollar cost, latency, vector count) across every successful batch in
the run, so callers can read a single total for billing/observability.
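The accumulation works because the runtime starts from a default EmbeddingMetrics() and adds each batch's metrics with +. The field names of EmbeddingMetrics are not shown on this page, so the Metrics class below uses hypothetical names for the four quantities the docstring lists, and it assumes a plain field-wise sum (including latency).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Metrics:
    # Hypothetical field names; the real EmbeddingMetrics fields may differ.
    tokens: int = 0
    cost_usd: float = 0.0
    latency_s: float = 0.0
    vectors: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        # Field-wise sum, so the zero-valued default acts as the starting total.
        return Metrics(
            tokens=self.tokens + other.tokens,
            cost_usd=self.cost_usd + other.cost_usd,
            latency_s=self.latency_s + other.latency_s,
            vectors=self.vectors + other.vectors,
        )


total = Metrics()  # like field(default_factory=EmbeddingMetrics)
for batch in [Metrics(120, 0.0012, 0.30, 8), Metrics(80, 0.0008, 0.25, 5)]:
    total = total + batch  # same pattern as stats.total_metrics + batch.metrics
print(total.tokens, total.vectors)  # 200 13
```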
```python
@dataclass
class RetrievalResult:
    query: str
    chunks: list[RetrievedChunk]
    total_candidates: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
```
```python
class RetrievalRuntime:
    """Orchestrates loading, chunking, embedding, storage, and retrieval.

    The runtime captures *how* to process documents (chunker + embedder +
    store); the loader passed to :meth:`ingest` decides *what* to
    process. A single runtime can ingest from multiple sources, mix
    chunking strategies via separate runtimes against the same store,
    and update existing documents by re-ingesting them. Multi-tenant
    callers share one runtime and pass ``scope`` per :meth:`ingest` or
    :meth:`retrieve` call.

    Args:
        chunker: Splits documents into chunks.
        embedder: Embeds chunk text into vectors.
        store: Receives written ``StoreEntry``s and serves similarity search.
        batch_size: Items per embedding batch. Falls back to
            ``embedder.default_batch_size`` when omitted; raises
            ``ValueError`` at construction if neither is set.
        on_ingest: Synchronous callback invoked with each ``IngestionEvent``
            as it is yielded. Wrap in ``asyncio.create_task`` for async logging.
        on_retrieve: Synchronous callback invoked with the query string and
            the ``RetrievalResult`` after each retrieve call.
        max_tokens: When set, chunks whose token count exceeds this limit
            are dropped before embedding and reported via
            ``EmbeddingFailure`` rather than being sent to the provider.
            Requires ``tokenizer`` (defaults to ``TiktokenTokenizer``).
        tokenizer: Tokenizer used to enforce ``max_tokens``. Defaults to
            ``TiktokenTokenizer`` lazily when ``max_tokens`` is set.
    """

    def __init__(
        self,
        chunker: Chunker,
        embedder: Embedding,
        store: Store,
        *,
        batch_size: int | None = None,
        on_ingest: Callable[
            [BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped], None
        ]
        | None = None,
        on_retrieve: Callable[[str, RetrievalResult], None] | None = None,
        max_tokens: int | None = None,
        tokenizer: Tokenizer | None = None,
    ) -> None:
        self._chunker = chunker
        self._embedder = embedder
        self._store = store
        self._batch_size = self._resolve_batch_size(batch_size, embedder)
        self._on_ingest = on_ingest
        self._on_retrieve = on_retrieve
        self._max_tokens = max_tokens
        if max_tokens is not None and tokenizer is None:
            from .chunking.tokenization import TiktokenTokenizer

            tokenizer = TiktokenTokenizer()
        self._tokenizer = tokenizer
        # Captured on the first successful embedded batch and checked at
        # retrieve time; survives process restarts by lazy-seeding from
        # an existing store entry on the first ingest/retrieve.
        self._captured_model: str | None = None
        self._seed_attempted: bool = False

    @property
    def store(self) -> Store:
        return self._store

    @property
    def embedder(self) -> Embedding:
        return self._embedder

    @property
    def chunker(self) -> Chunker:
        return self._chunker

    @property
    def batch_size(self) -> int:
        return self._batch_size

    @property
    def max_tokens(self) -> int | None:
        return self._max_tokens

    @staticmethod
    def _resolve_batch_size(batch_size: int | None, embedder: Embedding) -> int:
        bs = batch_size if batch_size is not None else embedder.default_batch_size
        if bs is None:
            raise ValueError(
                f"{type(embedder).__name__} does not declare a "
                "default_batch_size. Pass batch_size= to RetrievalRuntime "
                "or set default_batch_size on the embedder class."
            )
        return bs

    async def ingest(
        self,
        loader: BaseDocumentLoader,
        *,
        scope: StoreScope | None = None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        """Stream loader → chunker → embedder → store, yielding per-batch events.

        Args:
            loader: Source of ``Document`` objects to ingest.
            scope: Tag written onto every ``StoreEntry`` produced by this
                call. Single-tenant callers can leave this ``None``.

        Yields:
            ``BatchIngested`` after each successful batch finishes writing,
            ``EmbeddingFailure`` for any failed batch, and ``DocumentFailed``
            once at end-of-document for each document that had any failed
            batch. Successful batches for a partially-failed document are
            still written; ``DocumentFailed`` signals the partial state.
        """
        stats = IngestionStats()
        async for event in self._ingest_with_stats(loader, stats, scope):
            if self._on_ingest is not None:
                self._on_ingest(event)
            yield event

    async def ingest_all(
        self,
        loader: BaseDocumentLoader,
        *,
        scope: StoreScope | None = None,
    ) -> IngestionStats:
        """Drain `ingest` and return aggregate counts."""
        stats = IngestionStats()
        async for event in self._ingest_with_stats(loader, stats, scope):
            if self._on_ingest is not None:
                self._on_ingest(event)
        return stats

    async def _ingest_with_stats(
        self,
        loader: BaseDocumentLoader,
        stats: IngestionStats,
        scope: StoreScope | None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        async for doc in loader.astream():
            async for event in self._ingest_document(doc, stats, scope):
                yield event

    async def _ingest_document(
        self, doc: Document, stats: IngestionStats, scope: StoreScope | None
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        stats.documents_loaded += 1
        doc.content_hash = _content_hash(doc.content)

        await self._ensure_captured_model_seeded()

        if await self._is_complete_duplicate(doc):
            stats.documents_skipped += 1
            yield DocumentSkipped(document_id=doc.id, source=doc.source)
            return

        chunks = await self._chunker.achunk(doc)
        stats.chunks_created += len(chunks)
        if not chunks:
            return

        self._stamp_staleness_metadata(doc, chunks)

        # Token-size guard: drop oversized chunks before embedding to avoid
        # provider 4xx errors. Each oversize chunk surfaces as an
        # EmbeddingFailure carried into the document's accumulated errors.
        doc_errors: list[Exception] = []
        chunks, failures = self._split_oversized(chunks, stats)
        for failure in failures:
            doc_errors.extend(failure.errors)
            yield failure
        if not chunks:
            if doc_errors:
                yield self._record_document_failed(doc, doc_errors, stats)
            return

        # Stamp the final (post-token-guard) chunk count onto every chunk so a
        # later staleness check can tell a complete document from a
        # partially-written one. Every chunk carries the same total, so reading
        # any one persisted chunk reveals how many were expected.
        for chunk in chunks:
            chunk.metadata["doc_chunk_count"] = len(chunks)

        async for event in self._embed_and_store(doc, chunks, stats, doc_errors, scope):
            yield event

        if doc_errors:
            yield self._record_document_failed(doc, doc_errors, stats)

    async def _is_complete_duplicate(self, doc: Document) -> bool:
        """Whether the store already holds a *complete* copy of ``doc``.

        Skip re-embedding only when as many chunks are present as the last
        write expected. A partially-written document (some chunks present after
        an interrupted ingest) has fewer than expected and is re-ingested rather
        than left broken. find() is metadata-only (no vector search); the second
        call caps its work at the document's own chunk count, and only runs when
        a prior version exists. (Counting is done via find() rather than a
        count() call so the runtime depends only on the Store protocol.)
        """
        if doc.source is None:
            return False
        stale_filters = {
            "source_path": doc.source,
            "content_hash": doc.content_hash,
        }
        existing = await self._store.find(stale_filters, limit=1)
        if not existing:
            return False
        expected = existing[0].chunk_metadata.get("doc_chunk_count")
        if expected is None:
            # Legacy entry written before count-aware staleness:
            # preserve the original "exists => complete" behavior.
            return True
        present = await self._store.find(stale_filters, limit=expected)
        return len(present) >= expected

    @staticmethod
    def _stamp_staleness_metadata(doc: Document, chunks: list[Chunk]) -> None:
        """Inject staleness-detection metadata into every chunk so future
        `find` calls can identify whether this document has changed."""
        for chunk in chunks:
            if doc.source is not None:
                chunk.metadata.setdefault("source_path", doc.source)
            if doc.content_hash is not None:
                chunk.metadata.setdefault("content_hash", doc.content_hash)

    def _split_oversized(
        self, chunks: list[Chunk], stats: IngestionStats
    ) -> tuple[list[Chunk], list[EmbeddingFailure]]:
        """Partition chunks into embeddable ones and per-chunk failures.

        Returns ``(ok_chunks, failures)``; each oversize chunk becomes a
        single-chunk ``EmbeddingFailure`` and is recorded in ``stats``.
        """
        if self._max_tokens is None or self._tokenizer is None:
            return chunks, []
        ok_chunks: list[Chunk] = []
        failures: list[EmbeddingFailure] = []
        for chunk in chunks:
            tokens = self._tokenizer.count(chunk.content)
            if tokens > self._max_tokens:
                err = ValueError(
                    f"chunk {chunk.id} has {tokens} tokens "
                    f"(>{self._max_tokens}); dropped before embedding"
                )
                stats.batches_failed += 1
                failure = EmbeddingFailure(chunks=[chunk], errors=[err])
                stats.batch_failures.append(failure)
                failures.append(failure)
            else:
                ok_chunks.append(chunk)
        return ok_chunks, failures

    async def _embed_and_store(
        self,
        doc: Document,
        chunks: list[Chunk],
        stats: IngestionStats,
        doc_errors: list[Exception],
        scope: StoreScope | None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        # batch_index is per-document: it counts batches (successful and
        # failed) within this document and resets for the next one.
        batch_index = 0
        delete_done = False
        async for batch in self._embedder.astream_batches(
            chunks, batch_size=self._batch_size
        ):
            if isinstance(batch, EmbeddingResult):
                # Check model BEFORE delete_where / write — a mismatch here
                # must not corrupt the store by clearing prior chunks first.
                self._check_model(batch.metrics.model)
                if not delete_done:
                    await self._store.delete_where({"document_id": str(doc.id)})
                    delete_done = True
                for embedded in batch.chunks:
                    self._capture_model(embedded)
                    entry = StoreEntry.from_chunk(embedded, scope=scope)
                    await self._store.write(entry)
                stats.chunks_embedded += len(batch.chunks)
                stats.total_metrics = stats.total_metrics + batch.metrics
                yield BatchIngested(
                    document_id=doc.id,
                    embedded_chunks=batch.chunks,
                    batch_index=batch_index,
                    metrics=batch.metrics,
                )
            else:
                doc_errors.extend(batch.errors)
                stats.batches_failed += 1
                stats.batch_failures.append(batch)
                yield batch
            batch_index += 1

    def _capture_model(self, embedded: EmbeddedChunk) -> None:
        """Record the embedding model from the first successful chunk so later
        retrieve() calls can enforce model consistency."""
        if self._captured_model is None and embedded.embedding_model:
            self._captured_model = embedded.embedding_model
            logger.info(
                "RetrievalRuntime captured embedding model %r "
                "from first successful batch; subsequent retrieve() "
                "calls will enforce this model.",
                self._captured_model,
            )

    async def _ensure_captured_model_seeded(self) -> None:
        """Lazily seed ``_captured_model`` from an existing store entry so the
        guard survives across process restarts. ``StoreEntry.embedding_model``
        is recorded on every persisted entry, so a single ``find`` call is
        enough — no schema change required. Runs at most once per runtime;
        a miss against an empty store sets ``_seed_attempted`` so we don't
        re-query on every doc."""
        if self._captured_model is not None or self._seed_attempted:
            return
        self._seed_attempted = True
        existing = await self._store.find({}, limit=1)
        if existing and existing[0].embedding_model:
            self._captured_model = existing[0].embedding_model
            logger.info(
                "RetrievalRuntime seeded captured embedding model %r from "
                "an existing store entry; mismatched embedders will raise.",
                self._captured_model,
            )

    def _check_model(self, embed_model: str | None) -> None:
        """Raise if ``embed_model`` disagrees with the captured model."""
        if (
            self._captured_model is not None
            and embed_model
            and embed_model != self._captured_model
        ):
            raise EmbeddingModelMismatchError(
                f"Embedder produced vectors with model {embed_model!r} but "
                f"store was built with {self._captured_model!r}. Similarity "
                "scores across models are meaningless; rebuild the store "
                "with the correct embedder or switch embedders."
            )

    @staticmethod
    def _record_document_failed(
        doc: Document, doc_errors: list[Exception], stats: IngestionStats
    ) -> DocumentFailed:
        failed = DocumentFailed(
            document_id=doc.id,
            source=doc.source,
            errors=doc_errors,
        )
        stats.documents_failed += 1
        stats.failed_documents.append(failed)
        return failed

    async def delete_document(self, document_id: UUID) -> None:
        """Remove all chunks for a document from the store.

        Convenience wrapper around ``store.delete_where({"document_id": ...})``
        so callers don't need to know the metadata key.
        """
        await self._store.delete_where({"document_id": str(document_id)})

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        metadata_filters: dict[str, Any] | None = None,
        scope: StoreScope | None = None,
    ) -> RetrievalResult:
        """Embed ``query`` and return the top ``top_k`` matches from the store.

        Args:
            query: The text to embed and search with.
            top_k: Maximum number of results.
            metadata_filters: Additional equality filters on chunk metadata.
            scope: Restricts the search to entries written with the same
                scope. Leave ``None`` to search across all scopes.

        Raises:
            EmbeddingModelMismatchError: When the embedder reports a model
                different from the one captured on first ingest.
        """
        await self._ensure_captured_model_seeded()
        text_result = await self._embedder.aembed([query])
        self._check_model(text_result.metrics.model)

        store_query = StoreQuery(
            text=query,
            scope=scope,
            embedding=text_result.vectors[0],
            top_k=top_k,
            metadata_filters=metadata_filters,
        )
        store_hits = await self._store.read(store_query)
        chunks = [
            RetrievedChunk(
                chunk=_entry_to_chunk(hit.entry),
                score=hit.score,
                rank=hit.rank,
                source_retriever=hit.source_retriever,
                rerank_score=hit.rerank_score,
            )
            for hit in store_hits
        ]
        result = RetrievalResult(query=query, chunks=chunks)
        if self._on_retrieve is not None:
            self._on_retrieve(query, result)
        return result
```
Orchestrates loading, chunking, embedding, storage, and retrieval.
The runtime captures how to process documents (chunker + embedder +
store); the loader passed to ingest() decides what to
process. A single runtime can ingest from multiple sources, mix
chunking strategies via separate runtimes against the same store,
and update existing documents by re-ingesting them. Multi-tenant
callers share one runtime and pass scope per ingest() or
retrieve() call.
Arguments:
- chunker: Splits documents into chunks.
- embedder: Embeds chunk text into vectors.
- store: Receives written StoreEntry objects and serves similarity search.
- batch_size: Items per embedding batch. Falls back to embedder.default_batch_size when omitted; raises ValueError at construction if neither is set.
- on_ingest: Synchronous callback invoked with each ingest event (BatchIngested, EmbeddingFailure, DocumentFailed, or DocumentSkipped) as it is produced, by both ingest() and ingest_all(). For asynchronous logging, schedule the coroutine from inside the callback with asyncio.create_task.
- on_retrieve: Synchronous callback invoked with the query string and the RetrievalResult after each retrieve call.
- max_tokens: When set, chunks whose token count exceeds this limit are dropped before embedding and reported as single-chunk EmbeddingFailure events rather than being sent to the provider.
- tokenizer: Tokenizer used to count tokens for max_tokens. If max_tokens is set and no tokenizer is passed, a TiktokenTokenizer is created at construction.
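Because on_ingest is called synchronously for every event, it should return quickly. One standard asyncio pattern for async logging (a usage suggestion, not something the runtime provides) is to schedule the coroutine with asyncio.create_task inside the callback and keep a reference to the task until it finishes. This works because ingest() runs inside an event loop. The event classes and write_log below are hypothetical placeholders.

```python
import asyncio


# Hypothetical stand-ins for two event types; the real ones carry more fields.
class BatchIngested:
    pass


class DocumentFailed:
    pass


pending: set[asyncio.Task[None]] = set()
log: list[str] = []


async def write_log(line: str) -> None:
    # Placeholder for an async sink such as a database or HTTP call.
    await asyncio.sleep(0)
    log.append(line)


def on_ingest(event: object) -> None:
    # Called synchronously by the runtime: schedule the async work, don't await it.
    task = asyncio.create_task(write_log(type(event).__name__))
    pending.add(task)  # hold a strong reference until the task finishes
    task.add_done_callback(pending.discard)


async def demo() -> None:
    for event in (BatchIngested(), DocumentFailed()):
        on_ingest(event)  # the runtime would call this while ingest() is iterated
    await asyncio.gather(*pending)  # flush outstanding writes before returning


asyncio.run(demo())
print(log)  # ['BatchIngested', 'DocumentFailed']
```

Keeping the task in a set matters because the event loop holds only a weak reference to tasks; an unreferenced task can be garbage-collected before it completes.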
Stream loader → chunker → embedder → store, yielding per-batch events.
Arguments:
- loader: Source of Document objects to ingest.
- scope: Tag written onto every StoreEntry produced by this call. Single-tenant callers can leave this None.
Yields:
BatchIngested after each successful batch finishes writing, EmbeddingFailure for each failed batch (including chunks dropped by the max_tokens guard), DocumentSkipped for each document whose unchanged content is already fully stored, and DocumentFailed once at end-of-document for each document that had any failed batch. Successful batches for a partially failed document are still written; DocumentFailed signals the partial state.
Run the same pipeline as ingest(), invoking on_ingest for every event, and return the aggregate IngestionStats instead of yielding events.
Remove all chunks for a document from the store.
Convenience wrapper around store.delete_where({"document_id": ...})
so callers don't need to know the metadata key.
Embed query and return the top_k best-scoring matches from the store.
Arguments:
- query: The text to embed and search with.
- top_k: Maximum number of results.
- metadata_filters: Additional equality filters on chunk metadata.
- scope: Restricts the search to entries written with the same scope. Leave None to search across all scopes.
Raises:
- EmbeddingModelMismatchError: When the embedder reports a model different from the one captured on first ingest.
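The Raises clause implies a capture-then-compare rule. The runtime remembers the model reported on first ingest. Every later embed call, including the one retrieve makes for the query, must then report that same model. The sketch below isolates that rule. ModelGuard is hypothetical and the exception is a local stand-in. The real _ensure_captured_model_seeded and _check_model are not shown in this section and may behave differently; for example, they might recover the captured model from entries already in the store.

```python
from __future__ import annotations


class EmbeddingModelMismatchError(Exception):
    """Local stand-in for railtracks.retrieval.EmbeddingModelMismatchError."""


class ModelGuard:
    """Hypothetical guard: remember the first model seen, reject any other."""

    def __init__(self) -> None:
        self.captured_model: str | None = None

    def check(self, model: str) -> None:
        if self.captured_model is None:
            # Nothing captured yet: the first reported model becomes the baseline.
            self.captured_model = model
        elif model != self.captured_model:
            raise EmbeddingModelMismatchError(
                f"store was built with {self.captured_model!r}, "
                f"but the embedder now reports {model!r}"
            )


guard = ModelGuard()
guard.check("embed-v1")  # first ingest captures "embed-v1"
guard.check("embed-v1")  # a later query with the same model passes

refused: list[str] = []
try:
    guard.check("embed-v2")  # the embedder was swapped
except EmbeddingModelMismatchError as exc:
    refused.append(str(exc))
print(refused)
```

Failing fast is the point of this check. Vectors from two different embedding models generally live in incompatible spaces, so mixing them would quietly degrade similarity scores instead of raising an error.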
```python
@dataclass
class RetrievedChunk:
    chunk: Chunk
    score: float
    rank: int
    source_retriever: str | None = None
    rerank_score: float | None = None
```
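The VectorStore listing later in this module assigns rank with enumerate over the backend's hits and sets source_retriever to "dense". Going by that listing, rank 0 is the best dense match. rerank_score presumably stays None unless a reranker fills it. A caller that wants to honour reranking might order hits as in the sketch below. Here chunk is simplified to its text, and best_first is a hypothetical helper, not a railtracks function:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RetrievedChunk:
    # Mirrors the fields above; `chunk` is simplified to its text here.
    chunk: str
    score: float
    rank: int
    source_retriever: str | None = None
    rerank_score: float | None = None


def best_first(hits: list[RetrievedChunk]) -> list[RetrievedChunk]:
    """Hypothetical policy: order by rerank_score when every hit has one,
    otherwise fall back to the retriever's own rank (0 = best)."""
    if hits and all(h.rerank_score is not None for h in hits):
        return sorted(hits, key=lambda h: h.rerank_score, reverse=True)
    return sorted(hits, key=lambda h: h.rank)


dense = [
    RetrievedChunk("alpha", score=0.91, rank=0, source_retriever="dense"),
    RetrievedChunk("beta", score=0.88, rank=1, source_retriever="dense"),
]
print([h.chunk for h in best_first(dense)])  # ['alpha', 'beta']

reranked = [
    RetrievedChunk("alpha", score=0.91, rank=0, rerank_score=0.20),
    RetrievedChunk("beta", score=0.88, rank=1, rerank_score=0.75),
]
print([h.chunk for h in best_first(reranked)])  # ['beta', 'alpha']
```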
```python
@runtime_checkable
class Store(Protocol):
    async def write(self, entry: StoreEntry) -> str: ...
    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]: ...
    async def delete(self, id: UUID) -> None: ...
    async def clear(self, scope: StoreScope) -> None: ...
    async def delete_where(self, filters: dict[str, Any]) -> None: ...
    async def find(
        self, filters: dict[str, Any], limit: int = 1
    ) -> list[StoreEntry]: ...
```
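Store is a runtime_checkable Protocol, so any class that defines these six async methods satisfies it without inheriting from anything. An isinstance check only confirms that the methods exist; it does not inspect their signatures. The sketch below redeclares a simplified copy of the protocol so that it runs on its own. It makes two simplifications: payload dicts stand in for StoreEntry, StoreQuery, and StoreScope, and read does a naive substring match instead of vector search. InMemoryStore is a hypothetical example, not part of railtracks.

```python
from __future__ import annotations

import asyncio
from typing import Any, Protocol, runtime_checkable
from uuid import UUID, uuid4


@runtime_checkable
class Store(Protocol):
    # Simplified copy: dicts stand in for StoreEntry / StoreQuery / scopes.
    async def write(self, entry: dict[str, Any]) -> str: ...
    async def read(self, query: dict[str, Any]) -> list[dict[str, Any]]: ...
    async def delete(self, id: UUID) -> None: ...
    async def clear(self, scope: dict[str, Any]) -> None: ...
    async def delete_where(self, filters: dict[str, Any]) -> None: ...
    async def find(self, filters: dict[str, Any], limit: int = 1) -> list[dict[str, Any]]: ...


def _matches(payload: dict[str, Any], filters: dict[str, Any]) -> bool:
    return all(payload.get(k) == v for k, v in filters.items())


class InMemoryStore:
    """Hypothetical store; no base class, satisfies Store structurally."""

    def __init__(self) -> None:
        self._rows: dict[str, dict[str, Any]] = {}

    async def write(self, entry: dict[str, Any]) -> str:
        entry_id = str(entry.get("id") or uuid4())
        self._rows[entry_id] = {**entry, "id": entry_id}
        return entry_id

    async def read(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        # Naive substring match instead of vector similarity.
        text = query["text"].lower()
        hits = [r for r in self._rows.values() if text in r["content"].lower()]
        return hits[: query.get("top_k", 10)]

    async def delete(self, id: UUID) -> None:
        self._rows.pop(str(id), None)

    async def clear(self, scope: dict[str, Any]) -> None:
        await self.delete_where(scope)

    async def delete_where(self, filters: dict[str, Any]) -> None:
        self._rows = {k: r for k, r in self._rows.items() if not _matches(r, filters)}

    async def find(self, filters: dict[str, Any], limit: int = 1) -> list[dict[str, Any]]:
        return [r for r in self._rows.values() if _matches(r, filters)][:limit]


async def demo() -> list[dict[str, Any]]:
    store = InMemoryStore()
    await store.write({"content": "Rails are steel", "scope_user_id": "alice"})
    await store.write({"content": "Tracks need rails", "scope_user_id": "bob"})
    await store.clear({"scope_user_id": "bob"})
    return await store.read({"text": "rails"})


print(isinstance(InMemoryStore(), Store))  # True
print([r["content"] for r in asyncio.run(demo())])  # ['Rails are steel']
```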
```python
@dataclass
class StoreEntry:
    # Required fields
    id: UUID
    content: str
    vector: list[float] | None
    embedding_model: str
    chunk_id: UUID
    document_id: UUID
    # Optional enrichment fields
    abstract: str | None = None
    summary: str | None = None
    scope: StoreScope | None = None
    # Optional chunk provenance
    chunk_index: int = 0
    parent_chunk_id: UUID | None = None
    chunk_offsets: tuple[int, int] | None = None
    chunk_metadata: dict = field(default_factory=dict)
    # Optional embedding provenance
    embedding_version: str | None = None
    # Optional store metadata
    entities: list[Entity] | None = None
    valid_from: datetime | None = None
    valid_until: datetime | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))

    @classmethod
    def from_chunk(
        cls,
        embedded_chunk: EmbeddedChunk,
        *,
        scope: StoreScope | None = None,
        abstract: str | None = None,
        summary: str | None = None,
        entities: list[Entity] | None = None,
        valid_from: datetime | None = None,
        valid_until: datetime | None = None,
    ) -> StoreEntry:
        chunk = embedded_chunk.chunk
        return cls(
            id=uuid4(),
            content=chunk.content,
            vector=embedded_chunk.vector,
            embedding_model=embedded_chunk.embedding_model,
            embedding_version=embedded_chunk.embedding_version,
            chunk_id=chunk.id,
            document_id=chunk.document_id,
            chunk_index=chunk.index,
            parent_chunk_id=chunk.parent_chunk_id,
            chunk_offsets=chunk.offsets,
            chunk_metadata=chunk.metadata,
            scope=scope,
            abstract=abstract,
            summary=summary,
            entities=entities,
            valid_from=valid_from,
            valid_until=valid_until,
        )
```
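from_chunk copies the content, vector, and chunk provenance from the EmbeddedChunk. It also mints a new id with uuid4() on every call. As a result, id and chunk_id are distinct fields, and two calls for the same chunk produce two different entry ids. The trimmed sketch below reproduces that behaviour with assumed stand-ins. Chunk and EmbeddedChunk carry only a few of the attributes from_chunk reads, and MiniEntry keeps a subset of StoreEntry's fields:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass
class Chunk:  # stand-in with a few of the attributes from_chunk reads
    content: str
    document_id: UUID
    id: UUID = field(default_factory=uuid4)
    index: int = 0
    metadata: dict = field(default_factory=dict)


@dataclass
class EmbeddedChunk:  # stand-in
    chunk: Chunk
    vector: list[float]
    embedding_model: str


@dataclass
class MiniEntry:
    """Trimmed StoreEntry: same id/chunk_id split, fewer optional fields."""

    id: UUID
    content: str
    vector: list[float] | None
    embedding_model: str
    chunk_id: UUID
    document_id: UUID
    chunk_index: int = 0
    chunk_metadata: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))

    @classmethod
    def from_chunk(cls, embedded_chunk: EmbeddedChunk) -> MiniEntry:
        chunk = embedded_chunk.chunk
        return cls(
            id=uuid4(),  # fresh entry id on every call
            content=chunk.content,
            vector=embedded_chunk.vector,
            embedding_model=embedded_chunk.embedding_model,
            chunk_id=chunk.id,
            document_id=chunk.document_id,
            chunk_index=chunk.index,
            chunk_metadata=chunk.metadata,
        )


ec = EmbeddedChunk(Chunk("hello", document_id=uuid4()), [0.1, 0.2], "embed-v1")
first, second = MiniEntry.from_chunk(ec), MiniEntry.from_chunk(ec)
print(first.chunk_id == second.chunk_id)  # True
print(first.id == second.id)  # False
```

This listing does not show whether ingestion guards against writing the same chunk twice. delete_document is the explicit way to remove a document's earlier entries before re-ingesting it.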
```python
@dataclass
class StoreQuery:
    text: str
    scope: StoreScope | None = None
    embedding: list[float] | None = None
    top_k: int = 10
    metadata_filters: dict[str, Any] | None = None
```
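StoreQuery keeps scope and metadata_filters apart. The VectorStore.read listing further down flattens them into a single equality-filter dict before calling the backend. Scope labels go in first, under their scope_ prefix, and dict.update then merges metadata_filters on top. The standalone sketch below reproduces that merge. merged_filters is a hypothetical name, and StoreScope is copied from its listing. The listing has one visible consequence: if a metadata filter's key starts with scope_, it replaces the scope's value instead of narrowing it.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class StoreScope:
    labels: Mapping[str, Any] = field(default_factory=dict)

    def to_payload_filters(self) -> dict[str, Any]:
        return {f"scope_{k}": v for k, v in self.labels.items()}


def merged_filters(
    scope: StoreScope | None, metadata_filters: dict[str, Any] | None
) -> dict[str, Any]:
    # Same order as VectorStore.read: scope filters first, metadata merged after.
    filters: dict[str, Any] = scope.to_payload_filters() if scope is not None else {}
    if metadata_filters:
        filters.update(metadata_filters)
    return filters


alice = StoreScope(labels={"user_id": "alice"})
print(merged_filters(alice, {"lang": "en"}))  # {'scope_user_id': 'alice', 'lang': 'en'}
print(merged_filters(None, None))  # {}
print(merged_filters(alice, {"scope_user_id": "bob"}))  # {'scope_user_id': 'bob'}
```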
```python
@dataclass(frozen=True)
class StoreScope:
    """Equality-filter namespace for store entries.

    Each entry in ``labels`` becomes a mandatory equality filter on every
    write and read. The retrieval module is agnostic about what dimensions
    you scope by — pick whichever axes fit your tenancy model::

        StoreScope(labels={"user_id": "alice"})  # SaaS tenancy
        StoreScope(labels={"organization": "acme", "environment": "prod"})  # B2B
        StoreScope(labels={"agent_id": "docs-bot", "session_id": "s1"})  # agent context
        StoreScope(labels={"account_id": 42, "is_prod": True})  # non-string scalars

    The ``scope_`` prefix applied in :meth:`to_payload_filters` avoids key
    collisions in flat payload dicts that also carry content fields.
    """

    labels: Mapping[str, Any] = field(default_factory=dict)

    def to_payload_filters(self) -> dict[str, Any]:
        return {f"scope_{k}": v for k, v in self.labels.items()}
```
Equality-filter namespace for store entries.
Each entry in labels becomes a mandatory equality filter on every
write and read. The retrieval module is agnostic about what dimensions
you scope by; pick whichever axes fit your tenancy model:
```python
StoreScope(labels={"user_id": "alice"})  # SaaS tenancy
StoreScope(labels={"organization": "acme", "environment": "prod"})  # B2B
StoreScope(labels={"agent_id": "docs-bot", "session_id": "s1"})  # agent context
StoreScope(labels={"account_id": 42, "is_prod": True})  # non-string scalars
```
The scope_ prefix applied in to_payload_filters() avoids key
collisions in flat payload dicts that also carry content fields.
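The collision-avoidance claim is easiest to see with concrete payloads. The sketch below copies StoreScope from its listing. It adds a hypothetical in_scope helper that applies the scope the way an equality-filtering backend would. The payload dicts, including a content-level "user_id" field, are illustrative assumptions about what a flat payload might hold:

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class StoreScope:
    # Copied from the listing above.
    labels: Mapping[str, Any] = field(default_factory=dict)

    def to_payload_filters(self) -> dict[str, Any]:
        return {f"scope_{k}": v for k, v in self.labels.items()}


def in_scope(payload: dict[str, Any], scope: StoreScope) -> bool:
    # Hypothetical helper: every scope label must match exactly.
    return all(payload.get(k) == v for k, v in scope.to_payload_filters().items())


b2b = StoreScope(labels={"organization": "acme", "environment": "prod"})
print(b2b.to_payload_filters())
# {'scope_organization': 'acme', 'scope_environment': 'prod'}

alice = StoreScope(labels={"user_id": "alice"})
# A chunk in Alice's scope whose content metadata (hypothetically) records
# Bob as the author: "user_id" and "scope_user_id" coexist without clashing.
payload = {"content": "Bob said hi", "user_id": "bob", "scope_user_id": "alice"}
print(in_scope(payload, alice))  # True
print(in_scope({"content": "x", "scope_user_id": "bob"}, alice))  # False
```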
```python
class VectorStore:
    """Cosine similarity search over StoreEntry vectors.

    Satisfies the Store protocol. Does not inherit from any base class.
    """

    def __init__(self, backend: VectorBackend) -> None:
        self._backend = backend

    async def write(self, entry: StoreEntry) -> str:
        if entry.vector is None:
            logger.error(
                "VectorStore.write called with entry.vector=None (entry_id=%s, "
                "chunk_id=%s); the entry must be embedded before writing.",
                entry.id,
                entry.chunk_id,
            )
            raise ValueError(
                f"VectorStore.write requires entry.vector to be set "
                f"(entry_id={entry.id}); embed the chunk before writing."
            )
        await self._backend.upsert(
            str(entry.id), entry.vector, _entry_to_payload(entry)
        )
        return str(entry.id)

    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]:
        if query.embedding is None:
            raise ValueError(
                "VectorStore.read requires query.embedding to be set; "
                "caller must supply a pre-computed embedding."
            )

        filters: dict[str, Any] = (
            query.scope.to_payload_filters() if query.scope is not None else {}
        )
        if query.metadata_filters:
            filters.update(query.metadata_filters)

        raw_hits = await self._backend.search(query.embedding, query.top_k, filters)

        results: list[RetrievedStoreEntry] = []
        for rank, (hit_id, score, payload) in enumerate(raw_hits):
            entry = _payload_to_entry(hit_id, payload)
            results.append(
                RetrievedStoreEntry(
                    entry=entry,
                    score=score,
                    rank=rank,
                    source_retriever="dense",
                )
            )
        return results

    async def delete(self, id: UUID) -> None:
        await self._backend.delete(str(id))

    async def clear(self, scope: StoreScope) -> None:
        await self._backend.delete_where(scope.to_payload_filters())

    async def delete_where(self, filters: dict[str, Any]) -> None:
        await self._backend.delete_where(filters)

    async def find(self, filters: dict[str, Any], limit: int = 1) -> list[StoreEntry]:
        raw_hits = await self._backend.list_where(filters, limit)
        return [_payload_to_entry(hit_id, payload) for hit_id, payload in raw_hits]

    async def count(self, filters: dict[str, Any] | None = None) -> int:
        return await self._backend.count(filters or {})

    async def nearest_neighbors(
        self,
        embedding: list[float],
        k: int,
        scope: StoreScope | None = None,
    ) -> list[RetrievedStoreEntry]:
        filters = scope.to_payload_filters() if scope is not None else {}
        raw_hits = await self._backend.search(embedding, k, filters)

        results: list[RetrievedStoreEntry] = []
        for rank, (hit_id, score, payload) in enumerate(raw_hits):
            entry = _payload_to_entry(hit_id, payload)
            results.append(
                RetrievedStoreEntry(
                    entry=entry,
                    score=score,
                    rank=rank,
                    source_retriever="dense",
                )
            )
        return results
```
Cosine similarity search over StoreEntry vectors.
Satisfies the Store protocol. Does not inherit from any base class.