Loading: Built-in loaders
Six loaders ship under railtracks.retrieval.loaders. Pick one based on
your source format, and reach for a custom loader when
none of these fit.
Summary
| Loader | Source | One Document per | Extras |
|---|---|---|---|
| TextLoader | .txt / .md files (or directories) | File | None |
| CSVLoader | .csv files (or directories) | Row | None |
| JSONLoader | .json files (or directories) | Top-level object | None |
| PyPDFLoader | PDFs with a text layer | Page (default) or whole file | railtracks[pdf] |
| PyPDFOCRLoader | PDFs that include scanned images | Page (default) or whole file | railtracks[ocr] + Tesseract binary |
| HuggingFaceDatasetLoader | Any dataset on the HF Hub | Row | railtracks[huggingface] |
Every loader exposes the same triple: load() (sync, materializes
everything), aload() (async, materializes everything), astream()
(async generator). For corpora larger than memory, always reach for
astream().
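To make the relationship between the three methods concrete, here is a minimal sketch using a stand-in class. FakeLoader and count_streaming are hypothetical names for illustration only; they are not part of railtracks, and the real loaders may derive load() and aload() differently.

```python
import asyncio
from collections.abc import AsyncGenerator


class FakeLoader:
    """Stand-in with the same three-method shape; not a railtracks class."""

    def __init__(self, n: int) -> None:
        self._n = n

    async def astream(self) -> AsyncGenerator[str, None]:
        # Produce items lazily, one at a time.
        for i in range(self._n):
            yield f"doc-{i}"

    async def aload(self) -> list[str]:
        # Materializes the whole stream into a list.
        return [doc async for doc in self.astream()]

    def load(self) -> list[str]:
        # Sync wrapper; must be called outside a running event loop.
        return asyncio.run(self.aload())


async def count_streaming(loader: FakeLoader) -> int:
    """Consume the stream while holding only one item at a time."""
    seen = 0
    async for _doc in loader.astream():
        seen += 1
    return seen
```

The streaming consumer never holds more than one item, whereas load() and aload() build the full list before returning.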
TextLoader
Reads .txt and .md files. Markdown files are automatically tagged
type="markdown", which lets downstream chunkers (MarkdownHeaderChunker)
apply heading-aware splitting.
from railtracks.retrieval.loaders import TextLoader
loader = TextLoader("notes.txt")
docs = loader.load()
doc = docs[0]
print(doc.content) # full file text
print(doc.type) # "text" or "markdown"
print(doc.source) # "notes.txt"
print(doc.metadata) # {"file_type": ".txt", "encoding": "utf-8-sig"}
# Recursively loads .txt and .md files, sorted by path.
docs = TextLoader("knowledge_base/").load()
print(len(docs))
print(docs[0].source)
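Directories are walked recursively; files are returned in sorted-path
order for deterministic re-ingest. Default encoding is utf-8-sig
(BOM-aware): it strips a leading byte-order mark when one is present and
otherwise decodes exactly like utf-8, so legacy files with a BOM load
cleanly and ordinary UTF-8 files are unaffected.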
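The difference between the two codecs is easy to see with the standard library alone. This snippet does not touch railtracks; it only shows how Python's built-in codecs treat a leading byte-order mark.

```python
# UTF-8 bytes with and without a leading byte-order mark (BOM).
raw_with_bom = b"\xef\xbb\xbfhello"
raw_plain = b"hello"

# Plain utf-8 keeps the BOM as an invisible U+FEFF character.
as_utf8 = raw_with_bom.decode("utf-8")

# utf-8-sig strips the BOM when present...
as_utf8_sig = raw_with_bom.decode("utf-8-sig")

# ...and behaves like utf-8 when there is none.
plain = raw_plain.decode("utf-8-sig")

print(repr(as_utf8), repr(as_utf8_sig), repr(plain))
```

A stray U+FEFF at the start of content is easy to miss and can leak into the first chunk of a document, which is why a BOM-aware default is convenient.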
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .txt/.md file or directory |
| encoding | str | "utf-8-sig" | File encoding (BOM-aware) |
Document metadata: file_type (.txt or .md), encoding.
CSVLoader
One Document per row. Columns can go into content (searchable) or
metadata (filterable, not embedded).
from railtracks.retrieval.loaders import CSVLoader
# Every row becomes a Document. By default, all columns end up in content.
docs = CSVLoader("products.csv").load()
doc = docs[0]
print(doc.content) # "name: Widget\nprice: 9.99\ndescription: ..."
print(doc.type) # "csv"
print(doc.metadata) # {"row_index": 0}
With no column config, every column ends up in content, which is usually
not what you want. IDs, timestamps, and foreign keys add noise without
helping retrieval. Use content_columns to be explicit:
# Columns in content_columns form the searchable text.
# Everything else automatically becomes metadata (filterable downstream).
loader = CSVLoader(
    "products.csv",
    content_columns=["name", "description"],
)
docs = loader.load()
print(docs[0].content) # "name: Widget\ndescription: ..."
print(docs[0].metadata) # {"price": "9.99", "row_index": 0}
Columns not in content_columns automatically become metadata. Use
ignore_columns to drop fields entirely (PII, audit timestamps).
You can also choose the separator used to join column values into content:
# Default content_separator is "\n". Change it for single-line records.
CSVLoader(
    "products.csv",
    content_columns=["name", "description"],
    content_separator=" | ",
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .csv file or directory |
| content_columns | list[str] \| None | None | Columns joined into content. None = all columns. |
| ignore_columns | list[str] \| None | None | Columns dropped entirely |
| content_separator | str | "\n" | Used to join content-column values |
| encoding | str | "utf-8-sig" | File encoding |
Document metadata: row_index plus every column not in
content_columns or ignore_columns.
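The column routing described in this section can be sketched with the standard csv module. split_row is a hypothetical helper written for illustration; it mirrors the documented behaviour (content columns joined as "name: value", ignored columns dropped, the rest sent to metadata) but is not CSVLoader's actual implementation.

```python
import csv
import io


def split_row(row, content_columns=None, ignore_columns=None, content_separator="\n"):
    """Route one CSV row into (content, metadata). Illustrative only."""
    ignore = set(ignore_columns or [])
    kept = {k: v for k, v in row.items() if k not in ignore}
    if content_columns is None:
        content_cols = list(kept)  # default: every remaining column
    else:
        content_cols = [c for c in content_columns if c in kept]
    content = content_separator.join(f"{c}: {kept[c]}" for c in content_cols)
    metadata = {k: v for k, v in kept.items() if k not in content_cols}
    return content, metadata


sample = "name,price,description,internal_id\nWidget,9.99,A small widget,77\n"
rows = list(csv.DictReader(io.StringIO(sample)))

content, metadata = split_row(
    rows[0],
    content_columns=["name", "description"],
    ignore_columns=["internal_id"],
)
metadata["row_index"] = 0  # position of the row within the file

print(content)
print(metadata)
```

Note that csv.DictReader yields every value as a string, which matches the "9.99" string shown in the metadata example above.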
JSONLoader
Reads .json files where the root is an object or array of objects.
Each object becomes one Document.
from railtracks.retrieval.loaders import JSONLoader
# Root must be an object or array of objects. content_keys selects which
# keys form the searchable text; ignore_keys drops keys entirely.
docs = JSONLoader(
    "articles.json",
    content_keys=["title", "body"],
    ignore_keys=["internal_id"],
).load()
print(docs[0].content) # "title: Getting started\nbody: ..."
print(docs[0].metadata) # {"author": "Alice", "index": 0}
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .json file or directory |
| content_keys | list[str] \| "*" | "*" | Keys whose values form content. "*" serialises the whole object. |
| ignore_keys | list[str] \| None | None | Keys dropped entirely |
| content_separator | str | "\n" | Used to join content-key values |
| encoding | str | "utf-8-sig" | File encoding |
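As a rough illustration of the "object or array of objects" rule, the sketch below normalises a JSON root into a list of objects and builds content from selected keys. iter_objects and build_content are hypothetical helpers; raising ValueError for other root shapes is an assumption, since the behaviour of JSONLoader on invalid roots is not documented here.

```python
import json


def iter_objects(text: str) -> list[dict]:
    """Normalise a JSON root into a list of objects. Illustrative only."""
    root = json.loads(text)
    if isinstance(root, dict):
        return [root]  # a single object becomes one document
    if isinstance(root, list) and all(isinstance(item, dict) for item in root):
        return root  # an array yields one document per object
    # Assumption: anything else is rejected.
    raise ValueError("root must be an object or an array of objects")


def build_content(obj: dict, content_keys: list[str], separator: str = "\n") -> str:
    """Join the selected keys as 'key: value' lines."""
    return separator.join(f"{key}: {obj[key]}" for key in content_keys if key in obj)


objs = iter_objects('[{"title": "Getting started", "body": "Hello", "author": "Alice"}]')
print(build_content(objs[0], ["title", "body"]))
```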
PyPDFLoader
For PDFs with embedded text. Pages with no text layer (scanned images)
return empty; for mixed corpora reach for PyPDFOCRLoader instead.
# Requires: pip install "railtracks[pdf]"
from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader
docs = PyPDFLoader("report.pdf").load()
doc = docs[0]
print(doc.content) # extracted text from page 1
print(doc.type) # "pdf"
print(doc.metadata) # {"page": 1, "total_pages": 42, "file_type": ".pdf"}
Breakdown strategy
"page" (the default) emits one Document per page. Page numbers end up
in metadata, citations become trivial, and the chunker works on one
page at a time rather than across a 200-page file. Use the page
strategy for retrieval.
from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader
# One Document per page. Best for retrieval — keeps page numbers in
# metadata, which makes citations trivial.
docs = PyPDFLoader("report.pdf", breakdown_strategy="page").load()
print(len(docs)) # number of pages
print(docs[0].metadata) # {"page": 1, "total_pages": 42, "file_type": ".pdf"}
"document" emits a single Document for the whole PDF; only useful when
the PDF is small enough to chunk as one unit, or you want custom
splitting that crosses pages.
from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader
# Single Document. Pages joined with "\n\n". Use only when the whole PDF
# is small enough to chunk together or you want to apply custom splitting.
docs = PyPDFLoader("report.pdf", breakdown_strategy="document").load()
print(len(docs)) # always 1
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .pdf file or directory |
| breakdown_strategy | "page" \| "document" | "page" | How to split the PDF |
Document metadata (page strategy): page (1-based), total_pages,
file_type (.pdf).
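The two strategies differ only in how already-extracted page texts are grouped. The sketch below works on a plain list of strings, so it runs without pypdf. breakdown is a hypothetical function, and the metadata keys shown for the document strategy are an assumption based on the examples on this page.

```python
def breakdown(pages: list[str], strategy: str = "page") -> list[dict]:
    """Group extracted page texts into document-like dicts. Illustrative only."""
    total = len(pages)
    if strategy == "page":
        # One entry per page, with a 1-based page number in metadata.
        return [
            {
                "content": text,
                "metadata": {"page": number, "total_pages": total, "file_type": ".pdf"},
            }
            for number, text in enumerate(pages, start=1)
        ]
    if strategy == "document":
        # A single entry; pages are joined with a blank line.
        return [
            {
                "content": "\n\n".join(pages),
                "metadata": {"total_pages": total, "file_type": ".pdf"},
            }
        ]
    raise ValueError(f"unknown breakdown strategy: {strategy!r}")


pages = ["one", "two", "three"]
per_page = breakdown(pages)
whole = breakdown(pages, "document")
print(len(per_page), len(whole))
```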
PyPDFOCRLoader
For PDFs with scanned-image pages. Per page, tries pypdf text extraction first (fast), falls back to Tesseract OCR if extraction returns empty. Mixed PDFs work transparently.
Installation
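Two pieces: a Python extra and a system binary. Install the extra with pip install "railtracks[ocr]".
Tesseract is OS-level; pip can't install it. Follow the official instructions, then verify in a fresh terminal that tesseract --version prints a version string.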
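Verification can also be scripted from Python before an ingest run starts. The helper below is a hypothetical preflight check, not a railtracks function; it only reports whether a tesseract executable is discoverable on PATH.

```python
import shutil


def tesseract_available() -> bool:
    """Return True if a 'tesseract' executable is found on PATH."""
    return shutil.which("tesseract") is not None


if not tesseract_available():
    print("Tesseract not found on PATH; OCR fallback will not work.")
```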
Usage
# Requires: pip install "railtracks[ocr]" + Tesseract on PATH.
from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader
docs = PyPDFOCRLoader("scanned_invoice.pdf").load()
doc = docs[0]
print(doc.content) # OCR'd or pypdf-extracted text
print(doc.metadata["ocr"]) # True if OCR was used for this page
Some PDFs have a garbled or incomplete text layer that pypdf will happily
return. force_ocr=True skips the fast path and re-OCRs unconditionally:
from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader
# Skip the text-extraction fast path. Useful when pypdf returns a
# garbled or incomplete text layer that you'd rather re-OCR.
docs = PyPDFOCRLoader("messy_scan.pdf", force_ocr=True).load()
assert all(d.metadata["ocr"] for d in docs)
from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader
docs = PyPDFOCRLoader("report.pdf", breakdown_strategy="document").load()
print(docs[0].metadata)
# {"total_pages": 42, "file_type": ".pdf", "ocr_pages": [3, 7, 8]}
ocr_pages (document strategy) is the sorted list of 1-based page
numbers that required OCR; useful for auditing how much of a corpus
needed image-based extraction.
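The per-page fallback and the ocr_pages bookkeeping can be sketched without pypdf or Tesseract by passing in the extracted text layers and an OCR callable. extract_pages and fake_ocr are hypothetical names, and treating whitespace-only text as "empty" is an assumption.

```python
from collections.abc import Callable


def extract_pages(
    text_layers: list[str],
    ocr: Callable[[int], str],
    force_ocr: bool = False,
) -> tuple[list[str], list[int]]:
    """Return (page texts, 1-based page numbers that needed OCR)."""
    texts: list[str] = []
    ocr_pages: list[int] = []
    for page_no, layer in enumerate(text_layers, start=1):
        if force_ocr or not layer.strip():
            # Slow path: no usable text layer, or OCR was forced.
            texts.append(ocr(page_no))
            ocr_pages.append(page_no)
        else:
            # Fast path: keep the embedded text as-is.
            texts.append(layer)
    return texts, ocr_pages


def fake_ocr(page_no: int) -> str:
    """Stand-in for a real OCR engine."""
    return f"ocr-{page_no}"


layers = ["Intro text", "", "   ", "Appendix"]
texts, ocr_pages = extract_pages(layers, fake_ocr)
_, forced_pages = extract_pages(layers, fake_ocr, force_ocr=True)
print(ocr_pages, forced_pages)
```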
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .pdf file or directory |
| breakdown_strategy | "page" \| "document" | "page" | How to split the PDF |
| force_ocr | bool | False | OCR every page, skipping fast path |
| dpi | int | 300 | OCR render resolution; 300 is Tesseract's sweet spot |
| language | str | "eng" | Tesseract language code ("eng+deu", "jpn", etc.) |
Document metadata: page, total_pages, file_type, ocr
(page-strategy boolean), ocr_pages (document-strategy list).
Tesseract limitations
Tesseract handles clean printed text well but struggles with
handwriting, low-quality scans, and complex layouts (tables, forms).
The BaseOCRLoader
abstraction lets future loaders plug in cloud OCR or LLM-vision
engines by overriding _ocr_image.
HuggingFaceDatasetLoader
Streams rows from any dataset on the Hugging Face Hub. One Document per row, fetched lazily.
async def hf_basic():
    # Requires: pip install "railtracks[huggingface]"
    from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader

    loader = HuggingFaceDatasetLoader(
        dataset_name="ag_news",
        split="test",
        content_columns=["text"],
    )
    # Rows are streamed; use astream() for anything larger than memory.
    async for doc in loader.astream():
        print(doc.content[:80])
        print(doc.source)  # "ag_news/test"
        print(doc.metadata)  # {"row_index": 0}
Always use astream() here. aload() / load() materialize the
whole split before returning; fine for tiny demo datasets, disastrous
for ag_news or anything Common Crawl–scale.
Many QA datasets split "the text" across columns (question + context,
title + body). Pass them all to content_columns:
from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader
# Many datasets split "the text" across columns. Join them with
# content_separator instead of stitching things yourself downstream.
HuggingFaceDatasetLoader(
    dataset_name="squad",
    split="validation",
    content_columns=["question", "context"],
    content_separator="\n\n",
)
metadata_columns are copied into Document.metadata as-is. Anything
not in content_columns or metadata_columns is dropped; be explicit
about what you want:
from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader
# metadata_columns are copied into Document.metadata for later filtering
# or citation. Anything not in content_columns or metadata_columns is dropped.
HuggingFaceDatasetLoader(
    dataset_name="squad",
    split="validation",
    content_columns=["question", "context"],
    metadata_columns=["title", "id"],
)
For subsets, revisions, or gated datasets, dataset_kwargs is forwarded
straight to datasets.load_dataset:
from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader
# dataset_kwargs is forwarded straight to datasets.load_dataset.
# Use it for subsets, revisions, gated-dataset tokens, or to disable streaming.
HuggingFaceDatasetLoader(
    dataset_name="ms_marco",
    split="validation",
    content_columns=["query", "passages"],
    dataset_kwargs={"name": "v2.1"},
)
For gated datasets set HF_TOKEN in your environment, or pass
dataset_kwargs={"token": "hf_xxxxxxx"}.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| dataset_name | str | required | Dataset name on the Hub |
| split | str | required | Split to stream ("train", "validation", etc.) |
| content_columns | list[str] | required | Columns joined into content. Must be non-empty. |
| metadata_columns | list[str] \| None | None | Columns copied into metadata |
| content_separator | str | "\n" | Used to join content_columns values |
| dataset_kwargs | dict \| None | None | Forwarded to datasets.load_dataset |
Document metadata: row_index plus any column listed in
metadata_columns. Document.source is "{dataset_name}/{split}".
LangChain Loaders
LangChainLoaderAdapter wraps any LangChain BaseLoader and normalises its output to railtracks' Document model. This unlocks LangChain's large community loader ecosystem (Wikipedia, Notion, Confluence, S3, Slack, …) without having to re-implement any of them in railtracks.
The adapter does not import langchain itself; it duck-types on the wrapped loader. Install whichever LangChain package provides the loader you want (many community loaders live in langchain-community).
Basic Usage
Each LangChain Document becomes one railtracks Document:
- page_content → Document.content
- metadata["source"] is popped into Document.source (if present)
- The remaining metadata is copied across as-is
Tagging the Document Type
LangChain loaders are source-agnostic, so the adapter cannot guess the right DocumentType. Pass document_type= explicitly when you know what you're loading.
The default is DocumentType.TEXT.
Overriding the Source
If the wrapped loader doesn't populate metadata["source"], or you'd like a more meaningful label, pass source= to the adapter. The explicit value wins and metadata is left untouched.
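The field mapping and the source override can be sketched with two stand-in dataclasses. LCDoc, RTDoc, and convert are hypothetical names for illustration; they are not LangChain or railtracks types. Reading "metadata is left untouched" as "metadata["source"] stays in place when an explicit source is given" is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class LCDoc:
    """Stand-in for a LangChain Document."""
    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class RTDoc:
    """Stand-in for a railtracks Document."""
    content: str
    type: str
    source: str | None
    metadata: dict


def convert(lc_doc: LCDoc, document_type: str = "text", source: str | None = None) -> RTDoc:
    metadata = dict(lc_doc.metadata)  # copy so the input is never mutated
    if source is None:
        # Fall back to metadata["source"], removing it from metadata.
        resolved = metadata.pop("source", None)
    else:
        # Explicit value wins; metadata is passed through unchanged.
        resolved = source
    return RTDoc(lc_doc.page_content, document_type, resolved, metadata)


original = LCDoc("hello", {"source": "a.txt", "page": 1})
default_doc = convert(original)
override_doc = convert(original, source="team-wiki")
print(default_doc.source, override_doc.source)
```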
Streaming Behaviour
The adapter tries to stream rather than buffer, falling back gracefully when the wrapped loader doesn't support async or lazy iteration:
| Wrapped loader exposes | Adapter uses | Streams? |
|---|---|---|
| alazy_load | alazy_load directly | Yes (native async) |
| lazy_load only | lazy_load pumped from a worker thread | Yes |
| load only | load() once, then iterates the result | No (eager) |
Every modern LangChain BaseLoader provides at least the default alazy_load, so the streaming path is the common case.
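One way to implement this kind of fallback chain is plain attribute checks plus asyncio.to_thread. The sketch below is an assumption about the general technique, not the adapter's source: stream_from and the three tiny loaders are hypothetical, and running load() in a worker thread is a design choice made here to keep the event loop free.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

_DONE = object()  # sentinel marking iterator exhaustion


async def stream_from(loader: Any) -> AsyncIterator[Any]:
    if hasattr(loader, "alazy_load"):
        # Native async path.
        async for doc in loader.alazy_load():
            yield doc
    elif hasattr(loader, "lazy_load"):
        # Pump the blocking iterator from a worker thread, one item at a time.
        iterator = iter(loader.lazy_load())
        while True:
            doc = await asyncio.to_thread(next, iterator, _DONE)
            if doc is _DONE:
                break
            yield doc
    else:
        # Eager path: load everything once, then iterate.
        for doc in await asyncio.to_thread(loader.load):
            yield doc


class NativeAsync:
    async def alazy_load(self):
        for doc in ("p", "q"):
            yield doc


class LazyOnly:
    def lazy_load(self):
        yield from ("a", "b")


class EagerOnly:
    def load(self):
        return ["x", "y"]


async def collect(loader: Any) -> list[Any]:
    return [doc async for doc in stream_from(loader)]


print(asyncio.run(collect(LazyOnly())))
```

Passing a sentinel as the default to next() avoids raising StopIteration inside the worker thread, which does not propagate cleanly through an awaited call.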
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| loader | Any | required | A LangChain BaseLoader-compatible instance. |
| document_type | DocumentType | DocumentType.TEXT | Tag applied to every emitted document. |
| source | str \| None | None | Overrides Document.source. When None, the adapter falls back to metadata["source"]. |
When to Reach for the Adapter
Use LangChainLoaderAdapter when:
- A loader you need already exists in langchain-community (Notion, Slack, Confluence, Sitemap, GitHub issues, …) and re-implementing it would be wasted effort.
- You're migrating a LangChain-based ingestion pipeline to railtracks and want to keep the existing loaders working unchanged.
Reach for a native railtracks loader (TextLoader, PyPDFLoader, HuggingFaceDatasetLoader, …) when one exists — they're better integrated and don't carry a third-party dependency.
Choosing a loader
| Situation | Start with |
|---|---|
| Plain text or markdown files on disk | TextLoader |
| Tabular rows (one document per row) | CSVLoader |
| Hand-curated structured data | JSONLoader |
| PDFs that came from a digital source | PyPDFLoader |
| PDFs from scans, photos, or unknown provenance | PyPDFOCRLoader |
| Public NLP datasets, benchmarks, large corpora | HuggingFaceDatasetLoader |
| Anything else (DB row, API response, queue) | Custom loader |
Custom loaders
When the built-ins don't cover your source (a database table, an
internal API, a message queue), subclass BaseDocumentLoader and
implement astream(). aload() and load() come for free.
from collections.abc import AsyncGenerator
from railtracks.retrieval import Document, DocumentType
from railtracks.retrieval.loaders import BaseDocumentLoader
class MyDatabaseLoader(BaseDocumentLoader):
    """One Document per row of a database table."""

    def __init__(self, dsn: str, table: str) -> None:
        self._dsn = dsn
        self._table = table

    async def astream(self) -> AsyncGenerator[Document, None]:
        # _async_fetch_rows is a placeholder for your own data-access helper.
        rows = await _async_fetch_rows(self._dsn, self._table)
        for row in rows:
            yield Document(
                content=row["body"],
                type=DocumentType.TEXT,
                source=f"{self._table}:{row['id']}",
                metadata={"author": row["author"], "created_at": row["created_at"]},
            )
Use it like any other loader:
loader = MyDatabaseLoader("postgresql://...", table="articles")
# Implementing astream() gets you load() and aload() for free.
loader.load()
Don't buffer the corpus. Yield each Document as soon as it's ready:
the streaming pipeline depends on producers handing off work without
materializing everything first. Buffering at your source breaks
back-pressure for every downstream stage.
Wrapping a synchronous source
If your source only has a blocking API, push it to a worker thread with
asyncio.to_thread():
import asyncio

class MySyncLoader(BaseDocumentLoader):
    """Wrap a blocking source without blocking the event loop."""

    async def astream(self) -> AsyncGenerator[Document, None]:
        # _fetch_rows_sync is a placeholder for your blocking fetch function.
        rows = await asyncio.to_thread(_fetch_rows_sync)
        for row in rows:
            yield Document(content=row["text"], type=DocumentType.TEXT)
Set source for free idempotency
Set Document.source to something stable: a path, a URL, a primary key.
The runtime hashes content and pairs it with source to skip re-ingest
of unchanged documents. Without a stable source, every run looks
"new" and you pay for embedding the same content repeatedly.
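The idea of pairing a content hash with source can be illustrated in a few lines. This is a sketch of the general technique only: fingerprint and needs_ingest are hypothetical helpers, and the choice of SHA-256 and of an in-memory set are assumptions, not a description of how the railtracks runtime stores or compares fingerprints.

```python
import hashlib


def fingerprint(source: str, content: str) -> str:
    """Combine a stable source label with a hash of the content."""
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    return f"{source}:{digest}"


def needs_ingest(source: str, content: str, seen: set[str]) -> bool:
    """Return True the first time a (source, content) pair is observed."""
    key = fingerprint(source, content)
    if key in seen:
        return False  # unchanged document: skip re-embedding
    seen.add(key)
    return True


seen: set[str] = set()
first = needs_ingest("articles:42", "hello world", seen)
repeat = needs_ingest("articles:42", "hello world", seen)
edited = needs_ingest("articles:42", "hello world, revised", seen)
unstable = needs_ingest("tmp-run-2", "hello world", seen)
print(first, repeat, edited, unstable)
```

The last call shows the failure mode described above: identical content under a different source label is treated as new.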
See also
- Loading overview: the Document object, the BaseDocumentLoader contract, and the loader → chunker handoff.
- Chunking methods: what to do with the Documents these loaders produce.
- SanitizingLoader: wrap any loader to redact PII before chunking.