Ingestion
Ingestion is the write path. A loader produces Documents, the chunker
splits them, the embedder vectorizes the chunks, and the store persists the
result. RetrievalRuntime runs all four stages as one async stream: it does
not buffer the corpus, and no stage waits for the previous one to drain
before it starts.
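Here is a minimal sketch of a four-stage pipeline built from chained async generators, which makes the no-buffering claim concrete. The stage functions (load, chunk, embed, persist) are hypothetical stand-ins written for illustration, not RetrievalRuntime internals. The "embedding" is a toy one-element vector.

```python
import asyncio
from typing import AsyncIterator

# Hypothetical stand-ins for the four stages; none of these are railtracks APIs.

async def load(paths: list[str], trace: list[str]) -> AsyncIterator[str]:
    for path in paths:
        await asyncio.sleep(0)  # pretend to read from disk
        trace.append(f"load:{path}")
        yield f"contents of {path}"

async def chunk(docs: AsyncIterator[str], size: int) -> AsyncIterator[str]:
    async for doc in docs:
        for start in range(0, len(doc), size):
            yield doc[start:start + size]

async def embed(chunks: AsyncIterator[str]) -> AsyncIterator[tuple[str, list[float]]]:
    async for text in chunks:
        # Toy "embedding": a one-element vector holding the chunk length.
        yield text, [float(len(text))]

async def persist(vectors: AsyncIterator[tuple[str, list[float]]],
                  store: list, trace: list[str]) -> None:
    async for item in vectors:
        store.append(item)  # written as soon as it arrives
        trace.append("store")

async def run(paths: list[str]) -> tuple[list, list[str]]:
    store: list = []
    trace: list[str] = []
    # Each stage pulls from the one before it, one item at a time.
    await persist(embed(chunk(load(paths, trace), size=8)), store, trace)
    return store, trace
```

Running `asyncio.run(run(["a.md", "b.md"]))` yields a trace showing the pull-based flow. b.md is not read until both chunks of a.md have been stored, so in this sketch memory is bounded by what is in flight, not by corpus size.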
This page covers what you need to run that stream safely in production: streaming events, re-ingest semantics, multi-tenant writes, sanitization, audit hooks, and the token guard.
For loader-specific details (TextLoader, PDFs, Hugging Face datasets, custom sources) see the Ingestion components pages.
ingest_all: one-shot summary
The simplest entry point. Drains the stream and returns an IngestionStats:
from railtracks.retrieval import RetrievalRuntime
from railtracks.retrieval.loaders import TextLoader

async def main(runtime: RetrievalRuntime):
    # Drains the stream and returns an IngestionStats summary.
    stats = await runtime.ingest_all(TextLoader("./docs"))
    print(
        f"loaded={stats.documents_loaded} "
        f"chunks={stats.chunks_embedded} "
        f"skipped={stats.documents_skipped}"
    )
Use ingest_all for batch jobs and tests where you only care about the
summary. Use ingest (below) anywhere you need to surface progress, react to
failures, or stream events to another system.
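Because ingest_all drains the same stream that ingest yields, you can think of it as a fold over the events. The sketch below assumes that relationship. The event classes are simplified local stand-ins named after the ones on this page. SummarySketch is a hypothetical summary type, not IngestionStats, and the real counting rules may differ.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class BatchIngested:
    document_id: str
    embedded_chunks: list

@dataclass
class DocumentSkipped:
    source: str

@dataclass
class SummarySketch:  # hypothetical; not railtracks' IngestionStats
    documents_written: int = 0
    chunks_embedded: int = 0
    documents_skipped: int = 0

async def fake_ingest():
    # Stand-in for runtime.ingest(loader): two batches for one document, one skip.
    yield BatchIngested("doc-1", ["c1", "c2"])
    yield BatchIngested("doc-1", ["c3"])
    yield DocumentSkipped("handbook.md")

async def drain(events) -> SummarySketch:
    summary = SummarySketch()
    written: set[str] = set()
    async for event in events:
        if isinstance(event, BatchIngested):
            written.add(event.document_id)
            summary.chunks_embedded += len(event.embedded_chunks)
        elif isinstance(event, DocumentSkipped):
            summary.documents_skipped += 1
    summary.documents_written = len(written)
    return summary
```

The trade-off follows directly: a fold keeps only the totals, so anything you want to react to per batch has to happen while the stream is being consumed.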
Streaming events
runtime.ingest() is an async generator. Each yield is a typed event;
match on it to log, retry, or short-circuit:
from railtracks.retrieval import (
    BatchIngested,
    DocumentFailed,
    DocumentSkipped,
    EmbeddingFailure,
    RetrievalRuntime,
)

async def stream(runtime: RetrievalRuntime, loader):
    async for event in runtime.ingest(loader):
        match event:
            case BatchIngested(document_id=did, embedded_chunks=ch, batch_index=i):
                print(f" + doc={did} batch={i} chunks={len(ch)}")
            case EmbeddingFailure(errors=errs):
                print(f" ! batch failed: {errs[0]}")
            case DocumentFailed(document_id=did):
                print(f" ! doc {did} ended with failures")
            case DocumentSkipped(source=src):
                print(f" ~ skipped (unchanged): {src}")
| ingest() event | Meaning |
|---|---|
| BatchIngested | A batch of chunks finished embedding and was written. |
| EmbeddingFailure | A batch failed mid-flight. The stream continues; successful batches for the same document are still written. |
| DocumentFailed | End-of-document event signalling that at least one batch in the document failed. |
| DocumentSkipped | The document's source and content_hash match an existing entry; no embedding call was made. |
Failures are surfaced as events, not raised, so a single failed batch does not abort a bulk re-index across thousands of documents.
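The failure model in the table can be sketched as a per-batch try/except that turns exceptions into events. BatchOk, BatchFailed, DocFailed, and flaky_embed are hypothetical names chosen for this illustration. They mirror BatchIngested, EmbeddingFailure, and DocumentFailed but are not the railtracks classes.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class BatchOk:
    document_id: str
    batch_index: int

@dataclass
class BatchFailed:
    document_id: str
    batch_index: int
    error: str

@dataclass
class DocFailed:
    document_id: str

async def flaky_embed(batch: list[str]) -> list[list[float]]:
    # Hypothetical embedder that rejects any batch containing "bad".
    if "bad" in batch:
        raise ValueError("provider rejected batch")
    return [[float(len(text))] for text in batch]

async def ingest_document(doc_id: str, batches: list[list[str]]):
    any_failed = False
    for i, batch in enumerate(batches):
        try:
            await flaky_embed(batch)
        except Exception as exc:
            # Report the failure as an event instead of raising, then move on.
            any_failed = True
            yield BatchFailed(doc_id, i, str(exc))
            continue
        yield BatchOk(doc_id, i)
    if any_failed:
        # End-of-document signal, mirroring DocumentFailed.
        yield DocFailed(doc_id)

async def collect(doc_id: str, batches: list[list[str]]) -> list:
    return [event async for event in ingest_document(doc_id, batches)]
```

With batches `[["a"], ["bad"], ["c"]]`, the middle batch fails while the third still goes through. The document then closes with a DocFailed event rather than an exception.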
Re-ingest semantics
from railtracks.retrieval import Document, DocumentType, RetrievalRuntime

class SampleLoader(BaseDocumentLoader):
    doc = Document(content="...", type=DocumentType.TEXT, source="handbook.md")

    async def astream(self):
        yield self.doc

async def reingest(runtime: RetrievalRuntime):
    # Same Document.id → upsert. The runtime clears prior chunks for this
    # document after the first batch succeeds, then writes the new ones.
    # A full embedding failure leaves the prior version intact.
    await runtime.ingest_all(loader=SampleLoader())

    # Same source + identical content → skipped via SHA-256 content_hash lookup.
    # ingest() yields DocumentSkipped without calling the embedder.
    await runtime.ingest_all(loader=SampleLoader())
Two paths, both safe to re-run on the same loader:
- Upsert by Document.id. Calling ingest with a Document whose id already exists in the store triggers store.delete_where({"document_id": ...}) after the first successful batch lands, then writes the new chunks. If every batch fails, the delete never fires and the prior version is preserved.
- Skip by source + content hash. The runtime hashes each document's content with SHA-256 and looks it up via store.find. If a row with the same source and content_hash exists, the runtime yields DocumentSkipped without calling the embedder. This is what makes ingest() cheaply idempotent against a stable source.
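Both re-ingest paths can be modelled with a toy in-memory store. The sketch rests on simplifying assumptions. ToyStore, Row, and ingest are hypothetical, and chunking is a split on blank lines. The per-batch flow (delete after the first successful batch) is collapsed into a single embed_ok flag.

```python
import hashlib
from dataclasses import dataclass, field

@dataclass
class Row:
    document_id: str
    source: str
    content_hash: str
    text: str

@dataclass
class ToyStore:  # hypothetical stand-in for a real store
    rows: list[Row] = field(default_factory=list)

    def find(self, source: str, content_hash: str) -> bool:
        return any(r.source == source and r.content_hash == content_hash
                   for r in self.rows)

    def delete_where(self, document_id: str) -> None:
        self.rows = [r for r in self.rows if r.document_id != document_id]

def ingest(store: ToyStore, document_id: str, source: str, content: str,
           embed_ok: bool = True) -> str:
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
    # Skip path: same source + same content hash means nothing to embed.
    if store.find(source, content_hash):
        return "skipped"
    if not embed_ok:
        # Every batch failed: no delete happens, so the prior version survives.
        return "failed"
    # Upsert path: clear the document's old chunks, then write the new ones.
    store.delete_where(document_id)
    store.rows.extend(Row(document_id, source, content_hash, part)
                      for part in content.split("\n\n"))
    return "written"
```

Ingesting the same text twice returns "skipped" the second time. A failed update leaves the old rows untouched, and a successful update replaces them wholesale.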
Multi-tenant writes
Pass a StoreScope on each call so that one store can hold data for multiple tenants:
from railtracks.retrieval.stores import StoreScope

async def multitenant_write():
    # One runtime, one store, one set of infrastructure. Scope is a
    # per-call parameter, not a constructor argument, because it's
    # request-level context, not runtime config.
    runtime = RetrievalRuntime(
        chunker=RecursiveCharacterChunker(chunk_size=800),
        embedder=OpenAIEmbedding(),
        store=VectorStore(InMemoryVectorBackend()),
    )

    await runtime.ingest_all(
        TextLoader("./alice_docs"), scope=StoreScope(labels={"user_id": "alice"})
    )
    await runtime.ingest_all(
        TextLoader("./bob_docs"), scope=StoreScope(labels={"user_id": "bob"})
    )

    # Reads pass scope the same way; nothing crosses tenants.
    await runtime.retrieve("favorite color", scope=StoreScope(labels={"user_id": "alice"}))
StoreScope wraps an open labels: Mapping[str, Any]. Pick whichever
axes fit your tenancy model: {"user_id": "alice"} for SaaS,
{"organization": "acme", "environment": "prod"} for B2B, or anything else
that makes sense. Each label becomes a mandatory equality filter on every
write and read. One store can back any number of tenants; the storage cost
of multi-tenancy is one extra payload field per label.
Scope is request-level context, not runtime config; that's why it lives
on ingest() / ingest_all() / retrieve() rather than on the
constructor. Single-tenant callers just omit it.
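Here is a minimal sketch of labels acting as mandatory equality filters. Scope and ScopedStore are hypothetical stand-ins for StoreScope and a real vector store. The sketch stores labels as flat payload fields and returns only rows whose fields match every label in the scope.

```python
from dataclasses import dataclass, field
from typing import Any, Mapping, Optional

@dataclass
class Scope:  # hypothetical stand-in for StoreScope
    labels: Mapping[str, Any] = field(default_factory=dict)

class ScopedStore:
    def __init__(self) -> None:
        self._rows: list[dict[str, Any]] = []

    def write(self, text: str, scope: Optional[Scope] = None) -> None:
        labels = dict(scope.labels) if scope else {}
        # Each label is stamped onto the payload: one extra field per label.
        self._rows.append({"text": text, **labels})

    def read(self, scope: Optional[Scope] = None) -> list[str]:
        labels = scope.labels if scope else {}
        # Each label is a mandatory equality filter; a row must match all of them.
        return [row["text"] for row in self._rows
                if all(row.get(key) == value for key, value in labels.items())]
```

With two labels, such as organization and environment, a row must match both. Changing either one yields an empty result rather than another tenant's data.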
Sanitizing loaders
SanitizingLoader(inner, sanitizer) wraps any loader and passes every
Document through your Sanitizer before it reaches the chunker. The
Sanitizer protocol is one method: .sanitize(document: Document) ->
Document (sync or async). Use it for PII redaction or any per-document
normalization:
import re

from railtracks.retrieval import Document
from railtracks.retrieval.loaders import SanitizingLoader, TextLoader

class EmailRedactor:
    """Implements the Sanitizer protocol: .sanitize(Document) -> Document.

    Stateful by design: real redactors hold compiled regexes, denylists,
    or an async client to a DLP service."""

    EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")

    def sanitize(self, document: Document) -> Document:
        document.content = self.EMAIL.sub("[REDACTED]", document.content)
        return document

# Wrap any loader. The sanitizer runs once per Document, before chunking.
# SanitizingLoader is itself a loader, so it can go anywhere a loader can.
loader = SanitizingLoader(TextLoader("./hr_docs"), sanitizer=EmailRedactor())
Sanitization runs once per document, not per chunk; content_hash is computed on the sanitized text, so the skip-by-hash path stays accurate.
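The Sanitizer protocol allows either a sync or an async .sanitize, so a wrapper has to handle both return shapes. The sketch below assumes a simple approach: call the method, then await the result only if it is awaitable. Content is hashed after sanitization, matching the ordering described above. Doc, SyncRedactor, AsyncLowercaser, sanitized, and sanitize_all are hypothetical names; this is not SanitizingLoader's implementation.

```python
import asyncio
import hashlib
import inspect
import re
from dataclasses import dataclass

@dataclass
class Doc:  # hypothetical stand-in for Document
    content: str
    source: str

class SyncRedactor:
    EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")

    def sanitize(self, doc: Doc) -> Doc:
        doc.content = self.EMAIL.sub("[REDACTED]", doc.content)
        return doc

class AsyncLowercaser:
    async def sanitize(self, doc: Doc) -> Doc:
        await asyncio.sleep(0)  # stand-in for a call to a remote service
        doc.content = doc.content.lower()
        return doc

async def sanitized(docs, sanitizer):
    # Once per document, before any chunking happens downstream.
    async for doc in docs:
        result = sanitizer.sanitize(doc)
        if inspect.isawaitable(result):  # async sanitizer
            result = await result
        yield result

def content_hash(doc: Doc) -> str:
    # Computed on the sanitized text, so re-runs produce the same hash.
    return hashlib.sha256(doc.content.encode("utf-8")).hexdigest()

async def _from_list(items):
    for item in items:
        yield item

async def sanitize_all(docs: list[Doc], sanitizer) -> list[Doc]:
    return [doc async for doc in sanitized(_from_list(docs), sanitizer)]
```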
Audit hooks
on_ingest=callback fires for every event the stream yields. Especially
useful with ingest_all, where you never see events directly and the hook
is your only window into per-batch progress. Use it for audit logs,
metrics, or write-ahead logging:
import asyncio

from railtracks.retrieval import RetrievalRuntime

# Strong references to in-flight audit tasks. The event loop keeps only weak
# references, so an unreferenced task can be garbage-collected mid-flight.
_audit_tasks: set[asyncio.Task] = set()

def audit(event):
    # Sync hook: runs inline with the ingest stream. Keep it cheap:
    # logging, in-process counters, metrics .inc() calls.
    print(f"[audit] {type(event).__name__}: {event}")

def audit_async(event):
    # Slow or async work? Schedule it and return immediately so the
    # stream keeps moving. Errors surface on the task, not on ingest.
    task = asyncio.create_task(_send_to_audit_log(event))
    _audit_tasks.add(task)
    task.add_done_callback(_audit_tasks.discard)

async def _send_to_audit_log(event) -> None:
    # Stand-in for a real async client (asyncpg, aiohttp webhook, etc.).
    ...

def build_with_hook():
    return RetrievalRuntime(
        chunker=RecursiveCharacterChunker(chunk_size=800),
        embedder=OpenAIEmbedding(),
        store=VectorStore(InMemoryVectorBackend()),
        on_ingest=audit,
    )
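The callback runs inline with the ingest stream. For anything async or
slow (DB writes, HTTP webhooks), wrap the body in asyncio.create_task(...)
so the stream keeps moving; errors then surface on the task instead of
bubbling out of the ingest call. Hold a reference to each task until it
finishes (for example, in a set). The event loop keeps only weak
references to tasks, so an unreferenced one can be garbage-collected
before it completes.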
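The scheduling pattern can be exercised without the runtime. In this sketch, fake_stream stands in for the ingest stream, on_ingest plays the hook, and _ship is a hypothetical slow audit writer; all names are illustrative. The hook returns immediately, the pending set holds strong references, and the caller awaits any stragglers before shutting down.

```python
import asyncio

_pending: set[asyncio.Task] = set()
audit_log: list[str] = []

async def _ship(event: str) -> None:
    await asyncio.sleep(0.01)  # pretend network latency
    audit_log.append(event)

def on_ingest(event: str) -> None:
    # Called inline; returns as soon as the slow work is scheduled.
    task = asyncio.create_task(_ship(event))
    _pending.add(task)  # strong reference until the task is done
    task.add_done_callback(_pending.discard)

async def fake_stream():
    for i in range(3):
        yield f"BatchIngested#{i}"

async def run() -> list[str]:
    async for event in fake_stream():
        on_ingest(event)  # where the runtime would invoke the hook
    # Before shutdown, wait for in-flight audit writes to finish.
    await asyncio.gather(*_pending)
    return audit_log
```

The final gather matters. Without it, asyncio.run would cancel the still-sleeping writes when the main coroutine returns.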
Token guard
Embedding providers reject oversize inputs server-side. max_tokens drops
those chunks pre-flight and surfaces them as EmbeddingFailure events
instead of provider 400s:
def build_with_token_guard():
    # Chunks above 8000 tokens are dropped pre-flight and surface as
    # EmbeddingFailure events rather than provider-side 400s. A default
    # TiktokenTokenizer is wired up automatically when max_tokens is set.
    return RetrievalRuntime(
        chunker=RecursiveCharacterChunker(chunk_size=2000),
        embedder=OpenAIEmbedding(model="text-embedding-3-small"),
        store=VectorStore(InMemoryVectorBackend()),
        max_tokens=8000,  # OpenAI's text-embedding-3 hard cap is 8191
    )
When max_tokens is set without a tokenizer, RetrievalRuntime wires up
TiktokenTokenizer automatically. Override with tokenizer=... if you need
a non-OpenAI tokenizer.
Pick the limit conservatively. OpenAI's text-embedding-3 family caps at
8191 tokens; setting max_tokens=8000 leaves headroom for any difference
between tiktoken's count and the provider's.
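The pre-flight check amounts to counting tokens per chunk and splitting the batch into kept and dropped chunks. Here is a minimal sketch that assumes a pluggable count function. whitespace_token_count is a crude hypothetical stand-in for a real tokenizer. OversizeChunk is an illustrative record, not the EmbeddingFailure event.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class OversizeChunk:  # illustrative; the runtime reports EmbeddingFailure instead
    index: int
    tokens: int
    limit: int

def whitespace_token_count(text: str) -> int:
    # Crude stand-in for a real tokenizer such as tiktoken.
    return len(text.split())

def preflight(chunks: list[str], max_tokens: int,
              count: Callable[[str], int] = whitespace_token_count):
    kept: list[str] = []
    dropped: list[OversizeChunk] = []
    for i, chunk in enumerate(chunks):
        n = count(chunk)
        if n > max_tokens:
            dropped.append(OversizeChunk(i, n, max_tokens))  # never sent
        else:
            kept.append(chunk)
    return kept, dropped
```

You can count the way OpenAI's text-embedding-3 models do if tiktoken is installed. Pass `count=lambda text: len(enc.encode(text))` with `enc = tiktoken.get_encoding("cl100k_base")`.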
Related
- Retrieval: the read path, covering retrieve(), filters, scope overrides, and patterns for wiring retrieval into an agent.
- Components → Ingestion: the built-in loaders, the Document shape, and custom loaders.
- Components → Design: internals, including the streaming concurrency model, the Store protocol, and stage contracts.