railtracks.retrieval

Retrieval subsystem.

The runtime (RetrievalRuntime) orchestrates loading, chunking, embedding, storage, and retrieval. The Store protocol (railtracks.retrieval.stores.Store) is the storage contract; VectorStore (railtracks.retrieval.stores.VectorStore) is the canonical implementation. The rest of the pipeline is provided by railtracks.retrieval.loaders, railtracks.retrieval.chunking, and railtracks.retrieval.embedding.

 1"""Retrieval subsystem.
 2
 3The runtime (:class:`RetrievalRuntime`) orchestrates loading, chunking,
 4embedding, storage, and retrieval. The :class:`~railtracks.retrieval.stores.Store`
 5protocol is the storage contract; :class:`~railtracks.retrieval.stores.VectorStore`
 6is the canonical implementation. The rest of the pipeline is provided
 7by :mod:`railtracks.retrieval.loaders`, :mod:`railtracks.retrieval.chunking`,
 8and :mod:`railtracks.retrieval.embedding`.
 9"""
10
11from .embedding.models import EmbeddingFailure
12from .errors import EmbeddingModelMismatchError
13from .models import (
14    Chunk,
15    Document,
16    DocumentType,
17    EmbeddedChunk,
18    RetrievalResult,
19    RetrievedChunk,
20)
21from .runtime import (
22    BatchIngested,
23    DocumentFailed,
24    DocumentSkipped,
25    IngestionStats,
26    RetrievalRuntime,
27)
28from .stores import Store, StoreEntry, StoreQuery, StoreScope, VectorStore
29
30__all__ = [
31    "BatchIngested",
32    "Chunk",
33    "Document",
34    "DocumentFailed",
35    "DocumentSkipped",
36    "DocumentType",
37    "EmbeddedChunk",
38    "EmbeddingFailure",
39    "EmbeddingModelMismatchError",
40    "IngestionStats",
41    "RetrievalResult",
42    "RetrievalRuntime",
43    "RetrievedChunk",
44    "Store",
45    "StoreEntry",
46    "StoreQuery",
47    "StoreScope",
48    "VectorStore",
49]
@dataclass
class BatchIngested:
    """Event: one embedding batch succeeded and its chunks are in the store.

    The runtime emits this only after every chunk of the batch has been
    written, so a consumer that observes it can rely on those entries being
    persisted.

    Attributes:
        document_id: Id of the document the batch belongs to.
        embedded_chunks: The chunks that were embedded (and written) in this
            batch.
        batch_index: Position of this batch *within its document*. Numbering
            restarts at 0 for every document and advances for failed batches
            as well as successful ones. It is not a run-wide counter; to track
            overall progress, count events or read ``IngestionStats``.
        metrics: Usage and timing the embedder reported for this batch
            (tokens, dollar cost, latency, vector count). Use it to track
            per-ingest cost without having to wrap the embedder.
    """

    # Identity of the source document.
    document_id: UUID
    # Payload of the batch, in the order the embedder returned it.
    embedded_chunks: list[EmbeddedChunk]
    # Per-document ordinal (see class docstring).
    batch_index: int
    # Optional: None when the event is built without embedder metrics.
    metrics: EmbeddingMetrics | None = None

A batch of chunks that finished embedding and was written to the store.

batch_index is per-document: it starts at 0 for each document and counts that document's batches (both successful and failed) in order. It is not a run-global counter — to track overall progress, count events or read IngestionStats.

metrics carries the per-batch usage and timing reported by the embedder (tokens, dollar cost, latency, vector count). Use it to track per-ingest cost without having to wrap the embedder.

BatchIngested( document_id: uuid.UUID, embedded_chunks: list[EmbeddedChunk], batch_index: int, metrics: railtracks.retrieval.embedding.models.EmbeddingMetrics | None = None)
document_id: uuid.UUID
embedded_chunks: list[EmbeddedChunk]
batch_index: int
metrics: railtracks.retrieval.embedding.models.EmbeddingMetrics | None = None
@dataclass
class Chunk:
73@dataclass
74class Chunk:
75    content: str
76    document_id: UUID
77    id: UUID = field(default_factory=uuid4)
78    index: int = 0
79    parent_chunk_id: UUID | None = None
80    offsets: tuple[int, int] | None = None
81    metadata: dict[str, Any] = field(default_factory=dict)
Chunk( content: str, document_id: uuid.UUID, id: uuid.UUID = <factory>, index: int = 0, parent_chunk_id: uuid.UUID | None = None, offsets: tuple[int, int] | None = None, metadata: dict[str, typing.Any] = <factory>)
content: str
document_id: uuid.UUID
id: uuid.UUID
index: int = 0
parent_chunk_id: uuid.UUID | None = None
offsets: tuple[int, int] | None = None
metadata: dict[str, typing.Any]
@dataclass
class Document:
    """A unit of source content produced by a loader.

    Attributes:
        content: The decoded textual content of the document. It is always a
            string; binary loaders do their own decoding.
        type: A :class:`DocumentType` describing the content format. Cloud
            loaders infer it from the object's file extension; structured
            loaders (CSV, SQL) set it explicitly.
        id: Unique identifier. When omitted and ``source`` is set, it is
            derived deterministically from ``source`` via UUID5 (RFC 4122
            URL namespace). The same source therefore yields the same id
            across processes. The runtime's upsert (``delete_where`` on
            ``document_id``) relies on this to find and clear the prior
            version when content changes. Documents without a source get a
            random UUID4, so they have no stable identity and no upsert
            semantics.
        source: The natural identifier of where this document came from:
            a URI (``s3://bucket/key``, ``gs://bucket/name``, ``https://...``),
            a file path, or a relational id. Cloud loaders always set it;
            user-constructed documents may leave it ``None``.

            Writers that derive a storage key (when no ``key_fn`` is
            supplied) consult this first. The cloud writers also strip their
            own URI prefix, so "load from S3, write back to S3" yields a clean
            key rather than a nested URI.
        content_hash: SHA-256 of ``content``. The runtime computes it at
            ingest time, so loaders should leave it ``None``. It is used by
            staleness detection to skip re-embedding unchanged documents.
        metadata: Arbitrary provider-specific or user-attached key/value
            data. Loaders expose details such as ``bucket``, ``key``,
            ``page``, ``row_index`` here.
    """

    content: str
    type: DocumentType = DocumentType.TEXT
    id: UUID = _UNSET_DOCUMENT_ID
    source: str | None = None
    content_hash: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # An explicitly supplied id always wins.
        if self.id != _UNSET_DOCUMENT_ID:
            return
        if self.source is None:
            # Nothing stable to derive from: fall back to a random id.
            self.id = uuid4()
        else:
            # Deterministic id, so re-ingesting the same source upserts.
            self.id = uuid5(NAMESPACE_URL, self.source)

A unit of source content produced by a loader.

Attributes:
  • content: The decoded textual content of the document. Always a string; binary loaders are responsible for their own decoding.
  • type: A DocumentType describing the content format. Cloud loaders infer it from the object's file extension; structured loaders (CSV, SQL) set it explicitly.
  • id: Unique identifier. If not provided and source is set, derived deterministically from source via UUID5 (RFC 4122 URL namespace) so the same source yields the same id across processes — required for the runtime's upsert (delete_where on document_id) to find and clear the prior version when content changes. Sourceless documents get a random UUID4 (no stable identity → no upsert semantics).
  • source: The natural identifier of where this document came from — a URI (s3://bucket/key, gs://bucket/name, https://...), a file path, or a relational id. Cloud loaders always set this; user-constructed documents may leave it None.

    Writers that derive a storage key (when no key_fn is supplied) look here first; the cloud writers also strip their own URI prefix so that "load from S3, write back to S3" produces a clean key rather than a nested URI.

  • content_hash: SHA-256 of content. Computed by the runtime at ingest time; loaders should leave this None. Used by staleness-detection to skip re-embedding unchanged documents.
  • metadata: Arbitrary provider-specific or user-attached key-value data. Loaders use this to expose details like bucket, key, page, row_index, etc.
Document( content: str, type: DocumentType = <DocumentType.TEXT: 'text'>, id: uuid.UUID = UUID('00000000-0000-0000-0000-000000000000'), source: str | None = None, content_hash: str | None = None, metadata: dict[str, typing.Any] = <factory>)
content: str
type: DocumentType = <DocumentType.TEXT: 'text'>
id: uuid.UUID = UUID('00000000-0000-0000-0000-000000000000')
source: str | None = None
content_hash: str | None = None
metadata: dict[str, typing.Any]
@dataclass
class DocumentFailed:
45@dataclass
46class DocumentFailed:
47    """A document that had at least one failed embedding batch.
48
49    Note: successful batches for the same document *are* written to the
50    store. ``DocumentFailed`` is an informational signal that the document
51    is now partial — callers may want to retry, delete, or accept the
52    partial state.
53    """
54
55    document_id: UUID
56    source: str | None
57    errors: list[Exception]

A document that had at least one failed embedding batch.

Note: successful batches for the same document are written to the store. DocumentFailed is an informational signal that the document is now partial — callers may want to retry, delete, or accept the partial state.

DocumentFailed(document_id: uuid.UUID, source: str | None, errors: list[Exception])
document_id: uuid.UUID
source: str | None
errors: list[Exception]
@dataclass
class DocumentSkipped:
60@dataclass
61class DocumentSkipped:
62    """A document skipped during ingest because the store already has an
63    entry with the same ``source_path`` and ``content_hash``."""
64
65    document_id: UUID
66    source: str | None
67    reason: str = "unchanged"

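A document skipped during ingest because the store already holds a complete copy of it: entries with the same source_path and content_hash and, when the last write recorded its chunk count, at least that many chunks.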

DocumentSkipped( document_id: uuid.UUID, source: str | None, reason: str = 'unchanged')
document_id: uuid.UUID
source: str | None
reason: str = 'unchanged'
class DocumentType(str, Enum):
    """Content format of a :class:`Document`.

    Members mix in ``str``, so each compares equal to its lowercase string
    value (``DocumentType.PDF == "pdf"``) and ``DocumentType("pdf")`` looks a
    member up by value.
    """

    # Plain, unstructured text (the Document default).
    TEXT = "text"
    MARKDOWN = "markdown"
    PDF = "pdf"
    CSV = "csv"
    JSON = "json"

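Content format of a Document (text, Markdown, PDF, CSV, or JSON). Members are str subclasses, so each compares equal to its lowercase string value (e.g. DocumentType.PDF == "pdf").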

TEXT = <DocumentType.TEXT: 'text'>
MARKDOWN = <DocumentType.MARKDOWN: 'markdown'>
PDF = <DocumentType.PDF: 'pdf'>
CSV = <DocumentType.CSV: 'csv'>
JSON = <DocumentType.JSON: 'json'>
@dataclass
class EmbeddedChunk:
    """A :class:`Chunk` paired with the vector computed for it.

    Attributes:
        chunk: The chunk that was embedded.
        vector: The embedding vector.
        embedding_model: Name of the model that produced ``vector``. The
            retrieval runtime captures it to refuse mixing vectors from
            different models.
        embedding_version: Optional version tag for the model; ``None`` when
            not provided.
    """

    chunk: Chunk
    vector: list[float]
    embedding_model: str
    embedding_version: str | None = None
EmbeddedChunk( chunk: Chunk, vector: list[float], embedding_model: str, embedding_version: str | None = None)
chunk: Chunk
vector: list[float]
embedding_model: str
embedding_version: str | None = None
@dataclass
class EmbeddingFailure:
    """Result of a batch that could not be embedded.

    Attributes:
        chunks: The source chunks left without vectors.
        errors: The exceptions raised while embedding them.
    """

    chunks: list[Chunk]
    errors: list[Exception]

A failed batch embedding attempt.

Attributes:
  • chunks: Source chunks that could not be embedded.
  • errors: Exceptions raised during embedding.
EmbeddingFailure( chunks: list[Chunk], errors: list[Exception])
chunks: list[Chunk]
errors: list[Exception]
class EmbeddingModelMismatchError(RuntimeError):
    """The embedder's model does not match the model the store was built with.

    Similarity scores between vectors from different embedding models are
    meaningless, yet nothing would visibly fail. The runtime therefore
    raises this error before searching instead of returning silently wrong
    results.
    """

Raised when the runtime's embedder model differs from the store's.

Mixing vectors from different embedding models silently produces meaningless similarity scores, so the runtime fails loudly before issuing the search.

@dataclass
class IngestionStats:
    """Aggregate counters for one complete ingest run.

    Every counter starts at zero and is updated by the runtime as it
    processes each document. ``total_metrics`` is the running sum of the
    per-batch ``EmbeddingMetrics`` (tokens, dollar cost, latency, vector
    count) over every successful batch, so callers can read a single total
    for billing/observability.
    """

    # Documents pulled from the loader, including ones later skipped/failed.
    documents_loaded: int = 0
    # Documents that ended with at least one failure.
    documents_failed: int = 0
    # Documents skipped because an unchanged, complete copy was stored.
    documents_skipped: int = 0
    # Chunks produced by the chunker (before the max_tokens guard).
    chunks_created: int = 0
    # Chunks successfully embedded and written.
    chunks_embedded: int = 0
    # Failed embedding batches, plus one per chunk dropped as oversized.
    batches_failed: int = 0
    batch_failures: list[EmbeddingFailure] = field(default_factory=list)
    failed_documents: list[DocumentFailed] = field(default_factory=list)
    total_metrics: EmbeddingMetrics = field(default_factory=EmbeddingMetrics)

Summary of a complete ingest run.

total_metrics accumulates per-batch EmbeddingMetrics (tokens, dollar cost, latency, vector count) across every successful batch in the run, so callers can read a single total for billing/observability.

IngestionStats( documents_loaded: int = 0, documents_failed: int = 0, documents_skipped: int = 0, chunks_created: int = 0, chunks_embedded: int = 0, batches_failed: int = 0, batch_failures: list[EmbeddingFailure] = <factory>, failed_documents: list[DocumentFailed] = <factory>, total_metrics: railtracks.retrieval.embedding.models.EmbeddingMetrics = <factory>)
documents_loaded: int = 0
documents_failed: int = 0
documents_skipped: int = 0
chunks_created: int = 0
chunks_embedded: int = 0
batches_failed: int = 0
batch_failures: list[EmbeddingFailure]
failed_documents: list[DocumentFailed]
total_metrics: railtracks.retrieval.embedding.models.EmbeddingMetrics
@dataclass
class RetrievalResult:
    """Outcome of a single retrieval query.

    Attributes:
        query: The query text that was searched for.
        chunks: The matching chunks, in the order the store returned them.
        total_candidates: Optional number of candidates considered; ``None``
            when not reported (``RetrievalRuntime.retrieve`` leaves it unset).
        metadata: Free-form extra information about the retrieval; a fresh
            dict per instance.
    """

    query: str
    chunks: list[RetrievedChunk]
    total_candidates: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
RetrievalResult( query: str, chunks: list[RetrievedChunk], total_candidates: int | None = None, metadata: dict[str, typing.Any] = <factory>)
query: str
chunks: list[RetrievedChunk]
total_candidates: int | None = None
metadata: dict[str, typing.Any]
class RetrievalRuntime:
    """Orchestrates loading, chunking, embedding, storage, and retrieval.

    The runtime captures *how* to process documents (chunker + embedder +
    store); the loader passed to :meth:`ingest` decides *what* to
    process. A single runtime can ingest from multiple sources, mix
    chunking strategies via separate runtimes against the same store,
    and update existing documents by re-ingesting them. Multi-tenant
    callers share one runtime and pass ``scope`` per :meth:`ingest` or
    :meth:`retrieve` call.

    Args:
        chunker: Splits documents into chunks.
        embedder: Embeds chunk text into vectors.
        store: Receives written ``StoreEntry`` objects and serves similarity
            search.
        batch_size: Items per embedding batch. Falls back to
            ``embedder.default_batch_size`` when omitted. Raises
            ``ValueError`` at construction if neither is set or if the
            resolved value is not a positive integer.
        on_ingest: Synchronous callback invoked with each ingestion event
            (``BatchIngested``, ``EmbeddingFailure``, ``DocumentFailed`` or
            ``DocumentSkipped``) just before :meth:`ingest` yields it. It is
            also invoked for every event drained by :meth:`ingest_all`. For
            async logging, schedule the work from inside the callback (e.g.
            with ``asyncio.create_task``).
        on_retrieve: Synchronous callback invoked with the query string and
            the ``RetrievalResult`` after each retrieve call.
        max_tokens: When set, chunks whose token count exceeds this limit
            are dropped before embedding and reported via
            ``EmbeddingFailure`` rather than being sent to the provider.
            Requires ``tokenizer`` (defaults to ``TiktokenTokenizer``).
        tokenizer: Tokenizer used to enforce ``max_tokens``. Defaults to
            ``TiktokenTokenizer`` lazily when ``max_tokens`` is set.
    """

    def __init__(
        self,
        chunker: Chunker,
        embedder: Embedding,
        store: Store,
        *,
        batch_size: int | None = None,
        on_ingest: Callable[
            [BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped], None
        ]
        | None = None,
        on_retrieve: Callable[[str, RetrievalResult], None] | None = None,
        max_tokens: int | None = None,
        tokenizer: Tokenizer | None = None,
    ) -> None:
        self._chunker = chunker
        self._embedder = embedder
        self._store = store
        # Resolved and validated eagerly, so a misconfigured runtime fails at
        # construction rather than in the middle of an ingest.
        self._batch_size = self._resolve_batch_size(batch_size, embedder)
        self._on_ingest = on_ingest
        self._on_retrieve = on_retrieve
        self._max_tokens = max_tokens
        if max_tokens is not None and tokenizer is None:
            # Imported only when the token guard is enabled (presumably to
            # avoid loading the tokenizer dependency otherwise).
            from .chunking.tokenization import TiktokenTokenizer

            tokenizer = TiktokenTokenizer()
        self._tokenizer = tokenizer
        # Captured on the first successful embedded batch and checked at
        # retrieve time; survives process restarts by lazy-seeding from
        # an existing store entry on the first ingest/retrieve.
        self._captured_model: str | None = None
        self._seed_attempted: bool = False

    @property
    def store(self) -> Store:
        return self._store

    @property
    def embedder(self) -> Embedding:
        return self._embedder

    @property
    def chunker(self) -> Chunker:
        return self._chunker

    @property
    def batch_size(self) -> int:
        return self._batch_size

    @property
    def max_tokens(self) -> int | None:
        return self._max_tokens

    @staticmethod
    def _resolve_batch_size(batch_size: int | None, embedder: Embedding) -> int:
        """Return the explicit ``batch_size`` or the embedder's default.

        Raises:
            ValueError: If neither value is set, or if the resolved size is
                not positive (a batch must hold at least one item).
        """
        bs = batch_size if batch_size is not None else embedder.default_batch_size
        if bs is None:
            raise ValueError(
                f"{type(embedder).__name__} does not declare a "
                "default_batch_size. Pass batch_size= to RetrievalRuntime "
                "or set default_batch_size on the embedder class."
            )
        if bs <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {bs!r}.")
        return bs

    async def ingest(
        self,
        loader: BaseDocumentLoader,
        *,
        scope: StoreScope | None = None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        """Stream loader → chunker → embedder → store, yielding per-batch events.

        Args:
            loader: Source of ``Document`` objects to ingest.
            scope: Tag written onto every ``StoreEntry`` produced by this
                call. Single-tenant callers can leave this ``None``.

        Yields:
            The following events, each passed to ``on_ingest`` (if set)
            before it is yielded:

            - ``DocumentSkipped`` for a document whose complete, unchanged
              copy is already in the store (nothing is re-embedded).
            - ``EmbeddingFailure`` for any failed batch, including one
              single-chunk failure per chunk dropped by the ``max_tokens``
              guard.
            - ``BatchIngested`` after each successful batch finishes writing.
            - ``DocumentFailed`` once at end-of-document for each document
              that had any failure.

            Successful batches for a partially failed document are still
            written; ``DocumentFailed`` signals the partial state.
        """
        stats = IngestionStats()
        async for event in self._ingest_with_stats(loader, stats, scope):
            if self._on_ingest is not None:
                self._on_ingest(event)
            yield event

    async def ingest_all(
        self,
        loader: BaseDocumentLoader,
        *,
        scope: StoreScope | None = None,
    ) -> IngestionStats:
        """Run the whole ingest pipeline for ``loader`` and return its totals.

        Equivalent to draining :meth:`ingest`: ``on_ingest`` still fires for
        every event, but the events are not returned. Only the aggregate
        :class:`IngestionStats` is.
        """
        stats = IngestionStats()
        async for event in self._ingest_with_stats(loader, stats, scope):
            if self._on_ingest is not None:
                self._on_ingest(event)
        return stats

    async def _ingest_with_stats(
        self,
        loader: BaseDocumentLoader,
        stats: IngestionStats,
        scope: StoreScope | None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        """Ingest every document from ``loader``, updating ``stats`` in place."""
        async for doc in loader.astream():
            async for event in self._ingest_document(doc, stats, scope):
                yield event

    async def _ingest_document(
        self, doc: Document, stats: IngestionStats, scope: StoreScope | None
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        """Ingest one document: dedupe, chunk, token-guard, embed and store."""
        stats.documents_loaded += 1
        doc.content_hash = _content_hash(doc.content)

        await self._ensure_captured_model_seeded()

        if await self._is_complete_duplicate(doc):
            stats.documents_skipped += 1
            yield DocumentSkipped(document_id=doc.id, source=doc.source)
            return

        chunks = await self._chunker.achunk(doc)
        stats.chunks_created += len(chunks)
        if not chunks:
            return

        self._stamp_staleness_metadata(doc, chunks)

        # Token-size guard: drop oversized chunks before embedding to avoid
        # provider 4xx errors. Each oversize chunk surfaces as an
        # EmbeddingFailure carried into the document's accumulated errors.
        doc_errors: list[Exception] = []
        chunks, failures = self._split_oversized(chunks, stats)
        for failure in failures:
            doc_errors.extend(failure.errors)
            yield failure
        if not chunks:
            if doc_errors:
                yield self._record_document_failed(doc, doc_errors, stats)
            return

        # Stamp the final (post-token-guard) chunk count onto every chunk so a
        # later staleness check can tell a complete document from a
        # partially-written one. Every chunk carries the same total, so reading
        # any one persisted chunk reveals how many were expected.
        for chunk in chunks:
            chunk.metadata["doc_chunk_count"] = len(chunks)

        async for event in self._embed_and_store(doc, chunks, stats, doc_errors, scope):
            yield event

        if doc_errors:
            yield self._record_document_failed(doc, doc_errors, stats)

    async def _is_complete_duplicate(self, doc: Document) -> bool:
        """Whether the store already holds a *complete* copy of ``doc``.

        Skip re-embedding only when as many chunks are present as the last
        write expected. A partially-written document (some chunks present after
        an interrupted ingest) has fewer than expected and is re-ingested rather
        than left broken. find() is metadata-only (no vector search); the second
        call caps its work at the document's own chunk count, and only runs when
        a prior version exists. (Counting is done via find() rather than a
        count() call so the runtime depends only on the Store protocol.)
        """
        if doc.source is None:
            # No stable identity, so there is nothing to compare against.
            return False
        # NOTE(review): these filters do not include the ingest ``scope``; if
        # the same source is ingested under different scopes, confirm that a
        # copy stored under another scope should count as a duplicate.
        stale_filters = {
            "source_path": doc.source,
            "content_hash": doc.content_hash,
        }
        existing = await self._store.find(stale_filters, limit=1)
        if not existing:
            return False
        expected = existing[0].chunk_metadata.get("doc_chunk_count")
        if expected is None:
            # Legacy entry written before count-aware staleness:
            # preserve the original "exists => complete" behavior.
            return True
        present = await self._store.find(stale_filters, limit=expected)
        return len(present) >= expected

    @staticmethod
    def _stamp_staleness_metadata(doc: Document, chunks: list[Chunk]) -> None:
        """Inject staleness-detection metadata into every chunk so future
        `find` calls can identify whether this document has changed.

        Uses ``setdefault`` so values a chunker already set are not
        overwritten.
        """
        for chunk in chunks:
            if doc.source is not None:
                chunk.metadata.setdefault("source_path", doc.source)
            if doc.content_hash is not None:
                chunk.metadata.setdefault("content_hash", doc.content_hash)

    def _split_oversized(
        self, chunks: list[Chunk], stats: IngestionStats
    ) -> tuple[list[Chunk], list[EmbeddingFailure]]:
        """Partition chunks into embeddable ones and per-chunk failures.

        Returns ``(ok_chunks, failures)``; each oversize chunk becomes a
        single-chunk ``EmbeddingFailure`` and is recorded in ``stats``.
        When no ``max_tokens`` limit is configured, all chunks pass.
        """
        if self._max_tokens is None or self._tokenizer is None:
            return chunks, []
        ok_chunks: list[Chunk] = []
        failures: list[EmbeddingFailure] = []
        for chunk in chunks:
            tokens = self._tokenizer.count(chunk.content)
            if tokens > self._max_tokens:
                err = ValueError(
                    f"chunk {chunk.id} has {tokens} tokens "
                    f"(>{self._max_tokens}); dropped before embedding"
                )
                stats.batches_failed += 1
                failure = EmbeddingFailure(chunks=[chunk], errors=[err])
                stats.batch_failures.append(failure)
                failures.append(failure)
            else:
                ok_chunks.append(chunk)
        return ok_chunks, failures

    async def _embed_and_store(
        self,
        doc: Document,
        chunks: list[Chunk],
        stats: IngestionStats,
        doc_errors: list[Exception],
        scope: StoreScope | None,
    ) -> AsyncGenerator[
        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
    ]:
        """Embed ``chunks`` in batches and write each successful batch.

        The document's previous entries are deleted lazily, just before the
        first successful batch is written. If every batch fails, the prior
        version stays intact. Failed batches are appended to ``doc_errors``
        (mutated in place) and recorded in ``stats``.
        """
        # batch_index is per-document: it counts batches (successful and
        # failed) within this document and resets for the next one.
        batch_index = 0
        delete_done = False
        async for batch in self._embedder.astream_batches(
            chunks, batch_size=self._batch_size
        ):
            if isinstance(batch, EmbeddingResult):
                # Check model BEFORE delete_where / write — a mismatch here
                # must not corrupt the store by clearing prior chunks first.
                self._check_model(batch.metrics.model)
                if not delete_done:
                    # NOTE(review): deletion is keyed on document_id only, not
                    # on ``scope``; confirm this is intended when one source
                    # is ingested under several scopes.
                    await self._store.delete_where({"document_id": str(doc.id)})
                    delete_done = True
                for embedded in batch.chunks:
                    self._capture_model(embedded)
                    entry = StoreEntry.from_chunk(embedded, scope=scope)
                    await self._store.write(entry)
                stats.chunks_embedded += len(batch.chunks)
                stats.total_metrics = stats.total_metrics + batch.metrics
                yield BatchIngested(
                    document_id=doc.id,
                    embedded_chunks=batch.chunks,
                    batch_index=batch_index,
                    metrics=batch.metrics,
                )
            else:
                doc_errors.extend(batch.errors)
                stats.batches_failed += 1
                stats.batch_failures.append(batch)
                yield batch
            batch_index += 1

    def _capture_model(self, embedded: EmbeddedChunk) -> None:
        """Record the embedding model from the first successful chunk so later
        retrieve() calls can enforce model consistency."""
        if self._captured_model is None and embedded.embedding_model:
            self._captured_model = embedded.embedding_model
            logger.info(
                "RetrievalRuntime captured embedding model %r "
                "from first successful batch; subsequent retrieve() "
                "calls will enforce this model.",
                self._captured_model,
            )

    async def _ensure_captured_model_seeded(self) -> None:
        """Lazily seed ``_captured_model`` from an existing store entry so the
        guard survives across process restarts. ``StoreEntry.embedding_model``
        is recorded on every persisted entry, so a single ``find`` call is
        enough — no schema change required. Runs at most once per runtime;
        a miss against an empty store sets ``_seed_attempted`` so we don't
        re-query on every doc."""
        if self._captured_model is not None or self._seed_attempted:
            return
        self._seed_attempted = True
        existing = await self._store.find({}, limit=1)
        if existing and existing[0].embedding_model:
            self._captured_model = existing[0].embedding_model
            logger.info(
                "RetrievalRuntime seeded captured embedding model %r from "
                "an existing store entry; mismatched embedders will raise.",
                self._captured_model,
            )

    def _check_model(self, embed_model: str | None) -> None:
        """Raise if ``embed_model`` disagrees with the captured model.

        A falsy ``embed_model`` or an uncaptured model is not treated as a
        mismatch.

        Raises:
            EmbeddingModelMismatchError: On a definite model mismatch.
        """
        if (
            self._captured_model is not None
            and embed_model
            and embed_model != self._captured_model
        ):
            raise EmbeddingModelMismatchError(
                f"Embedder produced vectors with model {embed_model!r} but "
                f"store was built with {self._captured_model!r}. Similarity "
                "scores across models are meaningless; rebuild the store "
                "with the correct embedder or switch embedders."
            )

    @staticmethod
    def _record_document_failed(
        doc: Document, doc_errors: list[Exception], stats: IngestionStats
    ) -> DocumentFailed:
        """Build a ``DocumentFailed`` event for ``doc`` and record it in ``stats``."""
        failed = DocumentFailed(
            document_id=doc.id,
            source=doc.source,
            errors=doc_errors,
        )
        stats.documents_failed += 1
        stats.failed_documents.append(failed)
        return failed

    async def delete_document(self, document_id: UUID) -> None:
        """Remove all chunks for a document from the store.

        Convenience wrapper around ``store.delete_where({"document_id": ...})``
        so callers don't need to know the metadata key.
        """
        await self._store.delete_where({"document_id": str(document_id)})

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        metadata_filters: dict[str, Any] | None = None,
        scope: StoreScope | None = None,
    ) -> RetrievalResult:
        """Embed ``query`` and return the top ``top_k`` matches from the store.

        Args:
            query: The text to embed and search with.
            top_k: Maximum number of results.
            metadata_filters: Additional equality filters on chunk metadata.
            scope: Restricts the search to entries written with the same
                scope. Leave ``None`` to search across all scopes.

        Returns:
            A ``RetrievalResult`` whose chunks keep the store's hit order.
            ``on_retrieve`` (if set) is called with it before returning.

        Raises:
            EmbeddingModelMismatchError: When the embedder reports a model
                different from the one captured on first ingest.
        """
        await self._ensure_captured_model_seeded()
        text_result = await self._embedder.aembed([query])
        self._check_model(text_result.metrics.model)

        store_query = StoreQuery(
            text=query,
            scope=scope,
            embedding=text_result.vectors[0],
            top_k=top_k,
            metadata_filters=metadata_filters,
        )
        store_hits = await self._store.read(store_query)
        chunks = [
            RetrievedChunk(
                chunk=_entry_to_chunk(hit.entry),
                score=hit.score,
                rank=hit.rank,
                source_retriever=hit.source_retriever,
                rerank_score=hit.rerank_score,
            )
            for hit in store_hits
        ]
        result = RetrievalResult(query=query, chunks=chunks)
        if self._on_retrieve is not None:
            self._on_retrieve(query, result)
        return result

Orchestrates loading, chunking, embedding, storage, and retrieval.

The runtime captures how to process documents (chunker + embedder + store); the loader passed to ingest() decides what to process. A single runtime can ingest from multiple sources and update existing documents by re-ingesting them; to mix chunking strategies, point separate runtimes (each with its own chunker) at the same store. Multi-tenant callers share one runtime and pass scope per ingest() or retrieve() call.

Arguments:
  • chunker: Splits documents into chunks.
  • embedder: Embeds chunk text into vectors.
  • store: Receives each written StoreEntry and serves similarity search.
  • batch_size: Items per embedding batch. Falls back to embedder.default_batch_size when omitted; raises ValueError at construction if neither is set.
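  • on_ingest: Synchronous callback invoked with each ingestion event (BatchIngested, EmbeddingFailure, DocumentFailed, or DocumentSkipped) as it is produced, by both ingest() and ingest_all(). Because it is called synchronously, an async logger should have the callback schedule a coroutine with asyncio.create_task rather than awaiting it.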
  • on_retrieve: Synchronous callback invoked with the query string and the RetrievalResult after each retrieve call.
  • max_tokens: When set, chunks whose token count exceeds this limit are dropped before embedding and reported via EmbeddingFailure rather than being sent to the provider. Token counts come from tokenizer; if no tokenizer is given, a TiktokenTokenizer is created.
  • tokenizer: Tokenizer used to enforce max_tokens. When omitted and max_tokens is set, a TiktokenTokenizer is created at construction time (its module is only imported in that case).
RetrievalRuntime( chunker: railtracks.retrieval.chunking.base.Chunker, embedder: railtracks.retrieval.embedding.base.Embedding, store: Store, *, batch_size: int | None = None, on_ingest: Optional[Callable[[BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped], NoneType]] = None, on_retrieve: Optional[Callable[[str, RetrievalResult], NoneType]] = None, max_tokens: int | None = None, tokenizer: railtracks.retrieval.chunking.tokenization.Tokenizer | None = None)
136    def __init__(
137        self,
138        chunker: Chunker,
139        embedder: Embedding,
140        store: Store,
141        *,
142        batch_size: int | None = None,
143        on_ingest: Callable[
144            [BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped], None
145        ]
146        | None = None,
147        on_retrieve: Callable[[str, RetrievalResult], None] | None = None,
148        max_tokens: int | None = None,
149        tokenizer: Tokenizer | None = None,
150    ) -> None:
151        self._chunker = chunker
152        self._embedder = embedder
153        self._store = store
154        self._batch_size = self._resolve_batch_size(batch_size, embedder)
155        self._on_ingest = on_ingest
156        self._on_retrieve = on_retrieve
157        self._max_tokens = max_tokens
158        if max_tokens is not None and tokenizer is None:
159            from .chunking.tokenization import TiktokenTokenizer
160
161            tokenizer = TiktokenTokenizer()
162        self._tokenizer = tokenizer
163        # Captured on the first successful embedded batch and checked at
164        # retrieve time; survives process restarts by lazy-seeding from
165        # an existing store entry on the first ingest/retrieve.
166        self._captured_model: str | None = None
167        self._seed_attempted: bool = False
store: Store
169    @property
170    def store(self) -> Store:
171        return self._store
embedder: railtracks.retrieval.embedding.base.Embedding
173    @property
174    def embedder(self) -> Embedding:
175        return self._embedder
chunker: railtracks.retrieval.chunking.base.Chunker
177    @property
178    def chunker(self) -> Chunker:
179        return self._chunker
batch_size: int
181    @property
182    def batch_size(self) -> int:
183        return self._batch_size
max_tokens: int | None
185    @property
186    def max_tokens(self) -> int | None:
187        return self._max_tokens
async def ingest( self, loader: railtracks.retrieval.loaders.base.BaseDocumentLoader, *, scope: StoreScope | None = None) -> AsyncGenerator[BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None]:
200    async def ingest(
201        self,
202        loader: BaseDocumentLoader,
203        *,
204        scope: StoreScope | None = None,
205    ) -> AsyncGenerator[
206        BatchIngested | EmbeddingFailure | DocumentFailed | DocumentSkipped, None
207    ]:
208        """Stream loader → chunker → embedder → store, yielding per-batch events.
209
210        Args:
211            loader: Source of ``Document`` objects to ingest.
212            scope: Tag written onto every ``StoreEntry`` produced by this
213                call. Single-tenant callers can leave this ``None``.
214
215        Yields:
216            ``BatchIngested`` after each successful batch finishes writing,
217            ``EmbeddingFailure`` for any failed batch, and ``DocumentFailed``
218            once at end-of-document for each document that had any failed
219            batch. Successful batches for a partially-failed document are
220            still written; ``DocumentFailed`` signals the partial state.
221        """
222        stats = IngestionStats()
223        async for event in self._ingest_with_stats(loader, stats, scope):
224            if self._on_ingest is not None:
225                self._on_ingest(event)
226            yield event

Stream loader → chunker → embedder → store, yielding per-batch events.

Arguments:
  • loader: Source of Document objects to ingest.
  • scope: Tag written onto every StoreEntry produced by this call. Single-tenant callers can leave this None.
Yields:

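BatchIngested after each successful batch finishes writing, EmbeddingFailure for any failed batch, and DocumentFailed once at end-of-document for each document that had any failed batch. Successful batches for a partially-failed document are still written; DocumentFailed signals the partial state. DocumentSkipped events may also appear in the stream (they are part of the yielded type). When on_ingest is set, each event is passed to it before being yielded.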

async def ingest_all( self, loader: railtracks.retrieval.loaders.base.BaseDocumentLoader, *, scope: StoreScope | None = None) -> IngestionStats:
228    async def ingest_all(
229        self,
230        loader: BaseDocumentLoader,
231        *,
232        scope: StoreScope | None = None,
233    ) -> IngestionStats:
234        """Drain `ingest` and return aggregate counts."""
235        stats = IngestionStats()
236        async for event in self._ingest_with_stats(loader, stats, scope):
237            if self._on_ingest is not None:
238                self._on_ingest(event)
239        return stats

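Run the same ingest pipeline to completion and return the aggregate IngestionStats. Events are not yielded to the caller, but on_ingest (when set) is still invoked for each one.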

async def delete_document(self, document_id: uuid.UUID) -> None:
467    async def delete_document(self, document_id: UUID) -> None:
468        """Remove all chunks for a document from the store.
469
470        Convenience wrapper around ``store.delete_where({"document_id": ...})``
471        so callers don't need to know the metadata key.
472        """
473        await self._store.delete_where({"document_id": str(document_id)})

Remove all chunks for a document from the store.

Convenience wrapper around store.delete_where({"document_id": ...}) so callers don't need to know the metadata key.

async def retrieve( self, query: str, top_k: int = 5, metadata_filters: dict[str, typing.Any] | None = None, scope: StoreScope | None = None) -> RetrievalResult:
475    async def retrieve(
476        self,
477        query: str,
478        top_k: int = 5,
479        metadata_filters: dict[str, Any] | None = None,
480        scope: StoreScope | None = None,
481    ) -> RetrievalResult:
482        """Embed ``query`` and return the top ``top_k`` matches from the store.
483
484        Args:
485            query: The text to embed and search with.
486            top_k: Maximum number of results.
487            metadata_filters: Additional equality filters on chunk metadata.
488            scope: Restricts the search to entries written with the same
489                scope. Leave ``None`` to search across all scopes.
490
491        Raises:
492            EmbeddingModelMismatchError: When the embedder reports a model
493                different from the one captured on first ingest.
494        """
495        await self._ensure_captured_model_seeded()
496        text_result = await self._embedder.aembed([query])
497        self._check_model(text_result.metrics.model)
498
499        store_query = StoreQuery(
500            text=query,
501            scope=scope,
502            embedding=text_result.vectors[0],
503            top_k=top_k,
504            metadata_filters=metadata_filters,
505        )
506        store_hits = await self._store.read(store_query)
507        chunks = [
508            RetrievedChunk(
509                chunk=_entry_to_chunk(hit.entry),
510                score=hit.score,
511                rank=hit.rank,
512                source_retriever=hit.source_retriever,
513                rerank_score=hit.rerank_score,
514            )
515            for hit in store_hits
516        ]
517        result = RetrievalResult(query=query, chunks=chunks)
518        if self._on_retrieve is not None:
519            self._on_retrieve(query, result)
520        return result

Embed query and return up to top_k best matches from the store.

Arguments:
  • query: The text to embed and search with.
  • top_k: Maximum number of results.
  • metadata_filters: Additional equality filters on chunk metadata.
  • scope: Restricts the search to entries written with the same scope. Leave None to search across all scopes.
Raises:
  • EmbeddingModelMismatchError: When the embedder reports a model different from the one captured on first ingest (or seeded from an existing store entry after a process restart).
@dataclass
class RetrievedChunk:
92@dataclass
93class RetrievedChunk:
94    chunk: Chunk
95    score: float
96    rank: int
97    source_retriever: str | None = None
98    rerank_score: float | None = None
RetrievedChunk( chunk: Chunk, score: float, rank: int, source_retriever: str | None = None, rerank_score: float | None = None)
chunk: Chunk
score: float
rank: int
source_retriever: str | None = None
rerank_score: float | None = None
@runtime_checkable
class Store(typing.Protocol):
10@runtime_checkable
11class Store(Protocol):
12    async def write(self, entry: StoreEntry) -> str: ...
13    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]: ...
14    async def delete(self, id: UUID) -> None: ...
15    async def clear(self, scope: StoreScope) -> None: ...
16    async def delete_where(self, filters: dict[str, Any]) -> None: ...
17    async def find(
18        self, filters: dict[str, Any], limit: int = 1
19    ) -> list[StoreEntry]: ...

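Storage contract used by RetrievalRuntime: any object that provides async write, read, delete, clear, delete_where, and find methods satisfies it structurally, and because it is decorated with @runtime_checkable it can also be checked with isinstance(). Store defines no docstring of its own, so the generic typing.Protocol documentation is shown below.

Base class for protocol classes.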

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
Store(*args, **kwargs)
1431def _no_init_or_replace_init(self, *args, **kwargs):
1432    cls = type(self)
1433
1434    if cls._is_protocol:
1435        raise TypeError('Protocols cannot be instantiated')
1436
1437    # Already using a custom `__init__`. No need to calculate correct
1438    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1439    if cls.__init__ is not _no_init_or_replace_init:
1440        return
1441
1442    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1443    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1444    # searches for a proper new `__init__` in the MRO. The new `__init__`
1445    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1446    # instantiation of the protocol subclass will thus use the new
1447    # `__init__` and no longer call `_no_init_or_replace_init`.
1448    for base in cls.__mro__:
1449        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1450        if init is not _no_init_or_replace_init:
1451            cls.__init__ = init
1452            break
1453    else:
1454        # should not happen
1455        cls.__init__ = object.__init__
1456
1457    cls.__init__(self, *args, **kwargs)
async def write(self, entry: StoreEntry) -> str:
12    async def write(self, entry: StoreEntry) -> str: ...
async def read( self, query: StoreQuery) -> list[railtracks.retrieval.stores.models.RetrievedStoreEntry]:
13    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]: ...
async def delete(self, id: uuid.UUID) -> None:
14    async def delete(self, id: UUID) -> None: ...
async def clear(self, scope: StoreScope) -> None:
15    async def clear(self, scope: StoreScope) -> None: ...
async def delete_where(self, filters: dict[str, typing.Any]) -> None:
16    async def delete_where(self, filters: dict[str, Any]) -> None: ...
async def find( self, filters: dict[str, typing.Any], limit: int = 1) -> list[StoreEntry]:
17    async def find(
18        self, filters: dict[str, Any], limit: int = 1
19    ) -> list[StoreEntry]: ...
@dataclass
class StoreEntry:
 44@dataclass
 45class StoreEntry:
 46    # Required fields
 47    id: UUID
 48    content: str
 49    vector: list[float] | None
 50    embedding_model: str
 51    chunk_id: UUID
 52    document_id: UUID
 53    # Optional enrichment fields
 54    abstract: str | None = None
 55    summary: str | None = None
 56    scope: StoreScope | None = None
 57    # Optional chunk provenance
 58    chunk_index: int = 0
 59    parent_chunk_id: UUID | None = None
 60    chunk_offsets: tuple[int, int] | None = None
 61    chunk_metadata: dict = field(default_factory=dict)
 62    # Optional embedding provenance
 63    embedding_version: str | None = None
 64    # Optional store metadata
 65    entities: list[Entity] | None = None
 66    valid_from: datetime | None = None
 67    valid_until: datetime | None = None
 68    created_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))
 69
 70    @classmethod
 71    def from_chunk(
 72        cls,
 73        embedded_chunk: EmbeddedChunk,
 74        *,
 75        scope: StoreScope | None = None,
 76        abstract: str | None = None,
 77        summary: str | None = None,
 78        entities: list[Entity] | None = None,
 79        valid_from: datetime | None = None,
 80        valid_until: datetime | None = None,
 81    ) -> StoreEntry:
 82        chunk = embedded_chunk.chunk
 83        return cls(
 84            id=uuid4(),
 85            content=chunk.content,
 86            vector=embedded_chunk.vector,
 87            embedding_model=embedded_chunk.embedding_model,
 88            embedding_version=embedded_chunk.embedding_version,
 89            chunk_id=chunk.id,
 90            document_id=chunk.document_id,
 91            chunk_index=chunk.index,
 92            parent_chunk_id=chunk.parent_chunk_id,
 93            chunk_offsets=chunk.offsets,
 94            chunk_metadata=chunk.metadata,
 95            scope=scope,
 96            abstract=abstract,
 97            summary=summary,
 98            entities=entities,
 99            valid_from=valid_from,
100            valid_until=valid_until,
101        )
StoreEntry( id: uuid.UUID, content: str, vector: list[float] | None, embedding_model: str, chunk_id: uuid.UUID, document_id: uuid.UUID, abstract: str | None = None, summary: str | None = None, scope: StoreScope | None = None, chunk_index: int = 0, parent_chunk_id: uuid.UUID | None = None, chunk_offsets: tuple[int, int] | None = None, chunk_metadata: dict = <factory>, embedding_version: str | None = None, entities: list[railtracks.retrieval.stores.models.Entity] | None = None, valid_from: datetime.datetime | None = None, valid_until: datetime.datetime | None = None, created_at: datetime.datetime = <factory>)
id: uuid.UUID
content: str
vector: list[float] | None
embedding_model: str
chunk_id: uuid.UUID
document_id: uuid.UUID
abstract: str | None = None
summary: str | None = None
scope: StoreScope | None = None
chunk_index: int = 0
parent_chunk_id: uuid.UUID | None = None
chunk_offsets: tuple[int, int] | None = None
chunk_metadata: dict
embedding_version: str | None = None
entities: list[railtracks.retrieval.stores.models.Entity] | None = None
valid_from: datetime.datetime | None = None
valid_until: datetime.datetime | None = None
created_at: datetime.datetime
@classmethod
def from_chunk( cls, embedded_chunk: EmbeddedChunk, *, scope: StoreScope | None = None, abstract: str | None = None, summary: str | None = None, entities: list[railtracks.retrieval.stores.models.Entity] | None = None, valid_from: datetime.datetime | None = None, valid_until: datetime.datetime | None = None) -> StoreEntry:
 70    @classmethod
 71    def from_chunk(
 72        cls,
 73        embedded_chunk: EmbeddedChunk,
 74        *,
 75        scope: StoreScope | None = None,
 76        abstract: str | None = None,
 77        summary: str | None = None,
 78        entities: list[Entity] | None = None,
 79        valid_from: datetime | None = None,
 80        valid_until: datetime | None = None,
 81    ) -> StoreEntry:
 82        chunk = embedded_chunk.chunk
 83        return cls(
 84            id=uuid4(),
 85            content=chunk.content,
 86            vector=embedded_chunk.vector,
 87            embedding_model=embedded_chunk.embedding_model,
 88            embedding_version=embedded_chunk.embedding_version,
 89            chunk_id=chunk.id,
 90            document_id=chunk.document_id,
 91            chunk_index=chunk.index,
 92            parent_chunk_id=chunk.parent_chunk_id,
 93            chunk_offsets=chunk.offsets,
 94            chunk_metadata=chunk.metadata,
 95            scope=scope,
 96            abstract=abstract,
 97            summary=summary,
 98            entities=entities,
 99            valid_from=valid_from,
100            valid_until=valid_until,
101        )
@dataclass
class StoreQuery:
113@dataclass
114class StoreQuery:
115    text: str
116    scope: StoreScope | None = None
117    embedding: list[float] | None = None
118    top_k: int = 10
119    metadata_filters: dict[str, Any] | None = None
StoreQuery( text: str, scope: StoreScope | None = None, embedding: list[float] | None = None, top_k: int = 10, metadata_filters: dict[str, typing.Any] | None = None)
text: str
scope: StoreScope | None = None
embedding: list[float] | None = None
top_k: int = 10
metadata_filters: dict[str, typing.Any] | None = None
@dataclass(frozen=True)
class StoreScope:
21@dataclass(frozen=True)
22class StoreScope:
23    """Equality-filter namespace for store entries.
24
25    Each entry in ``labels`` becomes a mandatory equality filter on every
26    write and read. The retrieval module is agnostic about what dimensions
27    you scope by — pick whichever axes fit your tenancy model::
28
29        StoreScope(labels={"user_id": "alice"})  # SaaS tenancy
30        StoreScope(labels={"organization": "acme", "environment": "prod"})  # B2B
31        StoreScope(labels={"agent_id": "docs-bot", "session_id": "s1"})  # agent context
32        StoreScope(labels={"account_id": 42, "is_prod": True})  # non-string scalars
33
34    The ``scope_`` prefix applied in :meth:`to_payload_filters` avoids key
35    collisions in flat payload dicts that also carry content fields.
36    """
37
38    labels: Mapping[str, Any] = field(default_factory=dict)
39
40    def to_payload_filters(self) -> dict[str, Any]:
41        return {f"scope_{k}": v for k, v in self.labels.items()}

Equality-filter namespace for store entries.

Each entry in labels becomes a mandatory equality filter on every write and read. The retrieval module is agnostic about what dimensions you scope by — pick whichever axes fit your tenancy model:

StoreScope(labels={"user_id": "alice"})  # SaaS tenancy
StoreScope(labels={"organization": "acme", "environment": "prod"})  # B2B
StoreScope(labels={"agent_id": "docs-bot", "session_id": "s1"})  # agent context
StoreScope(labels={"account_id": 42, "is_prod": True})  # non-string scalars

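The scope_ prefix applied in to_payload_filters() avoids key collisions in flat payload dicts that also carry content fields. Note that VectorStore.read merges StoreQuery.metadata_filters after the scope filters. A metadata filter whose key matches a scope_ key (for example scope_user_id) therefore replaces the scope value for that read, so callers should not pass scope_-prefixed keys in metadata_filters.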

StoreScope(labels: Mapping[str, typing.Any] = <factory>)
labels: Mapping[str, typing.Any]
def to_payload_filters(self) -> dict[str, typing.Any]:
40    def to_payload_filters(self) -> dict[str, Any]:
41        return {f"scope_{k}": v for k, v in self.labels.items()}
class VectorStore:
184class VectorStore:
185    """Cosine similarity search over StoreEntry vectors.
186
187    Satisfies the Store protocol. Does not inherit from any base class.
188    """
189
190    def __init__(self, backend: VectorBackend) -> None:
191        self._backend = backend
192
193    async def write(self, entry: StoreEntry) -> str:
194        if entry.vector is None:
195            logger.error(
196                "VectorStore.write called with entry.vector=None (entry_id=%s, "
197                "chunk_id=%s); the entry must be embedded before writing.",
198                entry.id,
199                entry.chunk_id,
200            )
201            raise ValueError(
202                f"VectorStore.write requires entry.vector to be set "
203                f"(entry_id={entry.id}); embed the chunk before writing."
204            )
205        await self._backend.upsert(
206            str(entry.id), entry.vector, _entry_to_payload(entry)
207        )
208        return str(entry.id)
209
210    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]:
211        if query.embedding is None:
212            raise ValueError(
213                "VectorStore.read requires query.embedding to be set; "
214                "caller must supply a pre-computed embedding."
215            )
216
217        filters: dict[str, Any] = (
218            query.scope.to_payload_filters() if query.scope is not None else {}
219        )
220        if query.metadata_filters:
221            filters.update(query.metadata_filters)
222
223        raw_hits = await self._backend.search(query.embedding, query.top_k, filters)
224
225        results: list[RetrievedStoreEntry] = []
226        for rank, (hit_id, score, payload) in enumerate(raw_hits):
227            entry = _payload_to_entry(hit_id, payload)
228            results.append(
229                RetrievedStoreEntry(
230                    entry=entry,
231                    score=score,
232                    rank=rank,
233                    source_retriever="dense",
234                )
235            )
236        return results
237
238    async def delete(self, id: UUID) -> None:
239        await self._backend.delete(str(id))
240
241    async def clear(self, scope: StoreScope) -> None:
242        await self._backend.delete_where(scope.to_payload_filters())
243
244    async def delete_where(self, filters: dict[str, Any]) -> None:
245        await self._backend.delete_where(filters)
246
247    async def find(self, filters: dict[str, Any], limit: int = 1) -> list[StoreEntry]:
248        raw_hits = await self._backend.list_where(filters, limit)
249        return [_payload_to_entry(hit_id, payload) for hit_id, payload in raw_hits]
250
251    async def count(self, filters: dict[str, Any] | None = None) -> int:
252        return await self._backend.count(filters or {})
253
254    async def nearest_neighbors(
255        self,
256        embedding: list[float],
257        k: int,
258        scope: StoreScope | None = None,
259    ) -> list[RetrievedStoreEntry]:
260        filters = scope.to_payload_filters() if scope is not None else {}
261        raw_hits = await self._backend.search(embedding, k, filters)
262
263        results: list[RetrievedStoreEntry] = []
264        for rank, (hit_id, score, payload) in enumerate(raw_hits):
265            entry = _payload_to_entry(hit_id, payload)
266            results.append(
267                RetrievedStoreEntry(
268                    entry=entry,
269                    score=score,
270                    rank=rank,
271                    source_retriever="dense",
272                )
273            )
274        return results

Cosine similarity search over StoreEntry vectors.

Satisfies the Store protocol. Does not inherit from any base class.

VectorStore(backend: railtracks.retrieval.stores.vector.base.VectorBackend)
190    def __init__(self, backend: VectorBackend) -> None:
191        self._backend = backend
async def write(self, entry: StoreEntry) -> str:
193    async def write(self, entry: StoreEntry) -> str:
194        if entry.vector is None:
195            logger.error(
196                "VectorStore.write called with entry.vector=None (entry_id=%s, "
197                "chunk_id=%s); the entry must be embedded before writing.",
198                entry.id,
199                entry.chunk_id,
200            )
201            raise ValueError(
202                f"VectorStore.write requires entry.vector to be set "
203                f"(entry_id={entry.id}); embed the chunk before writing."
204            )
205        await self._backend.upsert(
206            str(entry.id), entry.vector, _entry_to_payload(entry)
207        )
208        return str(entry.id)
async def read( self, query: StoreQuery) -> list[railtracks.retrieval.stores.models.RetrievedStoreEntry]:
210    async def read(self, query: StoreQuery) -> list[RetrievedStoreEntry]:
211        if query.embedding is None:
212            raise ValueError(
213                "VectorStore.read requires query.embedding to be set; "
214                "caller must supply a pre-computed embedding."
215            )
216
217        filters: dict[str, Any] = (
218            query.scope.to_payload_filters() if query.scope is not None else {}
219        )
220        if query.metadata_filters:
221            filters.update(query.metadata_filters)
222
223        raw_hits = await self._backend.search(query.embedding, query.top_k, filters)
224
225        results: list[RetrievedStoreEntry] = []
226        for rank, (hit_id, score, payload) in enumerate(raw_hits):
227            entry = _payload_to_entry(hit_id, payload)
228            results.append(
229                RetrievedStoreEntry(
230                    entry=entry,
231                    score=score,
232                    rank=rank,
233                    source_retriever="dense",
234                )
235            )
236        return results
async def delete(self, id: uuid.UUID) -> None:
238    async def delete(self, id: UUID) -> None:
239        await self._backend.delete(str(id))
async def clear(self, scope: StoreScope) -> None:
241    async def clear(self, scope: StoreScope) -> None:
242        await self._backend.delete_where(scope.to_payload_filters())
async def delete_where(self, filters: dict[str, typing.Any]) -> None:
244    async def delete_where(self, filters: dict[str, Any]) -> None:
245        await self._backend.delete_where(filters)
async def find( self, filters: dict[str, typing.Any], limit: int = 1) -> list[StoreEntry]:
247    async def find(self, filters: dict[str, Any], limit: int = 1) -> list[StoreEntry]:
248        raw_hits = await self._backend.list_where(filters, limit)
249        return [_payload_to_entry(hit_id, payload) for hit_id, payload in raw_hits]
async def count(self, filters: dict[str, typing.Any] | None = None) -> int:
251    async def count(self, filters: dict[str, Any] | None = None) -> int:
252        return await self._backend.count(filters or {})
async def nearest_neighbors( self, embedding: list[float], k: int, scope: StoreScope | None = None) -> list[railtracks.retrieval.stores.models.RetrievedStoreEntry]:
254    async def nearest_neighbors(
255        self,
256        embedding: list[float],
257        k: int,
258        scope: StoreScope | None = None,
259    ) -> list[RetrievedStoreEntry]:
260        filters = scope.to_payload_filters() if scope is not None else {}
261        raw_hits = await self._backend.search(embedding, k, filters)
262
263        results: list[RetrievedStoreEntry] = []
264        for rank, (hit_id, score, payload) in enumerate(raw_hits):
265            entry = _payload_to_entry(hit_id, payload)
266            results.append(
267                RetrievedStoreEntry(
268                    entry=entry,
269                    score=score,
270                    rank=rank,
271                    source_retriever="dense",
272                )
273            )
274        return results