railtracks.rag

from .chunking_service import TextChunkingService
from .rag_core import RAG, RAGConfig

__all__ = [
    "RAG",
    "RAGConfig",
    "TextChunkingService",
]
class RAG:
    """
    Retrieval-Augmented Generation system for processing and searching documents.

    Construction options:
      1) A single RAGConfig (clean, declarative), and/or
      2) Prebuilt services (embedding_service, vector_store, chunk_service)
         for advanced control.

    Precedence: explicitly provided services > config-driven creation > defaults.

    Typical use:
        rag = RAG.from_docs(docs, config)
        rag.embed_all()
        results = rag.search("query", top_k=3)
    """

    def __init__(
        self,
        docs: Sequence[str],
        *,
        config: Optional[RAGConfig] = None,
        embedding_service: Optional[BaseEmbeddingService] = None,
        vector_store: Optional[AbstractVectorStore] = None,
        chunk_service: Optional[TextChunkingService] = None,
    ):
        # Fall back to an all-defaults config when none is supplied.
        self._config = config or RAGConfig()

        # Explicitly provided services win over config-driven construction.
        if embedding_service is None:
            embedding_service = EmbeddingService(**self._config.embedding)
        self.embed_service = embedding_service

        if vector_store is None:
            vector_store = create_store(**self._config.store)
        self.vector_store = vector_store

        if chunk_service is None:
            chunk_service = TextChunkingService(
                **self._config.chunking,
                strategy=self._config.chunk_strategy
                or TextChunkingService.chunk_by_token,
            )
        self.chunk_service = chunk_service

        # Wrap each raw document so it can be chunked/embedded later.
        self.text_objects: List[TextObject] = [TextObject(doc) for doc in docs]

    @classmethod
    def from_docs(
        cls,
        docs: Sequence[str],
        config: Optional[RAGConfig] = None,
        **service_overrides,
    ) -> "RAG":
        """
        Convenience constructor.

        Accepts a config plus optional service overrides:
          - embedding_service
          - vector_store
          - chunk_service
        """
        return cls(docs, config=config, **service_overrides)

    def add_docs(self, docs: Sequence[str]) -> None:
        """Append additional raw documents, wrapped as TextObjects."""
        self.text_objects.extend(TextObject(doc) for doc in docs)

    def embed_all(self) -> None:
        """
        Chunk and embed all text objects; persist the results as vector
        records in the vector store.

        Idempotency note: this method does not deduplicate; calling it
        multiple times appends more records for the same docs. Manage
        deduplication externally if needed.
        """
        for text_obj in self.text_objects:
            pieces = self.chunk_service.chunk(text_obj.raw_content)
            embeddings = self.embed_service.embed(pieces)
            text_obj.set_chunked(pieces)
            text_obj.set_embeddings(embeddings)
            # These are pre-embedded VectorRecord objects.
            self.vector_store.add(textobject_to_vectorrecords(text_obj))

    def search(self, query: Union[str, List[float]], top_k: int = 3) -> SearchResult:
        """
        Search the vector store for relevant documents.

        Text queries are embedded first; vector queries pass straight through.
        """
        query_vector = (
            self.embed_service.embed([query])[0]
            if isinstance(query, str)
            else query
        )
        return self.vector_store.search(query_vector, top_k=top_k)

RAG (Retrieval-Augmented Generation) system for processing and searching documents.

You can construct it with:

  1. A single RAGConfig (clean, declarative), and/or
  2. Prebuilt services (embedding_service, vector_store, chunk_service) for advanced control.

Precedence: explicitly provided services > config-driven creation > defaults.

Typical use:

rag = RAG.from_docs(docs, config)
rag.embed_all()
results = rag.search("query", top_k=3)

RAG( docs: Sequence[str], *, config: Optional[RAGConfig] = None, embedding_service: Optional[railtracks.rag.embedding_service.BaseEmbeddingService] = None, vector_store: Optional[railtracks.rag.vector_store.base.AbstractVectorStore] = None, chunk_service: Optional[TextChunkingService] = None)
 88    def __init__(
 89        self,
 90        docs: Sequence[str],
 91        *,
 92        config: Optional[RAGConfig] = None,
 93        embedding_service: Optional[BaseEmbeddingService] = None,
 94        vector_store: Optional[AbstractVectorStore] = None,
 95        chunk_service: Optional[TextChunkingService] = None,
 96    ):
 97        self._config = config or RAGConfig()
 98
 99        # Build or accept existing services
100        self.embed_service = embedding_service or EmbeddingService(
101            **self._config.embedding
102        )
103        self.vector_store = vector_store or create_store(**self._config.store)
104        self.chunk_service = chunk_service or TextChunkingService(
105            **self._config.chunking,
106            strategy=self._config.chunk_strategy or TextChunkingService.chunk_by_token,
107        )
108
109        # Initialize TextObjects
110        self.text_objects: List[TextObject] = [TextObject(doc) for doc in docs]
embed_service
vector_store
chunk_service
text_objects: List[railtracks.rag.text_object.TextObject]
@classmethod
def from_docs( cls, docs: Sequence[str], config: Optional[RAGConfig] = None, **service_overrides) -> RAG:
112    @classmethod
113    def from_docs(
114        cls,
115        docs: Sequence[str],
116        config: Optional[RAGConfig] = None,
117        **service_overrides,
118    ) -> "RAG":
119        """
120        Convenience constructor. Accepts a config plus optional service overrides:
121          - embedding_service
122          - vector_store
123          - chunk_service
124        """
125        return cls(docs, config=config, **service_overrides)

Convenience constructor. Accepts a config plus optional service overrides:

  • embedding_service
  • vector_store
  • chunk_service
def add_docs(self, docs: Sequence[str]) -> None:
127    def add_docs(self, docs: Sequence[str]) -> None:
128        """Add more raw documents."""
129        for doc in docs:
130            self.text_objects.append(TextObject(doc))

Add more raw documents.

def embed_all(self) -> None:
132    def embed_all(self) -> None:
133        """
134        Chunk and embed all text objects; persist as vector records in the vector store.
135
136        Idempotency note: This method does not deduplicate; calling multiple times will
137        append more records for the same docs. Manage deduplication externally if needed.
138        """
139        for tobj in self.text_objects:
140            chunks = self.chunk_service.chunk(tobj.raw_content)
141            vectors = self.embed_service.embed(chunks)
142            tobj.set_chunked(chunks)
143            tobj.set_embeddings(vectors)
144            records = textobject_to_vectorrecords(tobj)
145            # These are pre-embedded VectorRecord objects
146            self.vector_store.add(records)

Chunk and embed all text objects; persist as vector records in the vector store.

Idempotency note: This method does not deduplicate; calling multiple times will append more records for the same docs. Manage deduplication externally if needed.

def search( self, query: Union[str, List[float]], top_k: int = 3) -> railtracks.rag.vector_store.base.SearchResult:
148    def search(self, query: Union[str, List[float]], top_k: int = 3) -> SearchResult:
149        """
150        Search the vector store for relevant documents.
151
152        If query is text, we embed it; if it's a vector, we pass it through.
153        """
154        if isinstance(query, str):
155            q_vec = self.embed_service.embed([query])[0]
156            return self.vector_store.search(q_vec, top_k=top_k)
157        else:
158            return self.vector_store.search(query, top_k=top_k)

Search the vector store for relevant documents.

If query is text, we embed it; if it's a vector, we pass it through.

@dataclass(frozen=True)
class RAGConfig:
    """
    Unified, immutable configuration for RAG.

    Attributes:
        embedding: kwargs forwarded to EmbeddingService(...).
        store: kwargs forwarded to create_store(...).
        chunking: kwargs forwarded to TextChunkingService(...).
        chunk_strategy: overrides the default chunking strategy when provided.
    """

    # Each mapping defaults to a fresh empty dict per instance.
    embedding: Dict = field(default_factory=dict)
    store: Dict = field(default_factory=dict)
    chunking: Dict = field(default_factory=dict)
    chunk_strategy: Optional[Callable[[str], List[str]]] = None

Unified configuration for RAG.

  • embedding: kwargs forwarded to EmbeddingService(...)
  • store: kwargs forwarded to create_store(...)
  • chunking: kwargs forwarded to TextChunkingService(...)
  • chunk_strategy: override the default chunking strategy if provided
RAGConfig( embedding: Dict = <factory>, store: Dict = <factory>, chunking: Dict = <factory>, chunk_strategy: Optional[Callable[[str], List[str]]] = None)
embedding: Dict
store: Dict
chunking: Dict
chunk_strategy: Optional[Callable[[str], List[str]]] = None
class TextChunkingService(BaseChunkingService):
    """
    Chunk raw text into overlapping pieces, by character or by token.

    ``strategies`` lists the strategy names this service exposes; the actual
    callable used at runtime is selected via the ``strategy`` argument, which
    is forwarded to the base class.
    """

    # Names of the chunking strategies this service exposes.
    strategies = ["chunk_by_char", "chunk_by_token", "chunk_smart"]

    def __init__(
        self,
        chunk_size: int = 2048,
        chunk_overlap: int = 256,
        model: Optional[str] = "gpt-3.5-turbo",
        strategy: Optional[Callable] = None,
        *other_configs,
        **other_kwargs,
    ):
        """
        Initialize the text chunker.

        Args:
            chunk_size: Size of each chunk (characters for chunk_by_char,
                tokens for chunk_by_token).
            chunk_overlap: Overlap between consecutive chunks, in the same
                unit as chunk_size.
            model: Tokenizer model name used by token-based chunking.
            strategy: Optional chunking callable forwarded to the base class.
        """
        # Stored before delegating so strategies invoked via the base class
        # can rely on it.
        self.model = model
        super().__init__(
            chunk_size, chunk_overlap, strategy, *other_configs, **other_kwargs
        )

    def _validate_window(self) -> None:
        """
        Raise if the sliding window could never advance.

        The loop step is ``chunk_size - chunk_overlap``; a step <= 0 would
        re-chunk the same span forever. (Bug fix: chunk_by_char previously
        had no check at all, and chunk_by_token only rejected overlap > size,
        so overlap == size could hang.)
        """
        if self.chunk_overlap >= self.chunk_size:
            logger.warning(
                f"chunk_overlap ({self.chunk_overlap}) must be strictly less "
                f"than chunk_size ({self.chunk_size})."
            )
            raise ValueError("chunk_overlap must be strictly less than chunk_size")

    def chunk_by_char(
        self,
        content: str,
    ) -> List[str]:
        """
        Split text into overlapping chunks of ``chunk_size`` characters.

        Raises:
            ValueError: if chunk_overlap >= chunk_size.
        """
        self._validate_window()  # bug fix: previously could loop forever
        chunks: List[str] = []
        start = 0
        while start < len(content):
            end = min(start + self.chunk_size, len(content))
            chunks.append(content[start:end])
            # Stop once the tail has been emitted (mirrors chunk_by_token);
            # previously a redundant overlap-only tail chunk could be added.
            if end >= len(content):
                break
            start += self.chunk_size - self.chunk_overlap
        return chunks

    def chunk_by_token(self, content: str) -> List[str]:
        """
        Split text into overlapping chunks measured in tokens.

        TODO: use LLM to do this

        Raises:
            ValueError: if no model is configured, or if
                chunk_overlap >= chunk_size.
        """
        if not self.model:
            error_message = "Model not specified for token chunking."
            logger.error(error_message)
            raise ValueError(error_message)
        self._validate_window()

        tokenizer = Tokenizer(self.model)
        tokens = tokenizer.encode(content)

        chunks: List[str] = []
        start = 0
        while start < len(tokens):
            end = min(start + self.chunk_size, len(tokens))
            chunks.append(tokenizer.decode(tokens[start:end]))
            if end >= len(tokens):
                break
            start += self.chunk_size - self.chunk_overlap
        return chunks

Processor for text operations.

TextChunkingService( chunk_size: int = 2048, chunk_overlap: int = 256, model: Optional[str] = 'gpt-3.5-turbo', strategy: Optional[Callable] = None, *other_configs, **other_kwargs)
 88    def __init__(
 89        self,
 90        chunk_size: int = 2048,
 91        chunk_overlap: int = 256,
 92        model: Optional[str] = "gpt-3.5-turbo",
 93        strategy: Optional[Callable] = None,
 94        *other_configs,
 95        **other_kwargs,
 96    ):
 97        """
 98        Initialize the text chunker.
 99
100        Args:
101            chunk_size: Size of each chunk in characters
102            chunk_overlap: Overlap between chunks in characters
103        """
104        self.model = model
105        super().__init__(
106            chunk_size, chunk_overlap, strategy, *other_configs, **other_kwargs
107        )

Initialize the text chunker.

Arguments:
  • chunk_size: Size of each chunk in characters
  • chunk_overlap: Overlap between chunks in characters
strategies = ['chunk_by_char', 'chunk_by_token', 'chunk_smart']
model
def chunk_by_char(self, content: str) -> List[str]:
109    def chunk_by_char(
110        self,
111        content: str,
112    ) -> List[str]:
113        """
114        Split text into chunks
115        """
116        chunks = []
117        start = 0
118
119        # end was used but never declared in the original code snippet;
120        # we'll add a separate approach or define end before the loop.
121        while start < len(content):
122            end = min(start + self.chunk_size, len(content))
123            chunk = content[start:end]
124            chunks.append(chunk)
125            start += self.chunk_size - self.chunk_overlap
126
127        return chunks

Split text into chunks

def chunk_by_token(self, content: str) -> List[str]:
129    def chunk_by_token(self, content: str) -> List[str]:
130        """
131        Split text into chunks by token.
132
133        TODO: use LLM to do this
134        """
135        chunks = []
136        if not self.model:
137            error_message = "Model not specified for token chunking."
138            logger.error(error_message)
139            raise ValueError(error_message)
140        tokenizer = Tokenizer(self.model)
141        tokens = tokenizer.encode(content)
142
143        if self.chunk_overlap > self.chunk_size:
144            logger.warning(
145                f"Warning: chunk_overlap ({self.chunk_overlap}) is greater than chunk_size ({self.chunk_size})."
146                " Should be <= 40%"
147            )
148            raise ValueError("chunk_overlap should be less than or equal to chunk_size")
149
150        start = 0
151        while start < len(tokens):
152            end = min(start + self.chunk_size, len(tokens))
153            token_chunk = tokens[start:end]
154            chunk = tokenizer.decode(token_chunk)
155
156            chunks.append(chunk)
157            start += self.chunk_size - self.chunk_overlap
158            if end >= len(tokens):
159                break
160
161        return chunks

Split text into chunks by token.

TODO: use LLM to do this