RetrievalRuntime: Streaming pipeline for ingestion and retrieval#1109
Open
Amir-R25 wants to merge 8 commits into
Open
RetrievalRuntime: Streaming pipeline for ingestion and retrieval#1109Amir-R25 wants to merge 8 commits into
RetrievalRuntime: Streaming pipeline for ingestion and retrieval#1109Amir-R25 wants to merge 8 commits into
Conversation
…e, filter, and vector store base classes.
… corresponding tests
Collaborator
Author
|
A few example scripts to play with. All three share the same wiring — only the In-memory ingest + retrievefrom rich import print
from railtracks.retrieval import RetrievalRuntime, VectorStore
from railtracks.retrieval.chunking import SentenceChunker
from railtracks.retrieval.loaders import TextLoader
from railtracks.retrieval.stores import InMemoryVectorBackend
from railtracks.retrieval.embedding import OpenAIEmbedding
from railtracks.retrieval.embedding.models import EmbeddingFailure
from railtracks.retrieval.runtime import BatchIngested, DocumentFailed, DocumentSkipped
async def main() -> None:
docs_path = "path/to/directory"
rr = RetrievalRuntime(
chunker=SentenceChunker(chunk_size=5, overlap=2),
embedder=OpenAIEmbedding(model="text-embedding-3-small"),
store=VectorStore(InMemoryVectorBackend()),
batch_size=64,
)
loader = TextLoader(str(docs_path))
async for event in rr.ingest(loader):
match event:
case BatchIngested(document_id=did, embedded_chunks=ch, batch_index=i):
print(f" + doc={str(did)[:8]} batch={i} chunks={len(ch)}")
case EmbeddingFailure(errors=errs):
print(f" ! embedding failed: {errs[0]}")
case DocumentFailed(document_id=did):
print(f" ! doc {str(did)[:8]} partially failed")
case DocumentSkipped(source=src):
print(f" ~ skipped (unchanged): {src}")
result = await rr.retrieve("query text")
print(f"\nQuery: {result.query}")
for hit in result.chunks:
snippet = hit.chunk.content.replace("\n", " ")
print(f" [score={hit.score:.3f}] {snippet}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())Persistent ingest + retrieve with Chromafrom rich import print
from railtracks.retrieval import RetrievalRuntime, VectorStore
from railtracks.retrieval.chunking import SentenceChunker
from railtracks.retrieval.loaders import TextLoader
from railtracks.retrieval.stores import ChromaBackend
from railtracks.retrieval.embedding import OpenAIEmbedding
from railtracks.retrieval.embedding.models import EmbeddingFailure
from railtracks.retrieval.runtime import BatchIngested, DocumentFailed, DocumentSkipped
async def main() -> None:
docs_path = "path/to/directory"
vsb = ChromaBackend("my_collection", path="retrieval-demos/stores")
await vsb.initialize()
rr = RetrievalRuntime(
chunker=SentenceChunker(chunk_size=5, overlap=2),
embedder=OpenAIEmbedding(model="text-embedding-3-small"),
store=VectorStore(vsb),
batch_size=64,
)
loader = TextLoader(str(docs_path))
async for event in rr.ingest(loader):
match event:
case BatchIngested(document_id=did, embedded_chunks=ch, batch_index=i):
print(f" + doc={str(did)[:8]} batch={i} chunks={len(ch)}")
case EmbeddingFailure(errors=errs):
print(f" ! embedding failed: {errs[0]}")
case DocumentFailed(document_id=did):
print(f" ! doc {str(did)[:8]} partially failed")
case DocumentSkipped(source=src):
print(f" ~ skipped (unchanged): {src}")
result = await rr.retrieve("query text")
print(f"\nQuery: {result.query}")
for hit in result.chunks:
snippet = hit.chunk.content.replace("\n", " ")
print(f" [score={hit.score:.3f}] {snippet}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())Retrieve against a previously-ingested Chroma storeRe-running ingest on unchanged sources is a no-op (see count-aware staleness), so from rich import print
from railtracks.retrieval import RetrievalRuntime, VectorStore
from railtracks.retrieval.chunking import SentenceChunker
from railtracks.retrieval.stores import ChromaBackend
from railtracks.retrieval.embedding import OpenAIEmbedding
async def main() -> None:
vsb = await ChromaBackend.create("my_collection", path="retrieval-demos/stores")
rr = RetrievalRuntime(
chunker=SentenceChunker(chunk_size=5, overlap=2),
embedder=OpenAIEmbedding(model="text-embedding-3-small"),
store=VectorStore(vsb),
batch_size=64,
)
result = await rr.retrieve("query text")
print(f"\nQuery: {result.query}")
for hit in result.chunks:
snippet = hit.chunk.content.replace("\n", " ")
print(f" [score={hit.score:.3f}] {snippet}")
if __name__ == "__main__":
import asyncio
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds the
RetrievalRuntimeorchestrator and the supportingStore/ loaderchanges needed to drive the full ingest → retrieve flow end-to-end. Also
removes the legacy
railtracks.vector_storespackage now thatrailtracks.retrievalsupersedes it.RetrievalRuntimeThe orchestrator that wires a chunker + embedder +
Store(+ optionalscope)into the ingest/retrieve flow.
ingest(), not the constructor: one runtime captureshow to process (chunker/embedder/store/scope); the loader decides what.
A single runtime can ingest from many sources and re-ingest to update.
ingest(loader)is an async generator yieldingper-batch events;
ingest_all(loader)drains it and returnsIngestionStats.BatchIngested(carries per-batchEmbeddingMetrics— tokens, cost,latency, vector count),
EmbeddingFailure,DocumentFailed,DocumentSkipped.batch_indexis per-document, not run-global.fires
store.delete_where({"document_id": str(doc.id)})to clear the priorversion. The delete only runs once a batch succeeds, so a total embedding
failure preserves the previous version. Writes are per-chunk and not
transactional — a crash mid-write leaves a partial document (recovered on the
next ingest, see below).
the store already holds a complete copy — matched on
source_path+content_hashand the persisteddoc_chunk_count. A partially-written document(fewer chunks than expected after an interrupted run) is re-ingested rather than
left broken. Counting is done via
find()rather than acount()call so theruntime depends only on the
Storeprotocol.max_tokensis set, chunks over the per-item limitare dropped before embedding and surfaced as
EmbeddingFailureinstead ofcausing provider 4xx errors. Uses
TiktokenTokenizerby default. (Partial fixfor the embedding per-item token-cap gap — see Known limitations.)
batch; a later
retrieve()with a different embedder raisesEmbeddingModelMismatchError(cross-model similarity scores are meaningless).Note: capture is in-process only — a fresh runtime over an existing store
won't enforce until its first ingest.
on_ingest/on_retrievecallbacks for logging/observability;delete_document(id)convenience wrapper.storesmoduleStoreprotocol:delete_where(filters)andfind(filters, limit=1)(metadata-onlylookup, no vector search) — both required by the runtime's upsert/staleness paths.
StoreEntry:vectoris nowlist[float] | None. Read results no longer round-trip thevector (was
[], nowNone) — the backend owns the stored vector; callers mustnot rely on this field on retrieved entries.
StoreQuery:scopeis now optional (StoreScope | None) for single-tenant callers.metadata_filtersretypeddict[str, Any](wasdict[str, str]).strategiesfield and theRetrievalStrategyenum.VectorStore(base) /VectorBackend:VectorBackendprotocol gainedlist_where(filters, limit)andcount(filters).countlives on the backend, notStore— keeps the runtime's dependencysurface to the
Storeprotocol alone.VectorStorenow implementsfind,delete_where, andcount.chunk_metadatavalues to the top level (inaddition to the JSON-encoded blob) so flat-equality
metadata_filters/findwork against them.
Backend implementations (
chroma,in_memory,pgvector) all implementlist_where+count. Plus:_build_wherenow compares JSONB-to-JSONB(
payload->$k::text = $v::jsonb) so non-string scalars (int/bool/None) keeptheir JSON type instead of being stringified. Filters are parameterized;
LIMITis
int-cast before interpolation. Addedpool_kwargspassthrough toasyncpg.create_poolfor tuningmin_size/max_size/etc._flushis now async — JSON encode happens under the lock, thedisk write is offloaded to a thread so the event loop isn't blocked. Search now
sanitizes non-finite scores (NaN/inf from a misbehaving embedder): they're
logged and sorted/dropped to the end instead of corrupting the ranking.
loadersmoduleDocument:idis now derived deterministically fromsourceviauuid5(NAMESPACE_URL, source)so re-ingesting the same source yields the sameid across processes. Fixes a silent upsert bug where modified files left their
prior chunks orphaned in the store, because
delete_where({"document_id": ...})was keyed on a fresh random UUID each pass. Sourceless documents fall back to
uuid4()(no stable identity ⇒ no upsert semantics).added
content_hash(SHA-256, computed by the runtime at ingest time; loadersleave it
None) used by staleness detection.typenow defaults toDocumentType.TEXT.Sanitizerprotocol for PII redaction (sync or asyncsanitize; errorspropagate, no logic baked into the framework).
SanitizingLoaderwraps anyBaseDocumentLoader+ aSanitizer, running everyyielded document through it.
Removals / cleanup
railtracks.vector_storespackage (chroma,chunking/,filter,vector_store_base) and its tests — fully superseded byrailtracks.retrieval.storesandrailtracks.retrieval.chunking(~7.5k lines).retrieval.__init__now exports the public surface:RetrievalRuntime, theingestion event/stats types,
Store,StoreEntry,StoreQuery,StoreScope,VectorStore,EmbeddingFailure,EmbeddingModelMismatchError.Type of change
Checklist
ruff check . && ruff format .)pytest tests)Notes
Review callouts
what makes an interrupted ingest self-heal on the next run.
_captured_modelis in-process only — model-mismatch enforcement doesn'tsurvive a fresh runtime over a pre-populated store until its first ingest.
list_whereinterpolatesLIMIT {int(limit)}(int-cast, safe); allfilter values stay parameterized.
Known limitations / follow-ups
max_tokensguard enforces a per-item token cap (drops oversized chunkspre-embedding); it does not do batch-level token-budget packing. Batches are
still sized by count (
default_batch_size), so a batch of in-spec chunks canstill exceed a provider's per-request token limit (e.g. OpenAI's 8191). Worth a
follow-up for token-aware batch packing.