railtracks.vector_stores

 1from .chroma import ChromaVectorStore
 2from .chunking.base_chunker import Chunk
 3from .chunking.fixed_token_chunker import FixedTokenChunker
 4from .chunking.media_parser import MediaParser
 5from .filter import F, all_of, any_of
 6
 7__all__ = [
 8    "all_of",
 9    "any_of",
10    "ChromaVectorStore",
11    "Chunk",
12    "F",
13    "FixedTokenChunker",
14    "MediaParser",
15]
def all_of( filters: Iterable[railtracks.vector_stores.filter.BaseExpr]) -> railtracks.vector_stores.filter.BaseExpr:
219def all_of(filters: Iterable[BaseExpr]) -> BaseExpr:
220    """Combine an iterable of filter expressions with logical AND.
221
222    Args:
223        filters: An iterable of filter expressions to combine
224
225    Returns:
226        A single filter expression (LeafExpr if one filter, LogicExpr if multiple)
227
228    Raises:
229        ValueError: If the iterable is empty
230    """
231    lst = list(filters)
232    if not lst:
233        raise ValueError("all_of() requires at least one Expression")
234
235    if len(lst) == 1:
236        return lst[0]
237
238    return and_(*lst)

Combine an iterable of filter expressions with logical AND.

Arguments:
  • filters: An iterable of filter expressions to combine
Returns:

A single filter expression (LeafExpr if one filter, LogicExpr if multiple)

Raises:
  • ValueError: If the iterable is empty
def any_of( filters: Iterable[railtracks.vector_stores.filter.BaseExpr]) -> railtracks.vector_stores.filter.BaseExpr:
241def any_of(filters: Iterable[BaseExpr]) -> BaseExpr:
242    """Combine an iterable of filter expressions with logical OR.
243
244    Args:
245        filters: An iterable of filter expressions to combine
246
247    Returns:
248        A single filter expression (LeafExpr if one filter, LogicExpr if multiple)
249
250    Raises:
251        ValueError: If the iterable is empty
252    """
253    lst = list(filters)
254    if not lst:
255        raise ValueError("any_of() requires at least one Expression")
256
257    if len(lst) == 1:
258        return lst[0]
259
260    return or_(*lst)

Combine an iterable of filter expressions with logical OR.

Arguments:
  • filters: An iterable of filter expressions to combine
Returns:

A single filter expression (LeafExpr if one filter, LogicExpr if multiple)

Raises:
  • ValueError: If the iterable is empty
class ChromaVectorStore(railtracks.vector_stores.vector_store_base.VectorStore):
 30class ChromaVectorStore(VectorStore):
 31    """ChromaDB-backed implementation of :class:`VectorStore`.
 32
 33    This class wraps a Chroma collection and translates between Chroma's
 34    API and the project's neutral types. The implementation currently
 35    supports upserting lists of either :class:`Chunk` or strings and
 36    querying by text strings.
 37    """
 38
 39    @classmethod
 40    def class_init(
 41        cls, path: Optional[str], host: Optional[str], port: Optional[int]
 42    ) -> None:
 43        """Lazily initialize the shared Chroma client.
 44
 45        This method performs an optional import of Chroma and creates a
 46        persistent, HTTP or ephemeral client depending on the parameters.
 47
 48        Args:
 49            path: Filesystem path for a persistent client (optional).
 50            host: Hostname for an HTTP client (optional).
 51            port: Port for an HTTP client (optional).
 52
 53        Raises:
 54            ImportError: If the `chromadb` package is not installed.
 55        """
 56        if not hasattr(cls, "_chroma"):
 57            try:
 58                import chromadb
 59
 60                # Provide just a path for local store
 61                if path and not host and not port:
 62                    cls._chroma = chromadb.PersistentClient(path=path)
 63                # Provide just a host and port for http store
 64                elif not path and host and port:
 65                    cls._chroma = chromadb.HttpClient(host=host, port=port)
 66                # Provide nothing for temporary store
 67                elif not path and not host and not port:
 68                    cls._chroma = chromadb.EphemeralClient()
 69                else:
 70                    raise ValueError(
 71                        "Invalid combination of path, host, and port for Chroma client."
 72                    )
 73            except ImportError:
 74                raise ImportError(
 75                    "Chroma package is not installed. Please install railtracks[chroma]."
 76                )
 77
 78    @overload
 79    def __init__(
 80        self,
 81        collection_name: str,
 82        embedding_function: Callable[[list[str]], list[list[float]]],
 83        *,
 84        path: str,
 85    ) -> None: ...
 86
 87    @overload
 88    def __init__(
 89        self,
 90        collection_name: str,
 91        embedding_function: Callable[[list[str]], list[list[float]]],
 92        *,
 93        host: str,
 94        port: int,
 95    ) -> None: ...
 96
 97    @overload
 98    def __init__(
 99        self,
100        collection_name: str,
101        embedding_function: Callable[[list[str]], list[list[float]]],
102    ) -> None: ...
103
104    def __init__(
105        self,
106        collection_name: str,
107        embedding_function: Callable[[list[str]], list[list[float]]],
108        path: Optional[str] = None,
109        host: Optional[str] = None,
110        port: Optional[int] = None,
111    ):
112        """Create a ChromaVectorStore instance.
113
114        Args:
115            collection_name: Name of the Chroma collection to use or create.
116            embedding_function: Callable that maps a list of strings to a list
117                of embedding vectors.
118            path: Optional path for persistent Chroma storage.
119            host: Optional HTTP host for remote Chroma.
120            port: Optional HTTP port for remote Chroma.
121        """
122        self._collection_name = collection_name
123        self._embedding_function = embedding_function
124
125        ChromaVectorStore.class_init(path, host, port)
126        self._collection = self._chroma.get_or_create_collection(collection_name)
127
128    # In future should have our own chunking service so we can accept documents for users
129    @overload
130    def upsert(self, content: Chunk | str) -> str: ...
131
132    @overload
133    def upsert(self, content: list[Chunk] | list[str]) -> list[str]: ...
134
135    def upsert(self, content: OneOrMany[Chunk] | OneOrMany[str]) -> OneOrMany[str]:
136        """Upsert a batch of chunks or raw strings into the collection.
137
138        The method accepts a list of :class:`Chunk` instances or plain strings.
139        Each element is embedded via ``embedding_function`` and stored along
140        with metadata that always contains the original content under the
141        key defined in :data:`CONTENT`.
142
143        Args:
144            content: List of or singular chunks or strings to upsert.
145
146        Returns:
147            OneOrMany[str]: Generated ids for the inserted items.
148        """
149        ids = []
150        embeddings = []
151        metadatas = []
152        documents = []
153        is_many = True
154        if isinstance(content, str):
155            content = [content]
156            is_many = False
157
158        if isinstance(content, Chunk):
159            content = [content]
160            is_many = False
161
162        for item in content:
163            if isinstance(item, Chunk):
164                id = item.id
165                embedding = self._embedding_function([item.content])[0]
166                metadata = item.metadata
167                metadata[CONTENT] = item.content
168                documents.append(item.document)
169
170            else:
171                id = str(uuid4())
172                embedding = self._embedding_function([item])[0]
173                metadata = {CONTENT: item}
174                documents.append(None)
175
176            ids.append(id)
177            embeddings.append(embedding)
178            metadatas.append(metadata)
179
180        self._collection.upsert(
181            ids=ids, embeddings=embeddings, metadatas=metadatas, documents=documents
182        )
183        return ids if is_many else ids[0]
184
185    def fetch(
186        self,
187        ids: Optional[OneOrMany[str]] = None,
188        where: Optional[Where] = None,
189        limit: Optional[int] = None,
190        offset: Optional[int] = None,
191        where_document: Optional[WhereDocument] = None,
192    ) -> FetchResponse:
193        """Fetch a set of vectors and their metadata from the collection.
194
195        Args:
196            ids: Optional list of ids or singular id to fetch.
197            where: Optional metadata filter.
198            limit: Result limit for pagination.
199            offset: Result offset for pagination.
200            where_document: Optional document-based filter.
201
202        Returns:
203            FetchResponse: A list-like container of :class:`FetchResult`.
204
205        Raises:
206            ValueError: If the Chroma response does not contain required fields.
207        """
208        results = FetchResponse()
209        # currently we ignore Include and assume the default
210        responses = self._collection.get(
211            ids,
212            where,
213            limit,
214            offset,
215            where_document,
216            include=["embeddings", "metadatas", "documents"],
217        )
218
219        embeddings = responses.get("embeddings")
220        if embeddings is None:
221            raise ValueError("Embeddings were not found in fetch response.")
222        documents = responses.get("documents")
223        if documents is None:
224            raise ValueError("Documents were not found in fetch response.")
225        metadatas = responses.get("metadatas")
226        if metadatas is None:
227            raise ValueError("Metadatas were not found in fetch response.")
228
229        for i, response in enumerate(responses["ids"]):
230            id = response
231
232            metadata = dict(deepcopy(metadatas[i]))
233            if not (content := metadata.get(CONTENT)) or not isinstance(content, str):
234                raise ValueError(
235                    "Content was not initialized in vector. Please create an issue"
236                )
237
238            metadata.pop(CONTENT)
239            results.append(
240                FetchResult(
241                    id=id,
242                    content=content,
243                    vector=list(embeddings[i]),
244                    document=documents[i],
245                    metadata=metadata,
246                )
247            )
248
249        return results
250
251    # There is support for other types of query modalities but for now just list of strings
252    # Should Probably add support for Chunks as well
253    @overload
254    def search(
255        self,
256        query: Chunk | str,
257        ids: Optional[str] = None,
258        top_k: int = 10,
259        where: Optional[Where] = None,
260        where_document: Optional[WhereDocument] = None,
261        include: Include = [
262            "metadatas",
263            "embeddings",
264            "documents",
265            "distances",
266        ],
267    ) -> SearchResponse: ...
268
269    @overload
270    def search(
271        self,
272        query: list[Chunk] | list[str],
273        ids: Optional[list[str]] = None,
274        top_k: int = 10,
275        where: Optional[Where] = None,
276        where_document: Optional[WhereDocument] = None,
277        include: Include = [
278            "metadatas",
279            "embeddings",
280            "documents",
281            "distances",
282        ],
283    ) -> list[SearchResponse]: ...
284
285    def search(  # noqa: C901
286        self,
287        query: OneOrMany[Chunk] | OneOrMany[str],
288        ids: Optional[OneOrMany[str]] = None,
289        top_k: int = 10,
290        where: Optional[Where] = None,
291        where_document: Optional[WhereDocument] = None,
292        include: Include = [
293            "metadatas",
294            "embeddings",
295            "documents",
296            "distances",
297        ],
298    ) -> OneOrMany[SearchResponse]:
299        """Run a similarity search for the provided query texts.
300
301        Args:
302            query: A list of query strings or singular string to search for.
303            ids: Optional list of ids or singular id to restrict the search to.
304            top_k: Number of hits to return per query.
305            where: Optional metadata filter to apply.
306            where_document: Optional document filter to apply.
307            include: Fields to include in the Chroma response.
308
309        Returns:
310            A list of :class:`SearchResponse` objects (one per query).
311
312        Raises:
313            ValueError: If expected fields are missing from the Chroma response.
314        """
315        is_many = True
316        # If a single chunk is passed in, convert to list of string
317        if isinstance(query, Chunk):
318            query = [query.content]
319            is_many = False
320
321        # If a single string is passed in, convert to list of string
322        elif isinstance(query, str):
323            query = [query]
324            is_many = False
325
326        # If list of chunks is passed in, convert to list of strings
327        elif isinstance(query, list) and all(isinstance(q, Chunk) for q in query):
328            query = [q.content for q in query]
329
330        elif isinstance(query, list) and all(isinstance(q, str) for q in query):
331            pass
332        else:
333            raise ValueError(
334                "Query must be a string, Chunk, or list of strings/Chunks."
335            )
336
337        query_embeddings = self._embedding_function(query)
338        results = self._collection.query(
339            query_embeddings=list(query_embeddings),
340            ids=ids,
341            n_results=top_k,
342            where=where,
343            where_document=where_document,
344            include=include,
345        )
346        answer: list[SearchResponse] = []
347        for query_idx, query_response in enumerate(results["ids"]):
348            search_response = SearchResponse()
349            for id_idx, id in enumerate(query_response):
350                if not (distance := results.get("distances")):
351                    raise ValueError("Distance not found in search results.")
352                elif not (vector := results.get("embeddings")):
353                    raise ValueError("Vector not found in search results.")
354                elif not (document := results.get("documents")):
355                    raise ValueError("Document not found in search results.")
356                elif not (metadatas := results.get("metadatas")):
357                    raise ValueError("Metadata not found in search results.")
358
359                distance = distance[query_idx][id_idx]
360                vector = list(vector[query_idx][id_idx])
361                document = document[query_idx][id_idx]
362                metadata = dict(deepcopy(metadatas[query_idx][id_idx]))
363
364                if not (content := metadata.get(CONTENT)) or not isinstance(
365                    content, str
366                ):
367                    raise ValueError(
368                        "Content was not initialized in vector. Please create an issue"
369                    )
370
371                metadata.pop(CONTENT)
372
373                search_response.append(
374                    SearchResult(
375                        id=id,
376                        distance=distance,
377                        content=content,
378                        vector=vector,
379                        document=document,  # Chroma document is just a str
380                        metadata=metadata,
381                    )
382                )
383            answer.append(search_response)
384
385        return answer if is_many else answer[0]
386
387    def delete(
388        self,
389        ids: OneOrMany[str],
390        where: Optional[Where] = None,
391        where_document: Optional[WhereDocument] = None,
392    ):
393        """
394        Remove vectors from the store by id or metadata filter.
395        Args:
396            ids: list of ids or singular id to delete.
397            where: Optional metadata filter.
398            where_document: Optional document-based filter.
399        """
400
401        self._collection.delete(
402            ids=ids,
403            where=where,
404            where_document=where_document,
405        )
406
407    def count(self) -> int:
408        """Return the total number of vectors stored in the collection."""
409        return self._collection.count()

ChromaDB-backed implementation of VectorStore.

This class wraps a Chroma collection and translates between Chroma's API and the project's neutral types. The implementation currently supports upserting lists of either Chunk or strings and querying by text strings.

ChromaVectorStore( collection_name: str, embedding_function: Callable[[list[str]], list[list[float]]], path: Optional[str] = None, host: Optional[str] = None, port: Optional[int] = None)
104    def __init__(
105        self,
106        collection_name: str,
107        embedding_function: Callable[[list[str]], list[list[float]]],
108        path: Optional[str] = None,
109        host: Optional[str] = None,
110        port: Optional[int] = None,
111    ):
112        """Create a ChromaVectorStore instance.
113
114        Args:
115            collection_name: Name of the Chroma collection to use or create.
116            embedding_function: Callable that maps a list of strings to a list
117                of embedding vectors.
118            path: Optional path for persistent Chroma storage.
119            host: Optional HTTP host for remote Chroma.
120            port: Optional HTTP port for remote Chroma.
121        """
122        self._collection_name = collection_name
123        self._embedding_function = embedding_function
124
125        ChromaVectorStore.class_init(path, host, port)
126        self._collection = self._chroma.get_or_create_collection(collection_name)

Create a ChromaVectorStore instance.

Arguments:
  • collection_name: Name of the Chroma collection to use or create.
  • embedding_function: Callable that maps a list of strings to a list of embedding vectors.
  • path: Optional path for persistent Chroma storage.
  • host: Optional HTTP host for remote Chroma.
  • port: Optional HTTP port for remote Chroma.
@classmethod
def class_init( cls, path: Optional[str], host: Optional[str], port: Optional[int]) -> None:
39    @classmethod
40    def class_init(
41        cls, path: Optional[str], host: Optional[str], port: Optional[int]
42    ) -> None:
43        """Lazily initialize the shared Chroma client.
44
45        This method performs an optional import of Chroma and creates a
46        persistent, HTTP or ephemeral client depending on the parameters.
47
48        Args:
49            path: Filesystem path for a persistent client (optional).
50            host: Hostname for an HTTP client (optional).
51            port: Port for an HTTP client (optional).
52
53        Raises:
54            ImportError: If the `chromadb` package is not installed.
55        """
56        if not hasattr(cls, "_chroma"):
57            try:
58                import chromadb
59
60                # Provide just a path for local store
61                if path and not host and not port:
62                    cls._chroma = chromadb.PersistentClient(path=path)
63                # Provide just a host and port for http store
64                elif not path and host and port:
65                    cls._chroma = chromadb.HttpClient(host=host, port=port)
66                # Provide nothing for temporary store
67                elif not path and not host and not port:
68                    cls._chroma = chromadb.EphemeralClient()
69                else:
70                    raise ValueError(
71                        "Invalid combination of path, host, and port for Chroma client."
72                    )
73            except ImportError:
74                raise ImportError(
75                    "Chroma package is not installed. Please install railtracks[chroma]."
76                )

Lazily initialize the shared Chroma client.

This method performs an optional import of Chroma and creates a persistent, HTTP or ephemeral client depending on the parameters.

Arguments:
  • path: Filesystem path for a persistent client (optional).
  • host: Hostname for an HTTP client (optional).
  • port: Port for an HTTP client (optional).
Raises:
  • ImportError: If the chromadb package is not installed.
def upsert( self, content: Union[Chunk, list[Chunk], str, list[str]]) -> Union[str, list[str]]:
135    def upsert(self, content: OneOrMany[Chunk] | OneOrMany[str]) -> OneOrMany[str]:
136        """Upsert a batch of chunks or raw strings into the collection.
137
138        The method accepts a list of :class:`Chunk` instances or plain strings.
139        Each element is embedded via ``embedding_function`` and stored along
140        with metadata that always contains the original content under the
141        key defined in :data:`CONTENT`.
142
143        Args:
144            content: List of or singular chunks or strings to upsert.
145
146        Returns:
147            OneOrMany[str]: Generated ids for the inserted items.
148        """
149        ids = []
150        embeddings = []
151        metadatas = []
152        documents = []
153        is_many = True
154        if isinstance(content, str):
155            content = [content]
156            is_many = False
157
158        if isinstance(content, Chunk):
159            content = [content]
160            is_many = False
161
162        for item in content:
163            if isinstance(item, Chunk):
164                id = item.id
165                embedding = self._embedding_function([item.content])[0]
166                metadata = item.metadata
167                metadata[CONTENT] = item.content
168                documents.append(item.document)
169
170            else:
171                id = str(uuid4())
172                embedding = self._embedding_function([item])[0]
173                metadata = {CONTENT: item}
174                documents.append(None)
175
176            ids.append(id)
177            embeddings.append(embedding)
178            metadatas.append(metadata)
179
180        self._collection.upsert(
181            ids=ids, embeddings=embeddings, metadatas=metadatas, documents=documents
182        )
183        return ids if is_many else ids[0]

Upsert a batch of chunks or raw strings into the collection.

The method accepts a list of Chunk instances or plain strings. Each element is embedded via embedding_function and stored along with metadata that always contains the original content under the key defined in CONTENT.

Arguments:
  • content: List of or singular chunks or strings to upsert.
Returns:

OneOrMany[str]: Generated ids for the inserted items.

def fetch( self, ids: Union[str, list[str], NoneType] = None, where: Optional[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[ForwardRef('Where')]]]]]]] = None, limit: Optional[int] = None, offset: Optional[int] = None, where_document: Optional[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[ForwardRef('WhereDocument')]]]]]]] = None) -> list[railtracks.vector_stores.vector_store_base.FetchResult]:
185    def fetch(
186        self,
187        ids: Optional[OneOrMany[str]] = None,
188        where: Optional[Where] = None,
189        limit: Optional[int] = None,
190        offset: Optional[int] = None,
191        where_document: Optional[WhereDocument] = None,
192    ) -> FetchResponse:
193        """Fetch a set of vectors and their metadata from the collection.
194
195        Args:
196            ids: Optional list of ids or singular id to fetch.
197            where: Optional metadata filter.
198            limit: Result limit for pagination.
199            offset: Result offset for pagination.
200            where_document: Optional document-based filter.
201
202        Returns:
203            FetchResponse: A list-like container of :class:`FetchResult`.
204
205        Raises:
206            ValueError: If the Chroma response does not contain required fields.
207        """
208        results = FetchResponse()
209        # currently we ignore Include and assume the default
210        responses = self._collection.get(
211            ids,
212            where,
213            limit,
214            offset,
215            where_document,
216            include=["embeddings", "metadatas", "documents"],
217        )
218
219        embeddings = responses.get("embeddings")
220        if embeddings is None:
221            raise ValueError("Embeddings were not found in fetch response.")
222        documents = responses.get("documents")
223        if documents is None:
224            raise ValueError("Documents were not found in fetch response.")
225        metadatas = responses.get("metadatas")
226        if metadatas is None:
227            raise ValueError("Metadatas were not found in fetch response.")
228
229        for i, response in enumerate(responses["ids"]):
230            id = response
231
232            metadata = dict(deepcopy(metadatas[i]))
233            if not (content := metadata.get(CONTENT)) or not isinstance(content, str):
234                raise ValueError(
235                    "Content was not initialized in vector. Please create an issue"
236                )
237
238            metadata.pop(CONTENT)
239            results.append(
240                FetchResult(
241                    id=id,
242                    content=content,
243                    vector=list(embeddings[i]),
244                    document=documents[i],
245                    metadata=metadata,
246                )
247            )
248
249        return results

Fetch a set of vectors and their metadata from the collection.

Arguments:
  • ids: Optional list of ids or singular id to fetch.
  • where: Optional metadata filter.
  • limit: Result limit for pagination.
  • offset: Result offset for pagination.
  • where_document: Optional document-based filter.
Returns:

FetchResponse: A list-like container of FetchResult.

Raises:
  • ValueError: If the Chroma response does not contain required fields.
def search( self, query: Union[Chunk, list[Chunk], str, list[str]], ids: Union[str, list[str], NoneType] = None, top_k: int = 10, where: Optional[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[ForwardRef('Where')]]]]]]] = None, where_document: Optional[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[ForwardRef('WhereDocument')]]]]]]] = None, include: List[Literal['documents', 'embeddings', 'metadatas', 'distances', 'uris', 'data']] = ['metadatas', 'embeddings', 'documents', 'distances']) -> Union[list[railtracks.vector_stores.vector_store_base.SearchResult], list[list[railtracks.vector_stores.vector_store_base.SearchResult]]]:
285    def search(  # noqa: C901
286        self,
287        query: OneOrMany[Chunk] | OneOrMany[str],
288        ids: Optional[OneOrMany[str]] = None,
289        top_k: int = 10,
290        where: Optional[Where] = None,
291        where_document: Optional[WhereDocument] = None,
292        include: Include = [
293            "metadatas",
294            "embeddings",
295            "documents",
296            "distances",
297        ],
298    ) -> OneOrMany[SearchResponse]:
299        """Run a similarity search for the provided query texts.
300
301        Args:
302            query: A list of query strings or singular string to search for.
303            ids: Optional list of ids or singular id to restrict the search to.
304            top_k: Number of hits to return per query.
305            where: Optional metadata filter to apply.
306            where_document: Optional document filter to apply.
307            include: Fields to include in the Chroma response.
308
309        Returns:
310            A list of :class:`SearchResponse` objects (one per query).
311
312        Raises:
313            ValueError: If expected fields are missing from the Chroma response.
314        """
315        is_many = True
316        # If a single chunk is passed in, convert to list of string
317        if isinstance(query, Chunk):
318            query = [query.content]
319            is_many = False
320
321        # If a single string is passed in, convert to list of string
322        elif isinstance(query, str):
323            query = [query]
324            is_many = False
325
326        # If list of chunks is passed in, convert to list of strings
327        elif isinstance(query, list) and all(isinstance(q, Chunk) for q in query):
328            query = [q.content for q in query]
329
330        elif isinstance(query, list) and all(isinstance(q, str) for q in query):
331            pass
332        else:
333            raise ValueError(
334                "Query must be a string, Chunk, or list of strings/Chunks."
335            )
336
337        query_embeddings = self._embedding_function(query)
338        results = self._collection.query(
339            query_embeddings=list(query_embeddings),
340            ids=ids,
341            n_results=top_k,
342            where=where,
343            where_document=where_document,
344            include=include,
345        )
346        answer: list[SearchResponse] = []
347        for query_idx, query_response in enumerate(results["ids"]):
348            search_response = SearchResponse()
349            for id_idx, id in enumerate(query_response):
350                if not (distance := results.get("distances")):
351                    raise ValueError("Distance not found in search results.")
352                elif not (vector := results.get("embeddings")):
353                    raise ValueError("Vector not found in search results.")
354                elif not (document := results.get("documents")):
355                    raise ValueError("Document not found in search results.")
356                elif not (metadatas := results.get("metadatas")):
357                    raise ValueError("Metadata not found in search results.")
358
359                distance = distance[query_idx][id_idx]
360                vector = list(vector[query_idx][id_idx])
361                document = document[query_idx][id_idx]
362                metadata = dict(deepcopy(metadatas[query_idx][id_idx]))
363
364                if not (content := metadata.get(CONTENT)) or not isinstance(
365                    content, str
366                ):
367                    raise ValueError(
368                        "Content was not initialized in vector. Please create an issue"
369                    )
370
371                metadata.pop(CONTENT)
372
373                search_response.append(
374                    SearchResult(
375                        id=id,
376                        distance=distance,
377                        content=content,
378                        vector=vector,
379                        document=document,  # Chroma document is just a str
380                        metadata=metadata,
381                    )
382                )
383            answer.append(search_response)
384
385        return answer if is_many else answer[0]

Run a similarity search for the provided query texts.

Arguments:
  • query: A list of query strings or singular string to search for.
  • ids: Optional list of ids or singular id to restrict the search to.
  • top_k: Number of hits to return per query.
  • where: Optional metadata filter to apply.
  • where_document: Optional document filter to apply.
  • include: Fields to include in the Chroma response.
Returns:

A list of SearchResponse objects (one per query).

Raises:
  • ValueError: If expected fields are missing from the Chroma response.
def delete( self, ids: Union[str, list[str]], where: Optional[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[Dict[Union[str, Literal['$and'], Literal['$or']], Union[str, int, float, bool, Dict[Union[Literal['$gt'], Literal['$gte'], Literal['$lt'], Literal['$lte'], Literal['$ne'], Literal['$eq'], Literal['$and'], Literal['$or']], Union[str, int, float, bool]], Dict[Union[Literal['$in'], Literal['$nin']], List[Union[str, int, float, bool]]], Dict[Union[Literal['$contains'], Literal['$not_contains']], Union[str, int, float, bool]], List[ForwardRef('Where')]]]]]]] = None, where_document: Optional[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[Dict[Union[Literal['$contains'], Literal['$not_contains'], Literal['$regex'], Literal['$not_regex'], Literal['$and'], Literal['$or']], Union[str, List[ForwardRef('WhereDocument')]]]]]]] = None):
387    def delete(
388        self,
389        ids: OneOrMany[str],
390        where: Optional[Where] = None,
391        where_document: Optional[WhereDocument] = None,
392    ):
393        """
394        Remove vectors from the store by id or metadata filter.
395        Args:
396            ids: list of ids or singular id to delete.
397            where: Optional metadata filter.
398            where_document: Optional document-based filter.
399        """
400
401        self._collection.delete(
402            ids=ids,
403            where=where,
404            where_document=where_document,
405        )

Remove vectors from the store by id or metadata filter.

Arguments:
  • ids: list of ids or singular id to delete.
  • where: Optional metadata filter.
  • where_document: Optional document-based filter.
def count(self) -> int:
407    def count(self) -> int:
408        """Return the total number of vectors stored in the collection."""
409        return self._collection.count()

Return the total number of vectors stored in the collection.

@dataclass
class Chunk:
10@dataclass
11class Chunk:
12    """Structured chunk that can be upserted to a vector store.
13
14    Attributes:
15        content (str): The raw chunk text.
16        id (Optional[str]): Identifier for the chunk. If not provided, a UUID
17            is automatically generated in ``__post_init__``.
18        document (Optional[str]): Optional document identifier or content
19            associated with the chunk.
20        metadata (dict[str, Any]): Arbitrary key-value metadata associated
21            with this chunk. Defaults to an empty dictionary.
22
23    """
24
25    content: str
26    id: Optional[str] = None
27    document: Optional[str] = None
28    metadata: Optional[dict[str, Any]] = field(default_factory=dict)
29
30    def __post_init__(self) -> None:
31        """Normalize metadata and ensure identifier is populated."""
32        if self.metadata is None:
33            self.metadata = {}
34        if self.id is None:
35            self.id = str(uuid4())

Structured chunk that can be upserted to a vector store.

Attributes:
  • content (str): The raw chunk text.
  • id (Optional[str]): Identifier for the chunk. If not provided, a UUID is automatically generated in __post_init__.
  • document (Optional[str]): Optional document identifier or content associated with the chunk.
  • metadata (dict[str, Any]): Arbitrary key-value metadata associated with this chunk. Defaults to an empty dictionary.
Chunk( content: str, id: Optional[str] = None, document: Optional[str] = None, metadata: Optional[dict[str, Any]] = <factory>)
content: str
id: Optional[str] = None
document: Optional[str] = None
metadata: Optional[dict[str, Any]]
F = <railtracks.vector_stores.filter._FilterBuilder object>
class FixedTokenChunker(railtracks.vector_stores.chunking.base_chunker.BaseChunker):
 9class FixedTokenChunker(BaseChunker):
10    """A chunker that splits text strictly by token count.
11
12    This implementation divides text using a fixed token window, optionally
13    with overlap between chunks. Tokenization is performed using `tiktoken`
14    and defaults to the `cl100k_base` tokenizer unless otherwise specified.
15
16    Args:
17        chunk_size (int): Maximum number of tokens allowed in a produced chunk.
18            Defaults to 400.
19        overlap (int): Number of tokens shared between adjacent chunks.
20            Defaults to 200.
21        tokenizer (Optional[str]): Name of the `tiktoken` encoding to use. If
22            omitted, ``cl100k_base`` is used.
23
24    Attributes:
25        _chunk_size (int): Internal storage for chunk size.
26        _overlap (int): Internal storage for token overlap.
27        _tokenizer (tiktoken.Encoding): Tokenizer used for encoding/decoding.
28    """
29
30    def __init__(
31        self, chunk_size: int = 400, overlap: int = 200, tokenizer: Optional[str] = None
32    ):
33        super().__init__(chunk_size, overlap)
34        self._tokenizer = (
35            tiktoken.get_encoding(tokenizer)
36            if tokenizer
37            else tiktoken.get_encoding("cl100k_base")
38        )
39
40    def split_text(
41        self,
42        text: str,
43    ) -> list[str]:
44        """Split raw text into token-based windows.
45
46        The text is tokenized using the configured tokenizer, and then divided
47        into windows of ``_chunk_size`` tokens with ``_overlap`` tokens of
48        backward overlap.
49
50        Args:
51            text (str): Raw text to split.
52
53        Returns:
54            list[str]: A list of text segments decoded back from token windows.
55                Note : returns an empty list if passed an empty string
56        """
57
58        text_chunks = []
59        tokens = self._tokenizer.encode(text)
60        start = 0
61
62        while start < len(tokens):
63            end = min(start + self._chunk_size, len(tokens))
64            token_window = tokens[start:end]
65            text_chunks.append(self._tokenizer.decode(token_window))
66            start += self._chunk_size - self._overlap
67
68        return text_chunks

A chunker that splits text strictly by token count.

This implementation divides text using a fixed token window, optionally with overlap between chunks. Tokenization is performed using tiktoken and defaults to the cl100k_base tokenizer unless otherwise specified.

Arguments:
  • chunk_size (int): Maximum number of tokens allowed in a produced chunk. Defaults to 400.
  • overlap (int): Number of tokens shared between adjacent chunks. Defaults to 200.
  • tokenizer (Optional[str]): Name of the tiktoken encoding to use. If omitted, cl100k_base is used.
Attributes:
  • _chunk_size (int): Internal storage for chunk size.
  • _overlap (int): Internal storage for token overlap.
  • _tokenizer (tiktoken.Encoding): Tokenizer used for encoding/decoding.
FixedTokenChunker( chunk_size: int = 400, overlap: int = 200, tokenizer: Optional[str] = None)
30    def __init__(
31        self, chunk_size: int = 400, overlap: int = 200, tokenizer: Optional[str] = None
32    ):
33        super().__init__(chunk_size, overlap)
34        self._tokenizer = (
35            tiktoken.get_encoding(tokenizer)
36            if tokenizer
37            else tiktoken.get_encoding("cl100k_base")
38        )
def split_text(self, text: str) -> list[str]:
40    def split_text(
41        self,
42        text: str,
43    ) -> list[str]:
44        """Split raw text into token-based windows.
45
46        The text is tokenized using the configured tokenizer, and then divided
47        into windows of ``_chunk_size`` tokens with ``_overlap`` tokens of
48        backward overlap.
49
50        Args:
51            text (str): Raw text to split.
52
53        Returns:
54            list[str]: A list of text segments decoded back from token windows.
55                Note : returns an empty list if passed an empty string
56        """
57
58        text_chunks = []
59        tokens = self._tokenizer.encode(text)
60        start = 0
61
62        while start < len(tokens):
63            end = min(start + self._chunk_size, len(tokens))
64            token_window = tokens[start:end]
65            text_chunks.append(self._tokenizer.decode(token_window))
66            start += self._chunk_size - self._overlap
67
68        return text_chunks

Split raw text into token-based windows.

The text is tokenized using the configured tokenizer, and then divided into windows of _chunk_size tokens with _overlap tokens of backward overlap.

Arguments:
  • text (str): Raw text to split.
Returns:

list[str]: A list of text segments decoded back from token windows. Note : returns an empty list if passed an empty string

class MediaParser:
10class MediaParser:
11    """General-purpose media parser capable of extracting text from various file types.
12
13    Currently supports:
14        - .txt
15        - .pdf
16    """
17
18    @classmethod
19    def get_text(cls, path: str, **kwargs) -> str:
20        """Return cleaned text extracted from a supported file.
21
22        Args:
23            path: Path to the file
24            **kwargs: Parser-specific arguments (e.g., encoding for .txt files)
25        """
26        ext = cls._get_extension(path)
27        if ext == ".txt":
28            handler_name = "_parse_txt"
29
30        elif ext == ".pdf":
31            handler_name = "_parse_pdf"
32
33        else:
34            raise ValueError(f"Unsupported file type: {ext}")
35
36        parser_function = getattr(cls, handler_name)
37        raw_text = parser_function(path, **kwargs)  # Pass kwargs through
38        return cls._clean_text(raw_text)
39
40    @staticmethod
41    def _parse_txt(filepath: str, encoding: Optional[str] = None, **kwargs) -> str:
42        """Extract text from a plain .txt file."""
43        if not os.path.isfile(filepath):
44            raise FileNotFoundError(f"File not found: {filepath}")
45
46        if encoding is not None:
47            with open(filepath, "r", encoding=encoding) as f:
48                return f.read()
49
50        # Auto-detect encoding
51        detected = from_path(filepath).best()
52        if detected is None:
53            raise ValueError(f"Failed to detect encoding for: {filepath}")
54
55        with open(filepath, "r", encoding=detected.encoding) as f:
56            return f.read()
57
58    @staticmethod
59    def _parse_pdf(filepath: str, **kwargs) -> str:
60        """Extract text from a PDF using pdfplumber."""
61        if not os.path.isfile(filepath):
62            raise FileNotFoundError(f"File not found: {filepath}")
63
64        global _pdfplumber
65
66        if _pdfplumber is None:
67            try:
68                import pdfplumber
69
70                _pdfplumber = pdfplumber
71            except ImportError:
72                raise RuntimeError(
73                    "pdfplumber is required for PDF parsing but isn't installed. "
74                    "Install it via `pip install pdfplumber`."
75                )
76
77        with _pdfplumber.open(filepath) as doc:
78            extracted = []
79            for page in doc.pages:
80                text = page.extract_text()
81                if text:
82                    extracted.append(text)
83            return "\n".join(extracted)
84
85    @staticmethod
86    def _clean_text(text: str) -> str:
87        """Remove null bytes / non-printable characters while preserving whitespace."""
88        if not text:
89            return ""
90        return "".join(char for char in text if char.isprintable() or char in "\t\n\r")
91
92    @staticmethod
93    def _get_extension(path: str) -> str:
94        """Return file extension in lowercase."""
95        _, ext = os.path.splitext(path)
96        return ext.lower()

General-purpose media parser capable of extracting text from various file types.

Currently supports:
  • .txt
  • .pdf
@classmethod
def get_text(cls, path: str, **kwargs) -> str:
18    @classmethod
19    def get_text(cls, path: str, **kwargs) -> str:
20        """Return cleaned text extracted from a supported file.
21
22        Args:
23            path: Path to the file
24            **kwargs: Parser-specific arguments (e.g., encoding for .txt files)
25        """
26        ext = cls._get_extension(path)
27        if ext == ".txt":
28            handler_name = "_parse_txt"
29
30        elif ext == ".pdf":
31            handler_name = "_parse_pdf"
32
33        else:
34            raise ValueError(f"Unsupported file type: {ext}")
35
36        parser_function = getattr(cls, handler_name)
37        raw_text = parser_function(path, **kwargs)  # Pass kwargs through
38        return cls._clean_text(raw_text)

Return cleaned text extracted from a supported file.

Arguments:
  • path: Path to the file
  • **kwargs: Parser-specific arguments (e.g., encoding for .txt files)