Loading: Built-in loaders

Six loaders ship under railtracks.retrieval.loaders. Pick one based on your source format, and reach for a custom loader when none of these fit.


Summary

| Loader | Source | One Document per | Extras |
|---|---|---|---|
| TextLoader | .txt / .md files (or directories) | File | None |
| CSVLoader | .csv files (or directories) | Row | None |
| JSONLoader | .json files (or directories) | Top-level object | None |
| PyPDFLoader | PDFs with a text layer | Page (default) or whole file | railtracks[pdf] |
| PyPDFOCRLoader | PDFs that include scanned images | Page (default) or whole file | railtracks[ocr] + Tesseract binary |
| HuggingFaceDatasetLoader | Any dataset on the HF Hub | Row | railtracks[huggingface] |

Every loader exposes the same triple: load() (sync, materializes everything), aload() (async, materializes everything), astream() (async generator). For corpora larger than memory, always reach for astream().
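
The relationship between the three methods can be pictured with a toy loader. This is a minimal sketch under stated assumptions, not railtracks' implementation: ToyLoader is a hypothetical stand-in that yields plain strings instead of Documents, and wrapping aload() with asyncio.run is just one plausible way to provide a sync entry point.

```python
import asyncio
from collections.abc import AsyncGenerator


class ToyLoader:
    """Hypothetical stand-in for a loader; yields strings instead of Documents."""

    def __init__(self, items: list[str]) -> None:
        self._items = items

    async def astream(self) -> AsyncGenerator[str, None]:
        # Hand over one item at a time; nothing is accumulated here.
        for item in self._items:
            yield item

    async def aload(self) -> list[str]:
        # Async, but materializes the whole stream into a list.
        return [item async for item in self.astream()]

    def load(self) -> list[str]:
        # Sync convenience wrapper; cannot be called from a running event loop.
        return asyncio.run(self.aload())


loader = ToyLoader(["a", "b", "c"])
all_sync = loader.load()
all_async = asyncio.run(loader.aload())
```

In this arrangement only astream() touches the source, so the two materializing methods cost exactly one full pass over it.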


TextLoader

Reads .txt and .md files. Markdown files are automatically tagged type="markdown", which lets downstream chunkers (such as MarkdownHeaderChunker) apply heading-aware splitting.

from railtracks.retrieval.loaders import TextLoader



loader = TextLoader("notes.txt")
docs = loader.load()

doc = docs[0]
print(doc.content)            # full file text
print(doc.type)               # "text" or "markdown"
print(doc.source)             # "notes.txt"
print(doc.metadata)           # {"file_type": ".txt", "encoding": "utf-8-sig"}

# Recursively loads .txt and .md files, sorted by path.
docs = TextLoader("knowledge_base/").load()
print(len(docs))
print(docs[0].source)

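Directories are walked recursively, and files are returned in sorted-path order so that re-ingestion is deterministic. The default encoding is utf-8-sig, which strips a leading byte-order mark (BOM) when one is present and otherwise decodes exactly like utf-8, so BOM-prefixed legacy files load cleanly at no cost to ordinary files.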
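
The difference between the two codecs is easy to see with the standard library alone. The snippet below uses only Python's built-in codecs and does not involve railtracks; the sample string is arbitrary.

```python
import codecs

bom_bytes = codecs.BOM_UTF8 + "héllo".encode("utf-8")  # file saved with a BOM
plain_bytes = "héllo".encode("utf-8")                   # ordinary UTF-8 file

# utf-8-sig drops a leading BOM and otherwise behaves like utf-8.
with_sig_bom = bom_bytes.decode("utf-8-sig")
with_sig_plain = plain_bytes.decode("utf-8-sig")

# Plain utf-8 keeps the BOM as an invisible U+FEFF character at the start.
with_utf8_bom = bom_bytes.decode("utf-8")
```

A stray U+FEFF at the start of a document is invisible in most editors but still ends up in the text that gets chunked and embedded, which is why a BOM-aware default is the safer choice.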

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .txt/.md file or directory |
| encoding | str | "utf-8-sig" | File encoding (BOM-aware) |

Document metadata: file_type (.txt or .md), encoding.


CSVLoader

One Document per row. Columns can go into content (searchable) or metadata (filterable, not embedded).

from railtracks.retrieval.loaders import CSVLoader



# Every row becomes a Document. By default, all columns end up in content.
docs = CSVLoader("products.csv").load()

doc = docs[0]
print(doc.content)   # "name: Widget\nprice: 9.99\ndescription: ..."
print(doc.type)      # "csv"
print(doc.metadata)  # {"row_index": 0}

With no column config, every column ends up in content, which is usually not what you want. IDs, timestamps, and foreign keys add noise without helping retrieval. Use content_columns to be explicit:

# Columns in content_columns form the searchable text.
# Everything else automatically becomes metadata (filterable downstream).
loader = CSVLoader(
    "products.csv",
    content_columns=["name", "description"],
)
docs = loader.load()
print(docs[0].content)   # "name: Widget\ndescription: ..."
print(docs[0].metadata)  # {"price": "9.99", "row_index": 0}

Columns not in content_columns automatically become metadata. Use ignore_columns to drop fields entirely (PII, audit timestamps).
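
The column rules can be sketched with the standard csv module. This is an illustration of the documented behaviour, not CSVLoader's actual code: row_to_parts and the sample data are hypothetical, and the "column: value" line format is inferred from the example output shown above.

```python
import csv
import io


def row_to_parts(row, row_index, content_columns=None, ignore_columns=None, separator="\n"):
    """Split one CSV row into (content, metadata) following the documented rules."""
    ignored = set(ignore_columns or [])
    # None means "all columns", minus anything explicitly ignored.
    if content_columns is None:
        content_columns = [c for c in row if c not in ignored]
    content = separator.join(f"{col}: {row[col]}" for col in content_columns)
    # Whatever is neither content nor ignored becomes filterable metadata.
    metadata = {
        col: value
        for col, value in row.items()
        if col not in content_columns and col not in ignored
    }
    metadata["row_index"] = row_index
    return content, metadata


sample = "name,price,description,internal_note\nWidget,9.99,A small widget,do not ship\n"
rows = list(csv.DictReader(io.StringIO(sample)))
content, metadata = row_to_parts(
    rows[0], 0, content_columns=["name", "description"], ignore_columns=["internal_note"]
)
```

Here price lands in metadata because it is neither a content column nor ignored, while internal_note disappears entirely.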

You can also choose the separator used to join column values into content:

# Default content_separator is "\n". Change it for single-line records.
CSVLoader(
    "products.csv",
    content_columns=["name", "description"],
    content_separator=" | ",
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .csv file or directory |
| content_columns | list[str] \| None | None | Columns joined into content. None = all columns. |
| ignore_columns | list[str] \| None | None | Columns dropped entirely |
| content_separator | str | "\n" | Used to join content-column values |
| encoding | str | "utf-8-sig" | File encoding |

Document metadata: row_index plus every column not in content_columns or ignore_columns.


JSONLoader

Reads .json files where the root is an object or array of objects. Each object becomes one Document.

from railtracks.retrieval.loaders import JSONLoader

# Root must be an object or array of objects. content_keys selects which
# keys form the searchable text; ignore_keys drops keys entirely.
docs = JSONLoader(
    "articles.json",
    content_keys=["title", "body"],
    ignore_keys=["internal_id"],
).load()

print(docs[0].content)   # "title: Getting started\nbody: ..."
print(docs[0].metadata)  # {"author": "Alice", "index": 0}
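
The "object or array of objects" rule amounts to normalising the root before iterating. The sketch below shows one way to do that with the standard json module; iter_objects is a hypothetical helper, and raising ValueError for other root shapes is an assumption rather than documented JSONLoader behaviour.

```python
import json


def iter_objects(raw: str):
    """Yield (index, object) pairs from JSON whose root is an object or an array of objects."""
    root = json.loads(raw)
    # A single top-level object is treated like a one-element array.
    objects = [root] if isinstance(root, dict) else root
    if not isinstance(objects, list):
        raise ValueError("root must be an object or an array of objects")
    for index, obj in enumerate(objects):
        if not isinstance(obj, dict):
            raise ValueError(f"item {index} is not an object")
        yield index, obj


single = list(iter_objects('{"title": "Getting started"}'))
many = list(iter_objects('[{"title": "A"}, {"title": "B"}]'))
```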

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .json file or directory |
| content_keys | list[str] \| "*" | "*" | Keys whose values form content. "*" serialises the whole object. |
| ignore_keys | list[str] \| None | None | Keys dropped entirely |
| content_separator | str | "\n" | Used to join content-key values |
| encoding | str | "utf-8-sig" | File encoding |

PyPDFLoader

For PDFs with embedded text. Pages with no text layer (scanned images) produce empty content; for mixed corpora, reach for PyPDFOCRLoader instead.

pip install "railtracks[pdf]"

# Requires: pip install "railtracks[pdf]"
from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader

docs = PyPDFLoader("report.pdf").load()
doc = docs[0]
print(doc.content)   # extracted text from page 1
print(doc.type)      # "pdf"
print(doc.metadata)  # {"page": 1, "total_pages": 42, "file_type": ".pdf"}

Breakdown strategy

"page" (the default) emits one Document per page. Page numbers end up in metadata, citations become trivial, and the chunker works one page at a time rather than across a 200-page file. Use the page strategy for retrieval.

from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader

# One Document per page. Best for retrieval — keeps page numbers in
# metadata, which makes citations trivial.
docs = PyPDFLoader("report.pdf", breakdown_strategy="page").load()
print(len(docs))              # number of pages
print(docs[0].metadata)       # {"page": 1, "total_pages": 42, "file_type": ".pdf"}

"document" emits a single Document for the whole PDF; only useful when the PDF is small enough to chunk as one unit, or you want custom splitting that crosses pages.

from railtracks.retrieval.loaders.pdf_loader import PyPDFLoader

# Single Document. Pages joined with "\n\n". Use only when the whole PDF
# is small enough to chunk together or you want to apply custom splitting.
docs = PyPDFLoader("report.pdf", breakdown_strategy="document").load()
print(len(docs))        # always 1

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .pdf file or directory |
| breakdown_strategy | "page" \| "document" | "page" | How to split the PDF |

Document metadata (page strategy): page (1-based), total_pages, file_type (.pdf).


PyPDFOCRLoader

For PDFs with scanned-image pages. For each page, the loader tries pypdf text extraction first (fast) and falls back to Tesseract OCR if extraction returns nothing. Mixed PDFs work transparently.

Installation

Two pieces: a Python extra and a system binary.

pip install "railtracks[ocr]"

Tesseract is OS-level; pip can't install it. Follow the official instructions, then verify in a fresh terminal:

tesseract --version

Usage

# Requires: pip install "railtracks[ocr]" + Tesseract on PATH.
from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader

docs = PyPDFOCRLoader("scanned_invoice.pdf").load()
doc = docs[0]
print(doc.content)         # OCR'd or pypdf-extracted text
print(doc.metadata["ocr"]) # True if OCR was used for this page

Some PDFs have a garbled or incomplete text layer that pypdf will happily return. force_ocr=True skips the fast path and re-OCRs unconditionally:

from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader

# Skip the text-extraction fast path. Useful when pypdf returns a
# garbled or incomplete text layer that you'd rather re-OCR.
docs = PyPDFOCRLoader("messy_scan.pdf", force_ocr=True).load()
assert all(d.metadata["ocr"] for d in docs)

from railtracks.retrieval.loaders.pdf_ocr_loader import PyPDFOCRLoader

# Document strategy: one Document per PDF; metadata lists the pages that needed OCR.
docs = PyPDFOCRLoader("report.pdf", breakdown_strategy="document").load()
print(docs[0].metadata)
# {"total_pages": 42, "file_type": ".pdf", "ocr_pages": [3, 7, 8]}

ocr_pages (document strategy) is the sorted list of 1-based page numbers that required OCR; useful for auditing how much of a corpus needed image-based extraction.
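
The per-page fallback and the ocr_pages bookkeeping can be sketched without pypdf or Tesseract. Everything here is hypothetical: extract_pages, the two callables, and the three-page sample stand in for the real extraction and OCR steps, and treating whitespace-only text as "empty" is an assumption.

```python
from collections.abc import Callable


def extract_pages(
    page_count: int,
    extract_text: Callable[[int], str],
    ocr_page: Callable[[int], str],
    force_ocr: bool = False,
) -> tuple[list[str], list[int]]:
    """Return per-page text plus the sorted 1-based pages that needed OCR."""
    texts: list[str] = []
    ocr_pages: list[int] = []
    for page in range(1, page_count + 1):
        # Fast path: use the embedded text layer unless OCR is forced.
        text = "" if force_ocr else extract_text(page)
        if not text.strip():
            # Slow path: render the page and OCR it.
            text = ocr_page(page)
            ocr_pages.append(page)
        texts.append(text)
    return texts, ocr_pages


# Hypothetical three-page PDF: only page 2 is a scan with no text layer.
text_layer = {1: "Intro", 2: "", 3: "Appendix"}
texts, ocr_pages = extract_pages(
    3,
    extract_text=lambda p: text_layer[p],
    ocr_page=lambda p: f"[ocr text of page {p}]",
)
_, forced_pages = extract_pages(
    3,
    extract_text=lambda p: text_layer[p],
    ocr_page=lambda p: f"[ocr text of page {p}]",
    force_ocr=True,
)
```

Because pages are visited in order, the list of OCR'd pages comes out sorted without an explicit sort.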

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to a .pdf file or directory |
| breakdown_strategy | "page" \| "document" | "page" | How to split the PDF |
| force_ocr | bool | False | OCR every page, skipping fast path |
| dpi | int | 300 | OCR render resolution; 300 is Tesseract's sweet spot |
| language | str | "eng" | Tesseract language code ("eng+deu", "jpn", etc.) |

Document metadata: page, total_pages, file_type, ocr (page-strategy boolean), ocr_pages (document-strategy list).

Tesseract limitations

Tesseract handles clean printed text well but struggles with handwriting, low-quality scans, and complex layouts (tables, forms). The BaseOCRLoader abstraction lets future loaders plug in cloud OCR or LLM-vision engines by overriding _ocr_image.


HuggingFaceDatasetLoader

Streams rows from any dataset on the Hugging Face Hub. One Document per row, fetched lazily.

pip install "railtracks[huggingface]"

async def hf_basic():
    # Requires: pip install "railtracks[huggingface]"
    from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader

    loader = HuggingFaceDatasetLoader(
        dataset_name="ag_news",
        split="test",
        content_columns=["text"],
    )
    # Rows are streamed; use astream() for anything larger than memory.
    async for doc in loader.astream():
        print(doc.content[:80])
        print(doc.source)    # "ag_news/test"
        print(doc.metadata)  # {"row_index": 0}

Always use astream() here. aload() / load() materialize the whole split before returning, which is fine for small demo datasets but can exhaust memory on large splits or anything at Common Crawl scale.
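
The practical benefit of streaming is that a consumer can stop early and the source never produces the rest. The sketch below demonstrates this with a plain async generator; fake_rows and take are hypothetical helpers standing in for a lazily streamed split, and no Hugging Face code is involved.

```python
import asyncio
from collections.abc import AsyncGenerator

produced = 0  # counts how many rows the source actually generated


async def fake_rows(total: int) -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a lazily streamed dataset split."""
    global produced
    for i in range(total):
        produced += 1
        yield f"row {i}"


async def take(stream: AsyncGenerator[str, None], n: int) -> list[str]:
    """Collect at most n items, then close the stream so the source stops."""
    out: list[str] = []
    try:
        if n > 0:
            async for item in stream:
                out.append(item)
                if len(out) >= n:
                    break
    finally:
        await stream.aclose()
    return out


sample = asyncio.run(take(fake_rows(1_000_000), 3))
```

Only three of the million rows are ever generated; a materializing call would have built all of them before returning the first.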

Many QA datasets split "the text" across columns (question + context, title + body). Pass them all to content_columns:

from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader

# Many datasets split "the text" across columns. Join them with
# content_separator instead of stitching things yourself downstream.
HuggingFaceDatasetLoader(
    dataset_name="squad",
    split="validation",
    content_columns=["question", "context"],
    content_separator="\n\n",
)

metadata_columns are copied into Document.metadata as-is. Anything not in content_columns or metadata_columns is dropped; be explicit about what you want:

from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader

# metadata_columns are copied into Document.metadata for later filtering
# or citation. Anything not in content_columns or metadata_columns is dropped.
HuggingFaceDatasetLoader(
    dataset_name="squad",
    split="validation",
    content_columns=["question", "context"],
    metadata_columns=["title", "id"],
)

For subsets, revisions, or gated datasets, dataset_kwargs is forwarded straight to datasets.load_dataset:

from railtracks.retrieval.loaders.huggingface_loader import HuggingFaceDatasetLoader

# dataset_kwargs is forwarded straight to datasets.load_dataset.
# Use it for subsets, revisions, gated-dataset tokens, or to disable streaming.
HuggingFaceDatasetLoader(
    dataset_name="ms_marco",
    split="validation",
    content_columns=["query", "passages"],
    dataset_kwargs={"name": "v2.1"},
)

For gated datasets set HF_TOKEN in your environment, or pass dataset_kwargs={"token": "hf_xxxxxxx"}.

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| dataset_name | str | required | Dataset name on the Hub |
| split | str | required | Split to stream ("train", "validation", etc.) |
| content_columns | list[str] | required | Columns joined into content. Must be non-empty. |
| metadata_columns | list[str] \| None | None | Columns copied into metadata |
| content_separator | str | "\n" | Used to join content_columns values |
| dataset_kwargs | dict \| None | None | Forwarded to datasets.load_dataset |

Document metadata: row_index plus any column listed in metadata_columns. Document.source is "{dataset_name}/{split}".


LangChain Loaders

LangChainLoaderAdapter wraps any LangChain BaseLoader and normalises its output to railtracks' Document model. This unlocks LangChain's large community loader ecosystem (Wikipedia, Notion, Confluence, S3, Slack, …) without having to re-implement any of them in railtracks.

The adapter does not import langchain itself — it duck-types on the wrapped loader. Install whichever LangChain package provides the loader you want:

pip install langchain-community

from langchain_community.document_loaders import WikipediaLoader

Basic Usage

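# Requires: pip install langchain-community (WikipediaLoader also needs the wikipedia package)
from langchain_community.document_loaders import WikipediaLoader

# Import path assumed here; adjust it to wherever LangChainLoaderAdapter lives in your install.
from railtracks.retrieval.loaders import LangChainLoaderAdapter

adapter = LangChainLoaderAdapter(WikipediaLoader(query="Alan Turing", load_max_docs=2))
docs = adapter.load()
doc = docs[0]
print(doc.content)   # article text (LangChain's page_content)
print(doc.source)    # taken from metadata["source"], if the loader set one
print(doc.metadata)  # remaining LangChain metadata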

Each LangChain Document becomes one railtracks Document:

  • page_content → Document.content
  • metadata["source"] is popped into Document.source (if present)
  • The remaining metadata is copied across as-is

Tagging the Document Type

LangChain loaders are source-agnostic, so the adapter cannot guess the right DocumentType. Pass document_type explicitly when you know what you're loading.

The default is DocumentType.TEXT.


Overriding the Source

If the wrapped loader doesn't populate metadata["source"], or you'd like a more meaningful label, pass source= to the adapter. The explicit value wins and metadata is left untouched.
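
The mapping rules and the source override can be sketched with two small dataclasses. LCDoc, RTDoc, and convert are hypothetical stand-ins, not the real LangChain or railtracks classes, and reading "left untouched" as "the source key stays in metadata" is an interpretation of the text above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class LCDoc:
    """Hypothetical stand-in for a LangChain Document."""
    page_content: str
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class RTDoc:
    """Hypothetical stand-in for a railtracks Document."""
    content: str
    source: Optional[str]
    metadata: dict[str, Any]


def convert(doc: LCDoc, source: Optional[str] = None) -> RTDoc:
    metadata = dict(doc.metadata)  # copy so the wrapped loader's dict is not mutated
    if source is None:
        # No override: metadata["source"] is moved into the source field if present.
        source = metadata.pop("source", None)
    # With an override, metadata (including any "source" key) is kept as-is.
    return RTDoc(content=doc.page_content, source=source, metadata=metadata)


lc_doc = LCDoc(
    "Alan Turing was ...",
    {"source": "https://example.org/turing", "title": "Alan Turing"},
)
default = convert(lc_doc)
overridden = convert(lc_doc, source="wiki:alan-turing")
```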



Streaming Behaviour

The adapter tries to stream rather than buffer, falling back gracefully when the wrapped loader doesn't support async or lazy iteration:

| Wrapped loader exposes | Adapter uses | Streams? |
|---|---|---|
| alazy_load | alazy_load directly | Yes (native async) |
| lazy_load only | lazy_load pumped from a worker thread | Yes |
| load only | load() once, then iterates the result | No (eager) |

Every modern LangChain BaseLoader provides at least the default alazy_load, so the streaming path is the common case.
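
The fallback order in the table can be sketched as a small duck-typing dispatcher. This is an illustration under assumptions, not the adapter's source: stream_from and the three toy loader classes are hypothetical, and running the eager load() call on a worker thread is a choice made here to keep the event loop free.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

_DONE = object()  # sentinel marking the end of a synchronous iterator


async def stream_from(loader: Any) -> AsyncIterator[Any]:
    if hasattr(loader, "alazy_load"):
        # Native async path.
        async for doc in loader.alazy_load():
            yield doc
    elif hasattr(loader, "lazy_load"):
        # Pull each item on a worker thread so the event loop is never blocked.
        iterator = iter(loader.lazy_load())
        while True:
            doc = await asyncio.to_thread(next, iterator, _DONE)
            if doc is _DONE:
                break
            yield doc
    else:
        # Eager fallback: everything is loaded before the first item is yielded.
        for doc in await asyncio.to_thread(loader.load):
            yield doc


class AsyncNative:
    async def alazy_load(self):
        for item in ["p", "q"]:
            yield item


class LazyOnly:
    def lazy_load(self):
        yield from ["a", "b"]


class EagerOnly:
    def load(self):
        return ["x", "y"]


async def collect(loader: Any) -> list[Any]:
    return [doc async for doc in stream_from(loader)]


results = [asyncio.run(collect(l)) for l in (AsyncNative(), LazyOnly(), EagerOnly())]
```

Passing a sentinel as the default to next() avoids raising StopIteration across the thread boundary, which would otherwise surface as an awkward error inside the coroutine.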


Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| loader | Any | required | A LangChain BaseLoader-compatible instance. |
| document_type | DocumentType | DocumentType.TEXT | Tag applied to every emitted document. |
| source | str \| None | None | Overrides Document.source. When None, the adapter falls back to metadata["source"]. |

When to Reach for the Adapter

Use LangChainLoaderAdapter when:

  • A loader you need already exists in langchain-community (Notion, Slack, Confluence, Sitemap, GitHub issues, …) and re-implementing it would be wasted effort.
  • You're migrating a LangChain-based ingestion pipeline to railtracks and want to keep the existing loaders working unchanged.

Reach for a native railtracks loader (TextLoader, PyPDFLoader, HuggingFaceDatasetLoader, …) when one exists — they're better integrated and don't carry a third-party dependency.


Choosing a loader

| Situation | Start with |
|---|---|
| Plain text or markdown files on disk | TextLoader |
| Tabular rows (one document per row) | CSVLoader |
| Hand-curated structured data | JSONLoader |
| PDFs that came from a digital source | PyPDFLoader |
| PDFs from scans, photos, or unknown provenance | PyPDFOCRLoader |
| Public NLP datasets, benchmarks, large corpora | HuggingFaceDatasetLoader |
| Anything else (DB row, API response, queue) | Custom loader |

Custom loaders

When the built-ins don't cover your source (a database table, an internal API, a message queue), subclass BaseDocumentLoader and implement astream(). aload() and load() come for free.

from collections.abc import AsyncGenerator

from railtracks.retrieval import Document, DocumentType
from railtracks.retrieval.loaders import BaseDocumentLoader


class MyDatabaseLoader(BaseDocumentLoader):
    """One Document per row of a database table."""

    def __init__(self, dsn: str, table: str) -> None:
        self._dsn = dsn
        self._table = table

    async def astream(self) -> AsyncGenerator[Document, None]:
        # _async_fetch_rows is a placeholder for your own async query helper.
        rows = await _async_fetch_rows(self._dsn, self._table)
        for row in rows:
            yield Document(
                content=row["body"],
                type=DocumentType.TEXT,
                source=f"{self._table}:{row['id']}",
                metadata={"author": row["author"], "created_at": row["created_at"]},
            )

Use it like any other loader:

loader = MyDatabaseLoader("postgresql://...", table="articles")
# Implementing astream() gets you load() and aload() for free.
docs = loader.load()

Don't buffer the corpus. Yield each Document as soon as it's ready: the streaming pipeline depends on producers handing off work without materializing everything first. Buffering at your source breaks back-pressure for every downstream stage.
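
For a large table, one way to avoid buffering is to page through the source and yield rows as each page arrives. The sketch below uses keyset pagination over an in-memory list; fetch_page, stream_rows, and the sample table are hypothetical, and a real loader would issue a query per page and wrap each row in a Document.

```python
import asyncio
from collections.abc import AsyncGenerator

# Hypothetical table contents; a real loader would query a database instead.
_TABLE = [{"id": i, "body": f"article {i}"} for i in range(1, 8)]


async def fetch_page(after_id: int, limit: int) -> list[dict]:
    """Hypothetical keyset-paginated query: rows with id > after_id, in id order."""
    await asyncio.sleep(0)  # stands in for network I/O
    return [row for row in _TABLE if row["id"] > after_id][:limit]


async def stream_rows(page_size: int = 3) -> AsyncGenerator[dict, None]:
    """Yield rows one at a time while holding at most one page in memory."""
    last_id = 0
    while True:
        page = await fetch_page(last_id, page_size)
        if not page:
            return
        for row in page:
            yield row
        last_id = page[-1]["id"]


async def collect_ids() -> list[int]:
    return [row["id"] async for row in stream_rows()]


ids = asyncio.run(collect_ids())
```

The next page is only requested once the consumer has pulled every row of the current one, so a slow downstream stage naturally slows the producer.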

Wrapping a synchronous source

If your source only has a blocking API, push it to a worker thread with asyncio.to_thread():

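import asyncio  # remaining imports as in the MyDatabaseLoader example


class MySyncLoader(BaseDocumentLoader):
    """Wrap a blocking source without blocking the event loop."""

    async def astream(self) -> AsyncGenerator[Document, None]:
        # _fetch_rows_sync is a placeholder for your own blocking fetch function.
        rows = await asyncio.to_thread(_fetch_rows_sync)
        for row in rows:
            yield Document(content=row["text"], type=DocumentType.TEXT)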

Set source for free idempotency

Set Document.source to something stable: a path, a URL, a primary key. The runtime hashes content and pairs it with source to skip re-ingest of unchanged documents. Without a stable source, every run looks "new" and you pay for embedding the same content repeatedly.
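
The idea of pairing a content hash with source can be sketched in a few lines. This is not the runtime's actual mechanism: needs_ingest is a hypothetical helper, and SHA-256 plus an in-memory dict are assumptions chosen for illustration.

```python
import hashlib


def content_hash(content: str) -> str:
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def needs_ingest(seen: dict[str, str], source: str, content: str) -> bool:
    """Return True (and record the new hash) if this source is new or its content changed."""
    digest = content_hash(content)
    if seen.get(source) == digest:
        return False
    seen[source] = digest
    return True


seen: dict[str, str] = {}
first = needs_ingest(seen, "articles:1", "hello")
second = needs_ingest(seen, "articles:1", "hello")     # unchanged, skipped
third = needs_ingest(seen, "articles:1", "hello v2")   # content changed, re-ingested
```

If source were a fresh random value on every run, the lookup would never hit and every document would be embedded again.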


See also