railtracks.vector_stores
1from .chroma import ChromaVectorStore 2from .chunking.base_chunker import Chunk 3from .chunking.fixed_token_chunker import FixedTokenChunker 4from .chunking.media_parser import MediaParser 5from .filter import F, all_of, any_of 6 7__all__ = [ 8 "all_of", 9 "any_of", 10 "ChromaVectorStore", 11 "Chunk", 12 "F", 13 "FixedTokenChunker", 14 "MediaParser", 15]
def all_of(filters: Iterable[BaseExpr]) -> BaseExpr:
    """Reduce an iterable of filter expressions to one expression joined by AND.

    Args:
        filters: An iterable of filter expressions to combine

    Returns:
        A single filter expression (LeafExpr if one filter, LogicExpr if multiple)

    Raises:
        ValueError: If the iterable is empty
    """
    exprs = [*filters]
    if not exprs:
        raise ValueError("all_of() requires at least one Expression")

    # A single expression needs no AND wrapper.
    first, *rest = exprs
    return and_(first, *rest) if rest else first
Combine an iterable of filter expressions with logical AND.
Arguments:
- filters: An iterable of filter expressions to combine
Returns:
A single filter expression (LeafExpr if one filter, LogicExpr if multiple)
Raises:
- ValueError: If the iterable is empty
def any_of(filters: Iterable[BaseExpr]) -> BaseExpr:
    """Reduce an iterable of filter expressions to one expression joined by OR.

    Args:
        filters: An iterable of filter expressions to combine

    Returns:
        A single filter expression (LeafExpr if one filter, LogicExpr if multiple)

    Raises:
        ValueError: If the iterable is empty
    """
    exprs = [*filters]
    if not exprs:
        raise ValueError("any_of() requires at least one Expression")

    # A single expression needs no OR wrapper.
    first, *rest = exprs
    return or_(first, *rest) if rest else first
Combine an iterable of filter expressions with logical OR.
Arguments:
- filters: An iterable of filter expressions to combine
Returns:
A single filter expression (LeafExpr if one filter, LogicExpr if multiple)
Raises:
- ValueError: If the iterable is empty
class ChromaVectorStore(VectorStore):
    """ChromaDB-backed implementation of :class:`VectorStore`.

    This class wraps a Chroma collection and translates between Chroma's
    API and the project's neutral types. The implementation currently
    supports upserting lists of either :class:`Chunk` or strings and
    querying by text strings.
    """

    # Fields requested from Chroma by default in :meth:`search`. A tuple is
    # used so no shared mutable list appears as a parameter default.
    _DEFAULT_SEARCH_INCLUDE = ("metadatas", "embeddings", "documents", "distances")

    @classmethod
    def class_init(
        cls, path: Optional[str], host: Optional[str], port: Optional[int]
    ) -> None:
        """Lazily initialize the shared Chroma client.

        This method performs an optional import of Chroma and creates a
        persistent, HTTP or ephemeral client depending on the parameters.
        The client is class-level state: only the first call creates it, and
        later calls return immediately regardless of their arguments.

        Args:
            path: Filesystem path for a persistent client (optional).
            host: Hostname for an HTTP client (optional).
            port: Port for an HTTP client (optional).

        Raises:
            ImportError: If the `chromadb` package is not installed.
            ValueError: If path/host/port form an unsupported combination.
        """
        if hasattr(cls, "_chroma"):
            return

        try:
            import chromadb
        except ImportError as err:
            # Chain the original error so the real import failure is visible.
            raise ImportError(
                "Chroma package is not installed. Please install railtracks[chroma]."
            ) from err

        # Provide just a path for local store
        if path and not host and not port:
            cls._chroma = chromadb.PersistentClient(path=path)
        # Provide just a host and port for http store
        elif not path and host and port:
            cls._chroma = chromadb.HttpClient(host=host, port=port)
        # Provide nothing for temporary store
        elif not path and not host and not port:
            cls._chroma = chromadb.EphemeralClient()
        else:
            raise ValueError(
                "Invalid combination of path, host, and port for Chroma client."
            )

    @overload
    def __init__(
        self,
        collection_name: str,
        embedding_function: Callable[[list[str]], list[list[float]]],
        *,
        path: str,
    ) -> None: ...

    @overload
    def __init__(
        self,
        collection_name: str,
        embedding_function: Callable[[list[str]], list[list[float]]],
        *,
        host: str,
        port: int,
    ) -> None: ...

    @overload
    def __init__(
        self,
        collection_name: str,
        embedding_function: Callable[[list[str]], list[list[float]]],
    ) -> None: ...

    def __init__(
        self,
        collection_name: str,
        embedding_function: Callable[[list[str]], list[list[float]]],
        path: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
    ):
        """Create a ChromaVectorStore instance.

        Args:
            collection_name: Name of the Chroma collection to use or create.
            embedding_function: Callable that maps a list of strings to a list
                of embedding vectors.
            path: Optional path for persistent Chroma storage.
            host: Optional HTTP host for remote Chroma.
            port: Optional HTTP port for remote Chroma.
        """
        self._collection_name = collection_name
        self._embedding_function = embedding_function

        # Creates (or reuses) the process-wide client, then binds a collection.
        ChromaVectorStore.class_init(path, host, port)
        self._collection = self._chroma.get_or_create_collection(collection_name)

    # In future should have our own chunking service so we can accept documents for users
    @overload
    def upsert(self, content: Chunk | str) -> str: ...

    @overload
    def upsert(self, content: list[Chunk] | list[str]) -> list[str]: ...

    def upsert(self, content: OneOrMany[Chunk] | OneOrMany[str]) -> OneOrMany[str]:
        """Upsert a batch of chunks or raw strings into the collection.

        The method accepts a list of :class:`Chunk` instances or plain strings.
        Each element is embedded via ``embedding_function`` and stored along
        with metadata that always contains the original content under the
        key defined in :data:`CONTENT`.

        Args:
            content: List of or singular chunks or strings to upsert.

        Returns:
            OneOrMany[str]: Generated ids for the inserted items.
        """
        # Normalize singular input to a list, remembering the original shape
        # so the return value mirrors it (one id vs a list of ids).
        is_many = not isinstance(content, (str, Chunk))
        items = content if is_many else [content]

        ids = []
        embeddings = []
        metadatas = []
        documents = []

        for item in items:
            if isinstance(item, Chunk):
                item_id = item.id
                embedding = self._embedding_function([item.content])[0]
                # Copy the metadata so the caller's Chunk is not mutated when
                # the content is stashed under the CONTENT key.
                metadata = dict(item.metadata)
                metadata[CONTENT] = item.content
                documents.append(item.document)
            else:
                # Plain strings get a generated id and no source document.
                item_id = str(uuid4())
                embedding = self._embedding_function([item])[0]
                metadata = {CONTENT: item}
                documents.append(None)

            ids.append(item_id)
            embeddings.append(embedding)
            metadatas.append(metadata)

        self._collection.upsert(
            ids=ids, embeddings=embeddings, metadatas=metadatas, documents=documents
        )
        return ids if is_many else ids[0]

    def fetch(
        self,
        ids: Optional[OneOrMany[str]] = None,
        where: Optional[Where] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        where_document: Optional[WhereDocument] = None,
    ) -> FetchResponse:
        """Fetch a set of vectors and their metadata from the collection.

        Args:
            ids: Optional list of ids or singular id to fetch.
            where: Optional metadata filter.
            limit: Result limit for pagination.
            offset: Result offset for pagination.
            where_document: Optional document-based filter.

        Returns:
            FetchResponse: A list-like container of :class:`FetchResult`.

        Raises:
            ValueError: If the Chroma response does not contain required fields.
        """
        results = FetchResponse()
        # currently we ignore Include and assume the default
        responses = self._collection.get(
            ids,
            where,
            limit,
            offset,
            where_document,
            include=["embeddings", "metadatas", "documents"],
        )

        # Fail loudly rather than building partial rows from a bad response.
        embeddings = responses.get("embeddings")
        if embeddings is None:
            raise ValueError("Embeddings were not found in fetch response.")
        documents = responses.get("documents")
        if documents is None:
            raise ValueError("Documents were not found in fetch response.")
        metadatas = responses.get("metadatas")
        if metadatas is None:
            raise ValueError("Metadatas were not found in fetch response.")

        for i, fetched_id in enumerate(responses["ids"]):
            # Deep-copy before popping CONTENT so the raw Chroma response is
            # left untouched.
            metadata = dict(deepcopy(metadatas[i]))
            content = metadata.pop(CONTENT, None)
            if not content or not isinstance(content, str):
                raise ValueError(
                    "Content was not initialized in vector. Please create an issue"
                )

            results.append(
                FetchResult(
                    id=fetched_id,
                    content=content,
                    vector=list(embeddings[i]),
                    document=documents[i],
                    metadata=metadata,
                )
            )

        return results

    # There is support for other types of query modalities but for now just list of strings
    # Should Probably add support for Chunks as well
    @overload
    def search(
        self,
        query: Chunk | str,
        ids: Optional[str] = None,
        top_k: int = 10,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        include: Optional[Include] = None,
    ) -> SearchResponse: ...

    @overload
    def search(
        self,
        query: list[Chunk] | list[str],
        ids: Optional[list[str]] = None,
        top_k: int = 10,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        include: Optional[Include] = None,
    ) -> list[SearchResponse]: ...

    def search(
        self,
        query: OneOrMany[Chunk] | OneOrMany[str],
        ids: Optional[OneOrMany[str]] = None,
        top_k: int = 10,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        include: Optional[Include] = None,
    ) -> OneOrMany[SearchResponse]:
        """Run a similarity search for the provided query texts.

        Args:
            query: A list of query strings or singular string to search for.
            ids: Optional list of ids or singular id to restrict the search to.
            top_k: Number of hits to return per query.
            where: Optional metadata filter to apply.
            where_document: Optional document filter to apply.
            include: Fields to include in the Chroma response. Defaults to
                metadatas, embeddings, documents and distances.

        Returns:
            A list of :class:`SearchResponse` objects (one per query), or a
            single :class:`SearchResponse` for a singular query.

        Raises:
            ValueError: If expected fields are missing from the Chroma response
                or the query is not a supported type.
        """
        # Avoid a mutable default argument: materialize the default per call.
        if include is None:
            include = list(self._DEFAULT_SEARCH_INCLUDE)

        is_many = True
        # Normalize every accepted query shape to a list of strings.
        if isinstance(query, Chunk):
            texts = [query.content]
            is_many = False
        elif isinstance(query, str):
            texts = [query]
            is_many = False
        elif isinstance(query, list) and all(isinstance(q, Chunk) for q in query):
            texts = [q.content for q in query]
        elif isinstance(query, list) and all(isinstance(q, str) for q in query):
            texts = query
        else:
            raise ValueError(
                "Query must be a string, Chunk, or list of strings/Chunks."
            )

        query_embeddings = self._embedding_function(texts)
        results = self._collection.query(
            query_embeddings=list(query_embeddings),
            ids=ids,
            n_results=top_k,
            where=where,
            where_document=where_document,
            include=include,
        )

        # Validate the response once up front instead of re-checking the same
        # fields for every hit of every query (previously done in the inner
        # loop, so it now also runs when a query returns no hits).
        distances = results.get("distances")
        if not distances:
            raise ValueError("Distance not found in search results.")
        vectors = results.get("embeddings")
        if not vectors:
            raise ValueError("Vector not found in search results.")
        documents = results.get("documents")
        if not documents:
            raise ValueError("Document not found in search results.")
        metadatas = results.get("metadatas")
        if not metadatas:
            raise ValueError("Metadata not found in search results.")

        answer: list[SearchResponse] = []
        for query_idx, hit_ids in enumerate(results["ids"]):
            search_response = SearchResponse()
            for id_idx, hit_id in enumerate(hit_ids):
                # Deep-copy so popping CONTENT does not alter the raw response.
                metadata = dict(deepcopy(metadatas[query_idx][id_idx]))
                content = metadata.pop(CONTENT, None)
                if not content or not isinstance(content, str):
                    raise ValueError(
                        "Content was not initialized in vector. Please create an issue"
                    )

                search_response.append(
                    SearchResult(
                        id=hit_id,
                        distance=distances[query_idx][id_idx],
                        content=content,
                        vector=list(vectors[query_idx][id_idx]),
                        document=documents[query_idx][id_idx],  # Chroma document is just a str
                        metadata=metadata,
                    )
                )
            answer.append(search_response)

        return answer if is_many else answer[0]

    def delete(
        self,
        ids: OneOrMany[str],
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
    ):
        """Remove vectors from the store by id or metadata filter.

        Args:
            ids: list of ids or singular id to delete.
            where: Optional metadata filter.
            where_document: Optional document-based filter.
        """
        self._collection.delete(
            ids=ids,
            where=where,
            where_document=where_document,
        )

    def count(self) -> int:
        """Return the total number of vectors stored in the collection."""
        return self._collection.count()
ChromaDB-backed implementation of VectorStore.
This class wraps a Chroma collection and translates between Chroma's
API and the project's neutral types. The implementation currently
supports upserting lists of either Chunk or strings and
querying by text strings.
    def __init__(
        self,
        collection_name: str,
        embedding_function: Callable[[list[str]], list[list[float]]],
        path: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
    ):
        """Create a ChromaVectorStore instance.

        Args:
            collection_name: Name of the Chroma collection to use or create.
            embedding_function: Callable that maps a list of strings to a list
                of embedding vectors.
            path: Optional path for persistent Chroma storage.
            host: Optional HTTP host for remote Chroma.
            port: Optional HTTP port for remote Chroma.
        """
        self._collection_name = collection_name
        self._embedding_function = embedding_function

        # class_init creates the class-wide Chroma client on first use; later
        # instances reuse that same client regardless of path/host/port.
        ChromaVectorStore.class_init(path, host, port)
        self._collection = self._chroma.get_or_create_collection(collection_name)
Create a ChromaVectorStore instance.
Arguments:
- collection_name: Name of the Chroma collection to use or create.
- embedding_function: Callable that maps a list of strings to a list of embedding vectors.
- path: Optional path for persistent Chroma storage.
- host: Optional HTTP host for remote Chroma.
- port: Optional HTTP port for remote Chroma.
    @classmethod
    def class_init(
        cls, path: Optional[str], host: Optional[str], port: Optional[int]
    ) -> None:
        """Lazily initialize the shared Chroma client.

        This method performs an optional import of Chroma and creates a
        persistent, HTTP or ephemeral client depending on the parameters.

        Args:
            path: Filesystem path for a persistent client (optional).
            host: Hostname for an HTTP client (optional).
            port: Port for an HTTP client (optional).

        Raises:
            ImportError: If the `chromadb` package is not installed.
            ValueError: If path/host/port form an unsupported combination.
        """
        # The client is class-level state shared by every instance: only the
        # first caller creates it; later calls are no-ops.
        if not hasattr(cls, "_chroma"):
            try:
                import chromadb

                # Provide just a path for local store
                if path and not host and not port:
                    cls._chroma = chromadb.PersistentClient(path=path)
                # Provide just a host and port for http store
                elif not path and host and port:
                    cls._chroma = chromadb.HttpClient(host=host, port=port)
                # Provide nothing for temporary store
                elif not path and not host and not port:
                    cls._chroma = chromadb.EphemeralClient()
                else:
                    raise ValueError(
                        "Invalid combination of path, host, and port for Chroma client."
                    )
            except ImportError:
                # NOTE(review): re-raised without `from` chaining, so the
                # underlying import failure is only implicit context.
                raise ImportError(
                    "Chroma package is not installed. Please install railtracks[chroma]."
                )
Lazily initialize the shared Chroma client.
This method performs an optional import of Chroma and creates a persistent, HTTP or ephemeral client depending on the parameters.
Arguments:
- path: Filesystem path for a persistent client (optional).
- host: Hostname for an HTTP client (optional).
- port: Port for an HTTP client (optional).
Raises:
- ImportError: If the `chromadb` package is not installed.
    def upsert(self, content: OneOrMany[Chunk] | OneOrMany[str]) -> OneOrMany[str]:
        """Upsert a batch of chunks or raw strings into the collection.

        The method accepts a list of :class:`Chunk` instances or plain strings.
        Each element is embedded via ``embedding_function`` and stored along
        with metadata that always contains the original content under the
        key defined in :data:`CONTENT`.

        Args:
            content: List of or singular chunks or strings to upsert.

        Returns:
            OneOrMany[str]: Generated ids for the inserted items.
        """
        ids = []
        embeddings = []
        metadatas = []
        documents = []
        # Track whether the caller passed a single item so the return value
        # can mirror the input shape (one id vs a list of ids).
        is_many = True
        if isinstance(content, str):
            content = [content]
            is_many = False

        if isinstance(content, Chunk):
            content = [content]
            is_many = False

        for item in content:
            if isinstance(item, Chunk):
                id = item.id
                # Items are embedded one at a time (one call per item).
                embedding = self._embedding_function([item.content])[0]
                # NOTE(review): this aliases item.metadata and then writes the
                # CONTENT key into it, mutating the caller's Chunk in place —
                # confirm that is intended.
                metadata = item.metadata
                metadata[CONTENT] = item.content
                documents.append(item.document)

            else:
                # Plain strings get a generated id and no source document.
                id = str(uuid4())
                embedding = self._embedding_function([item])[0]
                metadata = {CONTENT: item}
                documents.append(None)

            ids.append(id)
            embeddings.append(embedding)
            metadatas.append(metadata)

        self._collection.upsert(
            ids=ids, embeddings=embeddings, metadatas=metadatas, documents=documents
        )
        return ids if is_many else ids[0]
Upsert a batch of chunks or raw strings into the collection.
The method accepts a list of Chunk instances or plain strings.
Each element is embedded via embedding_function and stored along
with metadata that always contains the original content under the
key defined in CONTENT.
Arguments:
- content: List of or singular chunks or strings to upsert.
Returns:
OneOrMany[str]: Generated ids for the inserted items.
    def fetch(
        self,
        ids: Optional[OneOrMany[str]] = None,
        where: Optional[Where] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        where_document: Optional[WhereDocument] = None,
    ) -> FetchResponse:
        """Fetch a set of vectors and their metadata from the collection.

        Args:
            ids: Optional list of ids or singular id to fetch.
            where: Optional metadata filter.
            limit: Result limit for pagination.
            offset: Result offset for pagination.
            where_document: Optional document-based filter.

        Returns:
            FetchResponse: A list-like container of :class:`FetchResult`.

        Raises:
            ValueError: If the Chroma response does not contain required fields.
        """
        results = FetchResponse()
        # currently we ignore Include and assume the default
        responses = self._collection.get(
            ids,
            where,
            limit,
            offset,
            where_document,
            include=["embeddings", "metadatas", "documents"],
        )

        # Fail loudly if the response is missing a requested field, rather
        # than building partial rows.
        embeddings = responses.get("embeddings")
        if embeddings is None:
            raise ValueError("Embeddings were not found in fetch response.")
        documents = responses.get("documents")
        if documents is None:
            raise ValueError("Documents were not found in fetch response.")
        metadatas = responses.get("metadatas")
        if metadatas is None:
            raise ValueError("Metadatas were not found in fetch response.")

        for i, response in enumerate(responses["ids"]):
            id = response

            # Deep copy so popping CONTENT below does not alter the raw
            # Chroma response.
            metadata = dict(deepcopy(metadatas[i]))
            if not (content := metadata.get(CONTENT)) or not isinstance(content, str):
                raise ValueError(
                    "Content was not initialized in vector. Please create an issue"
                )

            metadata.pop(CONTENT)
            results.append(
                FetchResult(
                    id=id,
                    content=content,
                    vector=list(embeddings[i]),
                    document=documents[i],
                    metadata=metadata,
                )
            )

        return results
Fetch a set of vectors and their metadata from the collection.
Arguments:
- ids: Optional list of ids or singular id to fetch.
- where: Optional metadata filter.
- limit: Result limit for pagination.
- offset: Result offset for pagination.
- where_document: Optional document-based filter.
Returns:
FetchResponse: A list-like container of `FetchResult`.
Raises:
- ValueError: If the Chroma response does not contain required fields.
    def search(  # noqa: C901
        self,
        query: OneOrMany[Chunk] | OneOrMany[str],
        ids: Optional[OneOrMany[str]] = None,
        top_k: int = 10,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        # NOTE(review): a mutable list default is shared between calls. It is
        # never mutated here, but a tuple or None default would be safer.
        include: Include = [
            "metadatas",
            "embeddings",
            "documents",
            "distances",
        ],
    ) -> OneOrMany[SearchResponse]:
        """Run a similarity search for the provided query texts.

        Args:
            query: A list of query strings or singular string to search for.
            ids: Optional list of ids or singular id to restrict the search to.
            top_k: Number of hits to return per query.
            where: Optional metadata filter to apply.
            where_document: Optional document filter to apply.
            include: Fields to include in the Chroma response.

        Returns:
            A list of :class:`SearchResponse` objects (one per query).

        Raises:
            ValueError: If expected fields are missing from the Chroma response.
        """
        is_many = True
        # If a single chunk is passed in, convert to list of string
        if isinstance(query, Chunk):
            query = [query.content]
            is_many = False

        # If a single string is passed in, convert to list of string
        elif isinstance(query, str):
            query = [query]
            is_many = False

        # If list of chunks is passed in, convert to list of strings
        elif isinstance(query, list) and all(isinstance(q, Chunk) for q in query):
            query = [q.content for q in query]

        elif isinstance(query, list) and all(isinstance(q, str) for q in query):
            pass
        else:
            raise ValueError(
                "Query must be a string, Chunk, or list of strings/Chunks."
            )

        query_embeddings = self._embedding_function(query)
        results = self._collection.query(
            query_embeddings=list(query_embeddings),
            ids=ids,
            n_results=top_k,
            where=where,
            where_document=where_document,
            include=include,
        )
        answer: list[SearchResponse] = []
        for query_idx, query_response in enumerate(results["ids"]):
            search_response = SearchResponse()
            for id_idx, id in enumerate(query_response):
                # NOTE(review): these presence checks are loop-invariant and
                # re-run for every hit; they could be hoisted above the loops.
                if not (distance := results.get("distances")):
                    raise ValueError("Distance not found in search results.")
                elif not (vector := results.get("embeddings")):
                    raise ValueError("Vector not found in search results.")
                elif not (document := results.get("documents")):
                    raise ValueError("Document not found in search results.")
                elif not (metadatas := results.get("metadatas")):
                    raise ValueError("Metadata not found in search results.")

                # Narrow each field from the full per-query matrix to the
                # current hit.
                distance = distance[query_idx][id_idx]
                vector = list(vector[query_idx][id_idx])
                document = document[query_idx][id_idx]
                metadata = dict(deepcopy(metadatas[query_idx][id_idx]))

                if not (content := metadata.get(CONTENT)) or not isinstance(
                    content, str
                ):
                    raise ValueError(
                        "Content was not initialized in vector. Please create an issue"
                    )

                metadata.pop(CONTENT)

                search_response.append(
                    SearchResult(
                        id=id,
                        distance=distance,
                        content=content,
                        vector=vector,
                        document=document,  # Chroma document is just a str
                        metadata=metadata,
                    )
                )
            answer.append(search_response)

        return answer if is_many else answer[0]
Run a similarity search for the provided query texts.
Arguments:
- query: A list of query strings or singular string to search for.
- ids: Optional list of ids or singular id to restrict the search to.
- top_k: Number of hits to return per query.
- where: Optional metadata filter to apply.
- where_document: Optional document filter to apply.
- include: Fields to include in the Chroma response.
Returns:
A list of `SearchResponse` objects (one per query).
Raises:
- ValueError: If expected fields are missing from the Chroma response.
    def delete(
        self,
        ids: OneOrMany[str],
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
    ):
        """Remove vectors from the store by id or metadata filter.

        Args:
            ids: list of ids or singular id to delete.
            where: Optional metadata filter.
            where_document: Optional document-based filter.
        """

        # Deletion is delegated directly to the underlying Chroma collection.
        self._collection.delete(
            ids=ids,
            where=where,
            where_document=where_document,
        )
Remove vectors from the store by id or metadata filter.
Arguments:
- ids: list of ids or singular id to delete.
- where: Optional metadata filter.
- where_document: Optional document-based filter.
@dataclass
class Chunk:
    """Structured chunk that can be upserted to a vector store.

    Attributes:
        content (str): The raw chunk text.
        id (Optional[str]): Identifier for the chunk. If not provided, a UUID
            is automatically generated in ``__post_init__``.
        document (Optional[str]): Optional document identifier or content
            associated with the chunk.
        metadata (Optional[dict[str, Any]]): Arbitrary key-value metadata
            associated with this chunk. ``None`` (the default) is normalized
            to a fresh empty dictionary in ``__post_init__``, so instances
            never share metadata state.
    """

    content: str
    id: Optional[str] = None
    document: Optional[str] = None
    # Default is None rather than a default_factory: the declared Optional
    # type and the default now agree, and __post_init__ already normalizes
    # None to a per-instance empty dict.
    metadata: Optional[dict[str, Any]] = None

    def __post_init__(self) -> None:
        """Normalize metadata and ensure identifier is populated."""
        if self.metadata is None:
            self.metadata = {}
        if self.id is None:
            self.id = str(uuid4())
Structured chunk that can be upserted to a vector store.
Attributes:
- content (str): The raw chunk text.
- id (Optional[str]): Identifier for the chunk. If not provided, a UUID is automatically generated in `__post_init__`.
- document (Optional[str]): Optional document identifier or content associated with the chunk.
- metadata (dict[str, Any]): Arbitrary key-value metadata associated with this chunk. Defaults to an empty dictionary.
class FixedTokenChunker(BaseChunker):
    """A chunker that splits text strictly by token count.

    This implementation divides text using a fixed token window, optionally
    with overlap between chunks. Tokenization is performed using `tiktoken`
    and defaults to the `cl100k_base` tokenizer unless otherwise specified.

    Args:
        chunk_size (int): Maximum number of tokens allowed in a produced chunk.
            Must be positive. Defaults to 400.
        overlap (int): Number of tokens shared between adjacent chunks. Must be
            in ``[0, chunk_size)``. Defaults to 200.
        tokenizer (Optional[str]): Name of the `tiktoken` encoding to use. If
            omitted, ``cl100k_base`` is used.

    Attributes:
        _chunk_size (int): Internal storage for chunk size.
        _overlap (int): Internal storage for token overlap.
        _tokenizer (tiktoken.Encoding): Tokenizer used for encoding/decoding.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not in
            ``[0, chunk_size)``.
    """

    def __init__(
        self, chunk_size: int = 400, overlap: int = 200, tokenizer: Optional[str] = None
    ):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of tokens")
        if not 0 <= overlap < chunk_size:
            # overlap >= chunk_size would make the window advance by zero or
            # negative tokens, so split_text would never terminate.
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        super().__init__(chunk_size, overlap)
        self._tokenizer = tiktoken.get_encoding(tokenizer or "cl100k_base")

    def split_text(
        self,
        text: str,
    ) -> list[str]:
        """Split raw text into token-based windows.

        The text is tokenized using the configured tokenizer, and then divided
        into windows of ``_chunk_size`` tokens with ``_overlap`` tokens of
        backward overlap.

        Args:
            text (str): Raw text to split.

        Returns:
            list[str]: A list of text segments decoded back from token windows.
            Note : returns an empty list if passed an empty string
        """
        tokens = self._tokenizer.encode(text)
        # Guaranteed positive by the validation in __init__, so the loop
        # always terminates.
        step = self._chunk_size - self._overlap

        text_chunks = []
        start = 0
        while start < len(tokens):
            # Slicing clamps automatically at the end of the token stream.
            token_window = tokens[start : start + self._chunk_size]
            text_chunks.append(self._tokenizer.decode(token_window))
            start += step

        return text_chunks
A chunker that splits text strictly by token count.
This implementation divides text using a fixed token window, optionally
with overlap between chunks. Tokenization is performed using tiktoken
and defaults to the cl100k_base tokenizer unless otherwise specified.
Arguments:
- chunk_size (int): Maximum number of tokens allowed in a produced chunk. Defaults to 400.
- overlap (int): Number of tokens shared between adjacent chunks. Defaults to 200.
- tokenizer (Optional[str]): Name of the `tiktoken` encoding to use. If omitted, `cl100k_base` is used.
Attributes:
- _chunk_size (int): Internal storage for chunk size.
- _overlap (int): Internal storage for token overlap.
- _tokenizer (tiktoken.Encoding): Tokenizer used for encoding/decoding.
    def split_text(
        self,
        text: str,
    ) -> list[str]:
        """Split raw text into token-based windows.

        The text is tokenized using the configured tokenizer, and then divided
        into windows of ``_chunk_size`` tokens with ``_overlap`` tokens of
        backward overlap.

        Args:
            text (str): Raw text to split.

        Returns:
            list[str]: A list of text segments decoded back from token windows.
            Note : returns an empty list if passed an empty string
        """

        text_chunks = []
        tokens = self._tokenizer.encode(text)
        start = 0

        while start < len(tokens):
            # Clamp the window to the end of the token stream.
            end = min(start + self._chunk_size, len(tokens))
            token_window = tokens[start:end]
            text_chunks.append(self._tokenizer.decode(token_window))
            # NOTE(review): if _overlap >= _chunk_size this step is <= 0 and
            # the loop never terminates — confirm the constructor rejects
            # such values.
            start += self._chunk_size - self._overlap

        return text_chunks
Split raw text into token-based windows.
The text is tokenized using the configured tokenizer, and then divided
into windows of _chunk_size tokens with _overlap tokens of
backward overlap.
Arguments:
- text (str): Raw text to split.
Returns:
list[str]: A list of text segments decoded back from token windows. Note : returns an empty list if passed an empty string
class MediaParser:
    """General-purpose media parser capable of extracting text from various file types.

    Currently supports:
    - .txt
    - .pdf
    """

    # Maps a lowercase file extension to the name of its parser staticmethod.
    # Lookup goes through getattr(cls, ...) so subclasses may override parsers.
    _PARSERS = {
        ".txt": "_parse_txt",
        ".pdf": "_parse_pdf",
    }

    @classmethod
    def get_text(cls, path: str, **kwargs) -> str:
        """Return cleaned text extracted from a supported file.

        Args:
            path: Path to the file
            **kwargs: Parser-specific arguments (e.g., encoding for .txt files)

        Returns:
            The extracted text with non-printable characters removed.

        Raises:
            ValueError: If the file extension is not supported.
            FileNotFoundError: If the file does not exist.
        """
        ext = cls._get_extension(path)
        handler_name = cls._PARSERS.get(ext)
        if handler_name is None:
            raise ValueError(f"Unsupported file type: {ext}")

        parser_function = getattr(cls, handler_name)
        raw_text = parser_function(path, **kwargs)  # Pass kwargs through
        return cls._clean_text(raw_text)

    @staticmethod
    def _parse_txt(filepath: str, encoding: Optional[str] = None, **kwargs) -> str:
        """Extract text from a plain .txt file.

        Args:
            filepath: Path to the .txt file.
            encoding: Explicit text encoding; when omitted the encoding is
                auto-detected.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If encoding auto-detection fails.
        """
        if not os.path.isfile(filepath):
            raise FileNotFoundError(f"File not found: {filepath}")

        if encoding is not None:
            with open(filepath, "r", encoding=encoding) as f:
                return f.read()

        # Auto-detect encoding
        detected = from_path(filepath).best()
        if detected is None:
            raise ValueError(f"Failed to detect encoding for: {filepath}")

        with open(filepath, "r", encoding=detected.encoding) as f:
            return f.read()

    @staticmethod
    def _parse_pdf(filepath: str, **kwargs) -> str:
        """Extract text from a PDF using pdfplumber.

        Raises:
            FileNotFoundError: If the file does not exist.
            RuntimeError: If pdfplumber is not installed.
        """
        if not os.path.isfile(filepath):
            raise FileNotFoundError(f"File not found: {filepath}")

        global _pdfplumber

        # pdfplumber is imported lazily and cached at module level so that
        # text-only users never need the dependency.
        if _pdfplumber is None:
            try:
                import pdfplumber

                _pdfplumber = pdfplumber
            except ImportError as err:
                # Chain the original error so the real import failure survives.
                raise RuntimeError(
                    "pdfplumber is required for PDF parsing but isn't installed. "
                    "Install it via `pip install pdfplumber`."
                ) from err

        with _pdfplumber.open(filepath) as doc:
            extracted = []
            for page in doc.pages:
                text = page.extract_text()
                if text:
                    extracted.append(text)
            return "\n".join(extracted)

    @staticmethod
    def _clean_text(text: str) -> str:
        """Remove null bytes / non-printable characters while preserving whitespace."""
        if not text:
            return ""
        return "".join(char for char in text if char.isprintable() or char in "\t\n\r")

    @staticmethod
    def _get_extension(path: str) -> str:
        """Return file extension in lowercase."""
        _, ext = os.path.splitext(path)
        return ext.lower()
General-purpose media parser capable of extracting text from various file types.
Currently supports:
- .txt
- .pdf
    @classmethod
    def get_text(cls, path: str, **kwargs) -> str:
        """Return cleaned text extracted from a supported file.

        Args:
            path: Path to the file
            **kwargs: Parser-specific arguments (e.g., encoding for .txt files)

        Returns:
            The extracted text with non-printable characters removed.

        Raises:
            ValueError: If the file extension is not supported.
        """
        ext = cls._get_extension(path)
        if ext == ".txt":
            handler_name = "_parse_txt"

        elif ext == ".pdf":
            handler_name = "_parse_pdf"

        else:
            raise ValueError(f"Unsupported file type: {ext}")

        # Dispatch via getattr on cls so subclass overrides of the parser
        # methods are honored.
        parser_function = getattr(cls, handler_name)
        raw_text = parser_function(path, **kwargs)  # Pass kwargs through
        return cls._clean_text(raw_text)
Return cleaned text extracted from a supported file.
Arguments:
- path: Path to the file
- **kwargs: Parser-specific arguments (e.g., encoding for .txt files)