railtracks.rag
class RAG:
    """
    Retrieval-Augmented Generation pipeline: chunk documents, embed them,
    and answer similarity queries against a vector store.

    Construction options:
      1) a single RAGConfig (clean, declarative), and/or
      2) prebuilt services (embedding_service, vector_store, chunk_service)
         for advanced control.

    Precedence: explicitly provided services > config-driven creation > defaults.

    Typical use:
        rag = RAG.from_docs(docs, config)
        rag.embed_all()
        results = rag.search("query", top_k=3)
    """

    def __init__(
        self,
        docs: Sequence[str],
        *,
        config: Optional[RAGConfig] = None,
        embedding_service: Optional[BaseEmbeddingService] = None,
        vector_store: Optional[AbstractVectorStore] = None,
        chunk_service: Optional[TextChunkingService] = None,
    ):
        # Fall back to an all-defaults config when none is supplied.
        cfg = config or RAGConfig()
        self._config = cfg

        # Explicitly provided services take precedence over config-driven ones.
        self.embed_service = embedding_service or EmbeddingService(**cfg.embedding)
        self.vector_store = vector_store or create_store(**cfg.store)
        strategy = cfg.chunk_strategy or TextChunkingService.chunk_by_token
        self.chunk_service = chunk_service or TextChunkingService(
            **cfg.chunking, strategy=strategy
        )

        # Wrap each raw document in a TextObject for later chunking/embedding.
        self.text_objects: List[TextObject] = [TextObject(d) for d in docs]

    @classmethod
    def from_docs(
        cls,
        docs: Sequence[str],
        config: Optional[RAGConfig] = None,
        **service_overrides,
    ) -> "RAG":
        """
        Convenience constructor. Accepts a config plus optional service overrides:
        - embedding_service
        - vector_store
        - chunk_service
        """
        return cls(docs, config=config, **service_overrides)

    def add_docs(self, docs: Sequence[str]) -> None:
        """Add more raw documents."""
        self.text_objects.extend(TextObject(doc) for doc in docs)

    def embed_all(self) -> None:
        """
        Chunk and embed all text objects; persist as vector records in the
        vector store.

        Idempotency note: This method does not deduplicate; calling multiple
        times will append more records for the same docs. Manage deduplication
        externally if needed.
        """
        for text_obj in self.text_objects:
            pieces = self.chunk_service.chunk(text_obj.raw_content)
            embeddings = self.embed_service.embed(pieces)
            text_obj.set_chunked(pieces)
            text_obj.set_embeddings(embeddings)
            # textobject_to_vectorrecords yields pre-embedded VectorRecord
            # objects, so the store ingests them without re-embedding.
            self.vector_store.add(textobject_to_vectorrecords(text_obj))

    def search(self, query: Union[str, List[float]], top_k: int = 3) -> SearchResult:
        """
        Search the vector store for relevant documents.

        If query is text, we embed it; if it's a vector, we pass it through.
        """
        query_vector = (
            self.embed_service.embed([query])[0] if isinstance(query, str) else query
        )
        return self.vector_store.search(query_vector, top_k=top_k)
RAG (Retrieval-Augmented Generation) system for processing and searching documents.
You can construct it with:
1) A single RAGConfig (clean, declarative), and/or 2) Prebuilt services (embedding_service, vector_store, chunk_service) for advanced control.
Precedence: explicitly provided services > config-driven creation > defaults.
Typical use:
rag = RAG.from_docs(docs, config) rag.embed_all() results = rag.search("query", top_k=3)
88 def __init__( 89 self, 90 docs: Sequence[str], 91 *, 92 config: Optional[RAGConfig] = None, 93 embedding_service: Optional[BaseEmbeddingService] = None, 94 vector_store: Optional[AbstractVectorStore] = None, 95 chunk_service: Optional[TextChunkingService] = None, 96 ): 97 self._config = config or RAGConfig() 98 99 # Build or accept existing services 100 self.embed_service = embedding_service or EmbeddingService( 101 **self._config.embedding 102 ) 103 self.vector_store = vector_store or create_store(**self._config.store) 104 self.chunk_service = chunk_service or TextChunkingService( 105 **self._config.chunking, 106 strategy=self._config.chunk_strategy or TextChunkingService.chunk_by_token, 107 ) 108 109 # Initialize TextObjects 110 self.text_objects: List[TextObject] = [TextObject(doc) for doc in docs]
112 @classmethod 113 def from_docs( 114 cls, 115 docs: Sequence[str], 116 config: Optional[RAGConfig] = None, 117 **service_overrides, 118 ) -> "RAG": 119 """ 120 Convenience constructor. Accepts a config plus optional service overrides: 121 - embedding_service 122 - vector_store 123 - chunk_service 124 """ 125 return cls(docs, config=config, **service_overrides)
Convenience constructor. Accepts a config plus optional service overrides:
- embedding_service
- vector_store
- chunk_service
127 def add_docs(self, docs: Sequence[str]) -> None: 128 """Add more raw documents.""" 129 for doc in docs: 130 self.text_objects.append(TextObject(doc))
Add more raw documents.
132 def embed_all(self) -> None: 133 """ 134 Chunk and embed all text objects; persist as vector records in the vector store. 135 136 Idempotency note: This method does not deduplicate; calling multiple times will 137 append more records for the same docs. Manage deduplication externally if needed. 138 """ 139 for tobj in self.text_objects: 140 chunks = self.chunk_service.chunk(tobj.raw_content) 141 vectors = self.embed_service.embed(chunks) 142 tobj.set_chunked(chunks) 143 tobj.set_embeddings(vectors) 144 records = textobject_to_vectorrecords(tobj) 145 # These are pre-embedded VectorRecord objects 146 self.vector_store.add(records)
Chunk and embed all text objects; persist as vector records in the vector store.
Idempotency note: This method does not deduplicate; calling multiple times will append more records for the same docs. Manage deduplication externally if needed.
148 def search(self, query: Union[str, List[float]], top_k: int = 3) -> SearchResult: 149 """ 150 Search the vector store for relevant documents. 151 152 If query is text, we embed it; if it's a vector, we pass it through. 153 """ 154 if isinstance(query, str): 155 q_vec = self.embed_service.embed([query])[0] 156 return self.vector_store.search(q_vec, top_k=top_k) 157 else: 158 return self.vector_store.search(query, top_k=top_k)
Search the vector store for relevant documents.
If query is text, we embed it; if it's a vector, we pass it through.
@dataclass(frozen=True)
class RAGConfig:
    """
    Unified configuration for RAG.

    - embedding: kwargs forwarded to EmbeddingService(...)
    - store: kwargs forwarded to create_store(...)
    - chunking: kwargs forwarded to TextChunkingService(...)
    - chunk_strategy: override the default chunking strategy if provided
    """

    # Keyword arguments forwarded verbatim to EmbeddingService(...).
    embedding: Dict = field(default_factory=dict)
    # Keyword arguments forwarded verbatim to create_store(...).
    store: Dict = field(default_factory=dict)
    # Keyword arguments forwarded verbatim to TextChunkingService(...).
    chunking: Dict = field(default_factory=dict)
    # Optional callable mapping a document string to its list of chunks.
    # When None, RAG falls back to TextChunkingService.chunk_by_token.
    chunk_strategy: Optional[Callable[[str], List[str]]] = None
Unified configuration for RAG.
- embedding: kwargs forwarded to EmbeddingService(...)
- store: kwargs forwarded to create_store(...)
- chunking: kwargs forwarded to TextChunkingService(...)
- chunk_strategy: override the default chunking strategy if provided
class TextChunkingService(BaseChunkingService):
    """Chunk raw text by characters or by tokens for downstream embedding."""

    # Names of the chunking strategies this service supports.
    strategies = ["chunk_by_char", "chunk_by_token", "chunk_smart"]

    def __init__(
        self,
        chunk_size: int = 2048,
        chunk_overlap: int = 256,
        model: Optional[str] = "gpt-3.5-turbo",
        strategy: Optional[Callable] = None,
        *other_configs,
        **other_kwargs,
    ):
        """
        Initialize the text chunker.

        Args:
            chunk_size: Size of each chunk (characters for chunk_by_char,
                tokens for chunk_by_token).
            chunk_overlap: Overlap between consecutive chunks, in the same
                unit as chunk_size. Must be strictly smaller than chunk_size.
            model: Model name used to build a Tokenizer for token chunking.
            strategy: Optional chunking callable; forwarded to the base class.
        """
        self.model = model
        super().__init__(
            chunk_size, chunk_overlap, strategy, *other_configs, **other_kwargs
        )

    def _step(self) -> int:
        """Return the loop advance (chunk_size - chunk_overlap), validated.

        Raises:
            ValueError: if chunk_overlap >= chunk_size. A non-positive advance
                would make the chunking loops below never terminate (the
                original code only rejected overlap > chunk_size, so
                overlap == chunk_size produced a zero step and an infinite
                loop on inputs longer than one chunk).
        """
        if self.chunk_overlap >= self.chunk_size:
            error_message = (
                f"chunk_overlap ({self.chunk_overlap}) must be smaller than "
                f"chunk_size ({self.chunk_size}); equal or larger values make "
                "the chunking loop non-terminating."
            )
            logger.error(error_message)
            raise ValueError(error_message)
        return self.chunk_size - self.chunk_overlap

    def chunk_by_char(
        self,
        content: str,
    ) -> List[str]:
        """
        Split text into fixed-size character chunks with overlap.

        Args:
            content: Raw text to split.

        Returns:
            Substrings of length <= chunk_size; consecutive chunks share
            chunk_overlap characters. Empty input yields an empty list.

        Raises:
            ValueError: if chunk_overlap >= chunk_size.
        """
        # Validate up front: with overlap >= size the advance is <= 0 and the
        # loop would never terminate (previously unchecked in this method).
        step = self._step()
        chunks: List[str] = []
        start = 0
        while start < len(content):
            end = min(start + self.chunk_size, len(content))
            chunks.append(content[start:end])
            # Once the end of the text is covered, stop: advancing further
            # would only re-emit a suffix of the chunk just produced.
            # (Mirrors the termination check chunk_by_token already had.)
            if end >= len(content):
                break
            start += step

        return chunks

    def chunk_by_token(self, content: str) -> List[str]:
        """
        Split text into chunks of at most chunk_size tokens with overlap.

        TODO: use LLM to do this

        Args:
            content: Raw text to split.

        Returns:
            Decoded text chunks; consecutive chunks share chunk_overlap
            tokens. Empty input yields an empty list.

        Raises:
            ValueError: if no model is configured, or if
                chunk_overlap >= chunk_size.
        """
        if not self.model:
            error_message = "Model not specified for token chunking."
            logger.error(error_message)
            raise ValueError(error_message)
        step = self._step()

        tokenizer = Tokenizer(self.model)
        tokens = tokenizer.encode(content)

        chunks: List[str] = []
        start = 0
        while start < len(tokens):
            end = min(start + self.chunk_size, len(tokens))
            chunks.append(tokenizer.decode(tokens[start:end]))
            # Stop once the final token has been included in a chunk.
            if end >= len(tokens):
                break
            start += step

        return chunks
Processor for text operations.
88 def __init__( 89 self, 90 chunk_size: int = 2048, 91 chunk_overlap: int = 256, 92 model: Optional[str] = "gpt-3.5-turbo", 93 strategy: Optional[Callable] = None, 94 *other_configs, 95 **other_kwargs, 96 ): 97 """ 98 Initialize the text chunker. 99 100 Args: 101 chunk_size: Size of each chunk in characters 102 chunk_overlap: Overlap between chunks in characters 103 """ 104 self.model = model 105 super().__init__( 106 chunk_size, chunk_overlap, strategy, *other_configs, **other_kwargs 107 )
Initialize the text chunker.
Arguments:
- chunk_size: Size of each chunk in characters
- chunk_overlap: Overlap between chunks in characters
109 def chunk_by_char( 110 self, 111 content: str, 112 ) -> List[str]: 113 """ 114 Split text into chunks 115 """ 116 chunks = [] 117 start = 0 118 119 # end was used but never declared in the original code snippet; 120 # we'll add a separate approach or define end before the loop. 121 while start < len(content): 122 end = min(start + self.chunk_size, len(content)) 123 chunk = content[start:end] 124 chunks.append(chunk) 125 start += self.chunk_size - self.chunk_overlap 126 127 return chunks
Split text into chunks
129 def chunk_by_token(self, content: str) -> List[str]: 130 """ 131 Split text into chunks by token. 132 133 TODO: use LLM to do this 134 """ 135 chunks = [] 136 if not self.model: 137 error_message = "Model not specified for token chunking." 138 logger.error(error_message) 139 raise ValueError(error_message) 140 tokenizer = Tokenizer(self.model) 141 tokens = tokenizer.encode(content) 142 143 if self.chunk_overlap > self.chunk_size: 144 logger.warning( 145 f"Warning: chunk_overlap ({self.chunk_overlap}) is greater than chunk_size ({self.chunk_size})." 146 " Should be <= 40%" 147 ) 148 raise ValueError("chunk_overlap should be less than or equal to chunk_size") 149 150 start = 0 151 while start < len(tokens): 152 end = min(start + self.chunk_size, len(tokens)) 153 token_chunk = tokens[start:end] 154 chunk = tokenizer.decode(token_chunk) 155 156 chunks.append(chunk) 157 start += self.chunk_size - self.chunk_overlap 158 if end >= len(tokens): 159 break 160 161 return chunks
Split text into chunks by token.
TODO: use LLM to do this